Skip to content

Commit 7fe2c65

Browse files
committed
Add AdvisoryV2 API endpoint to api/v2/ #2224
Signed-off-by: shivamshrma09 <shivamsharma27107@gmail.com>
1 parent 963058f commit 7fe2c65

File tree

3 files changed

+208
-0
lines changed

3 files changed

+208
-0
lines changed

vulnerabilities/api_v2.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,6 +1064,83 @@ def get_view_name(self):
10641064
return "Pipeline Jobs"
10651065

10661066

1067+
class AdvisoryV2FilterSet(filters.FilterSet):
1068+
alias = CharInFilter(
1069+
field_name="aliases__alias",
1070+
lookup_expr="in",
1071+
label="Alias",
1072+
help_text="Filter by one or more aliases (e.g. CVE-2021-1234). Multi-value supported (comma-separated).",
1073+
)
1074+
advisory_id = CharInFilter(
1075+
field_name="avid",
1076+
lookup_expr="in",
1077+
label="Advisory ID",
1078+
help_text="Filter by one or more advisory IDs (avid). Multi-value supported (comma-separated).",
1079+
)
1080+
datasource_id = filters.CharFilter(
1081+
field_name="datasource_id",
1082+
label="Datasource ID",
1083+
help_text="Filter by datasource ID (e.g. nginx_importer_v2).",
1084+
)
1085+
1086+
class Meta:
1087+
model = AdvisoryV2
1088+
fields = ["alias", "advisory_id", "datasource_id"]
1089+
1090+
1091+
@extend_schema_view(
1092+
list=extend_schema(
1093+
parameters=[
1094+
OpenApiParameter(
1095+
name="alias",
1096+
description="Filter by one or more aliases (e.g. CVE-2021-1234). Comma-separated.",
1097+
required=False,
1098+
type={"type": "array", "items": {"type": "string"}},
1099+
location=OpenApiParameter.QUERY,
1100+
),
1101+
OpenApiParameter(
1102+
name="advisory_id",
1103+
description="Filter by one or more advisory IDs (avid). Comma-separated.",
1104+
required=False,
1105+
type={"type": "array", "items": {"type": "string"}},
1106+
location=OpenApiParameter.QUERY,
1107+
),
1108+
OpenApiParameter(
1109+
name="datasource_id",
1110+
description="Filter by datasource ID.",
1111+
required=False,
1112+
type=str,
1113+
location=OpenApiParameter.QUERY,
1114+
),
1115+
]
1116+
)
1117+
)
1118+
class AdvisoryV2ViewSet(viewsets.ReadOnlyModelViewSet):
1119+
"""
1120+
Lookup for advisories by advisory ID, alias, or datasource.
1121+
"""
1122+
1123+
queryset = (
1124+
AdvisoryV2.objects.prefetch_related(
1125+
"aliases",
1126+
"references",
1127+
"severities",
1128+
"weaknesses",
1129+
"related_ssvcs",
1130+
"source_ssvcs",
1131+
)
1132+
.order_by("datasource_id", "advisory_id")
1133+
.distinct()
1134+
)
1135+
serializer_class = AdvisoryV2Serializer
1136+
lookup_field = "avid"
1137+
# avid contains slashes (e.g. nginx_importer_v2/CVE-2021-1234)
1138+
lookup_value_regex = r"[^/]+/[^/]+"
1139+
filter_backends = [filters.DjangoFilterBackend]
1140+
filterset_class = AdvisoryV2FilterSet
1141+
throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle]
1142+
1143+
10671144
class PackageV3ViewSet(viewsets.ReadOnlyModelViewSet):
10681145
queryset = PackageV2.objects.all()
10691146
serializer_class = PackageV3Serializer

vulnerabilities/tests/test_api_v2.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
from rest_framework.test import APIClient
1818
from rest_framework.test import APITestCase
1919

