|
9 | 9 |
|
10 | 10 | import os |
11 | 11 | from unittest import TestCase |
| 12 | +from unittest.mock import MagicMock |
| 13 | +from unittest.mock import patch |
12 | 14 |
|
| 15 | +from vulnerabilities.pipelines.v2_importers.alpine_security_importer import ( |
| 16 | + AlpineSecurityImporterPipeline, |
| 17 | +) |
13 | 18 | from vulnerabilities.pipelines.v2_importers.alpine_security_importer import parse_advisory |
14 | 19 | from vulnerabilities.tests import util_tests |
15 | 20 | from vulnerabilities.utils import load_json |
@@ -83,3 +88,28 @@ def test_parse_advisory_skips_unfixed_states(self): |
83 | 88 | result = parse_advisory(data) |
84 | 89 | self.assertIsNotNone(result) |
85 | 90 | self.assertEqual(result.affected_packages, []) |
| 91 | + |
| 92 | + |
| 93 | +class TestAlpineSecurityImporterPipeline(TestCase): |
| 94 | + @patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.get_branches") |
| 95 | + @patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.requests.get") |
| 96 | + def test_collect_advisories_yields_advisory(self, mock_get, mock_branches): |
| 97 | + mock_branches.return_value = ["3.19-main"] |
| 98 | + data = load_json(os.path.join(TEST_DATA, "alpine_security_mock1.json")) |
| 99 | + resp = MagicMock() |
| 100 | + resp.json.return_value = {"items": [data]} |
| 101 | + resp.raise_for_status.return_value = None |
| 102 | + mock_get.return_value = resp |
| 103 | + advisories = list(AlpineSecurityImporterPipeline().collect_advisories()) |
| 104 | + self.assertGreater(len(advisories), 0) |
| 105 | + |
| 106 | + @patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.get_branches") |
| 107 | + @patch("vulnerabilities.pipelines.v2_importers.alpine_security_importer.requests.get") |
| 108 | + def test_collect_advisories_http_error_logs_and_continues(self, mock_get, mock_branches): |
| 109 | + mock_branches.return_value = ["3.19-main"] |
| 110 | + mock_get.side_effect = Exception("timeout") |
| 111 | + logger_name = "vulnerabilities.pipelines.v2_importers.alpine_security_importer" |
| 112 | + with self.assertLogs(logger_name, level="ERROR") as cm: |
| 113 | + advisories = list(AlpineSecurityImporterPipeline().collect_advisories()) |
| 114 | + self.assertEqual(advisories, []) |
| 115 | + self.assertTrue(any("timeout" in msg for msg in cm.output)) |
0 commit comments