20+
from vulnerabilities.api_v2 import AdvisoryV2Serializer
2021
from vulnerabilities.api_v2 import PackageV2Serializer
2122
from vulnerabilities.api_v2 import VulnerabilityListSerializer
23+
from vulnerabilities.models import AdvisoryAlias
2224
from vulnerabilities.models import AdvisoryV2
2325
from vulnerabilities.models import Alias
2426
from vulnerabilities.models import ApiUser
@@ -905,3 +907,130 @@ def test_get_all_vulnerable_purls(self):
905907
response = self.client.get(url)
906908
assert response.status_code == 200
907909
assert "pkg:pypi/sample@1.0.0" in response.data
910+
911+
912+
class AdvisoryV2ViewSetTest(APITestCase):
913+
def setUp(self):
914+
self.advisory1 = AdvisoryV2.objects.create(
915+
datasource_id="nginx_importer_v2",
916+
advisory_id="CVE-2021-1234",
917+
avid="nginx_importer_v2/CVE-2021-1234",
918+
unique_content_id="a" * 64,
919+
url="https://example.com/advisory1",
920+
date_collected="2024-01-01T00:00:00Z",
921+
summary="Test advisory 1",
922+
)
923+
self.advisory2 = AdvisoryV2.objects.create(
924+
datasource_id="pypa_importer_v2",
925+
advisory_id="PYSEC-2022-5678",
926+
avid="pypa_importer_v2/PYSEC-2022-5678",
927+
unique_content_id="b" * 64,
928+
url="https://example.com/advisory2",
929+
date_collected="2024-01-01T00:00:00Z",
930+
summary="Test advisory 2",
931+
)
932+
933+
self.alias1 = AdvisoryAlias.objects.create(alias="CVE-2021-1234")
934+
self.advisory1.aliases.add(self.alias1)
935+
936+
self.alias2 = AdvisoryAlias.objects.create(alias="GHSA-xxxx-yyyy-zzzz")
937+
self.advisory2.aliases.add(self.alias2)
938+
939+
cache.clear()
940+
self.client = APIClient(enforce_csrf_checks=True)
941+
942+
def test_list_advisories(self):
943+
"""
944+
Test listing all advisories without filters.
945+
"""
946+
url = reverse("advisory-v2-list")
947+
response = self.client.get(url, format="json")
948+
self.assertEqual(response.status_code, status.HTTP_200_OK)
949+
self.assertIn("results", response.data)
950+
self.assertEqual(response.data["count"], 2)
951+
952+
def test_retrieve_advisory_by_avid(self):
953+
"""
954+
Test retrieving a specific advisory by its avid.
955+
The avid contains a slash, handled by lookup_value_regex.
956+
"""
957+
url = reverse("advisory-v2-detail", kwargs={"avid": self.advisory1.avid})
958+
response = self.client.get(url, format="json")
959+
self.assertEqual(response.status_code, status.HTTP_200_OK)
960+
self.assertEqual(response.data["advisory_id"], self.advisory1.avid)
961+
self.assertEqual(response.data["url"], self.advisory1.url)
962+
self.assertIn("CVE-2021-1234", response.data["aliases"])
963+
964+
def test_retrieve_nonexistent_advisory_returns_404(self):
965+
"""
966+
Test that a non-existent advisory returns 404.
967+
"""
968+
url = reverse("advisory-v2-detail", kwargs={"avid": "fake_source/FAKE-0000"})
969+
response = self.client.get(url, format="json")
970+
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
971+
972+
def test_filter_by_alias(self):
973+
"""
974+
Test filtering advisories by alias returns only matching advisory.
975+
"""
976+
url = reverse("advisory-v2-list")
977+
response = self.client.get(url, {"alias": "CVE-2021-1234"}, format="json")
978+
self.assertEqual(response.status_code, status.HTTP_200_OK)
979+
self.assertEqual(response.data["count"], 1)
980+
result = response.data["results"][0]
981+
self.assertIn("CVE-2021-1234", result["aliases"])
982+
983+
def test_filter_by_advisory_id(self):
984+
"""
985+
Test filtering advisories by advisory_id (avid).
986+
"""
987+
url = reverse("advisory-v2-list")
988+
response = self.client.get(
989+
url, {"advisory_id": "nginx_importer_v2/CVE-2021-1234"}, format="json"
990+
)
991+
self.assertEqual(response.status_code, status.HTTP_200_OK)
992+
self.assertEqual(response.data["count"], 1)
993+
self.assertEqual(response.data["results"][0]["advisory_id"], self.advisory1.avid)
994+
995+
def test_filter_by_datasource_id(self):
996+
"""
997+
Test filtering advisories by datasource_id returns only that source's advisories.
998+
"""
999+
url = reverse("advisory-v2-list")
1000+
response = self.client.get(url, {"datasource_id": "nginx_importer_v2"}, format="json")
1001+
self.assertEqual(response.status_code, status.HTTP_200_OK)
1002+
self.assertEqual(response.data["count"], 1)
1003+
self.assertEqual(response.data["results"][0]["advisory_id"], self.advisory1.avid)
1004+
1005+
def test_filter_by_nonexistent_alias_returns_empty(self):
1006+
"""
1007+
Test that filtering by a non-existent alias returns an empty list.
1008+
"""
1009+
url = reverse("advisory-v2-list")
1010+
response = self.client.get(url, {"alias": "CVE-9999-9999"}, format="json")
1011+
self.assertEqual(response.status_code, status.HTTP_200_OK)
1012+
self.assertEqual(response.data["count"], 0)
1013+
1014+
def test_advisory_serializer_fields(self):
1015+
"""
1016+
Test that AdvisoryV2Serializer returns all required fields.
1017+
"""
1018+
serializer = AdvisoryV2Serializer(self.advisory1)
1019+
data = serializer.data
1020+
expected_fields = [
1021+
"advisory_id",
1022+
"url",
1023+
"aliases",
1024+
"summary",
1025+
"severities",
1026+
"weaknesses",
1027+
"references",
1028+
"exploitability",
1029+
"weighted_severity",
1030+
"risk_score",
1031+
"related_ssvc_trees",
1032+
]
1033+
for field in expected_fields:
1034+
self.assertIn(field, data)
1035+
self.assertEqual(data["advisory_id"], self.advisory1.avid)
1036+
self.assertIn("CVE-2021-1234", data["aliases"])

vulnerablecode/urls.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from vulnerabilities.api import CPEViewSet
2121
from vulnerabilities.api import PackageViewSet
2222
from vulnerabilities.api import VulnerabilityViewSet
23+
from vulnerabilities.api_v2 import AdvisoryV2ViewSet
2324
from vulnerabilities.api_v2 import CodeFixV2ViewSet
2425
from vulnerabilities.api_v2 import CodeFixViewSet
2526
from vulnerabilities.api_v2 import PackageV2ViewSet
@@ -66,6 +67,7 @@ def __init__(self, *args, **kwargs):
6667
api_v2_router.register("codefixes", CodeFixViewSet, basename="codefix")
6768
api_v2_router.register("pipelines", PipelineScheduleV2ViewSet, basename="pipelines")
6869
api_v2_router.register("advisory-codefixes", CodeFixV2ViewSet, basename="advisory-codefix")
70+
api_v2_router.register("advisories", AdvisoryV2ViewSet, basename="advisory-v2")
6971

7072
api_v3_router = OptionalSlashRouter()
7173

0 commit comments

Comments
 (0)