Skip to content

Commit 485f3e2

Browse files
authored
Merge pull request #2159 from aboutcode-org/export-v3
Add pipeline to federate package vulnerabilities
2 parents 37f1312 + 365f6ce commit 485f3e2

File tree

13 files changed

+670
-10
lines changed

13 files changed

+670
-10
lines changed

aboutcode/federated/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,7 @@ def large_size_configs(cls):
10281028
"mlflow": 16,
10291029
"pub": 16,
10301030
"rpm": 16,
1031-
# Small Ecosystem all use the defaul
1031+
# Small Ecosystem all use the default
10321032
"default": 1,
10331033
}
10341034
return [
@@ -1069,7 +1069,7 @@ def medium_size_configs(cls):
10691069
"mlflow": 8,
10701070
"pub": 8,
10711071
"rpm": 8,
1072-
# Small Ecosystem all use the defaul
1072+
# Small Ecosystem all use the default
10731073
"default": 1,
10741074
}
10751075
return [
@@ -1110,7 +1110,7 @@ def small_size_configs(cls):
11101110
"mlflow": 4,
11111111
"pub": 4,
11121112
"rpm": 4,
1113-
# Small Ecosystem all use the defaul
1113+
# Small Ecosystem all use the default
11141114
"default": 1,
11151115
}
11161116
return [
@@ -1181,7 +1181,7 @@ def cluster_preset():
11811181
DataCluster(
11821182
data_kind="security_advisories",
11831183
description="VulnerableCode security advisories for each package version.",
1184-
datafile_path_template="{/namespace}/{name}/{version}/advisories.json",
1184+
datafile_path_template="{/namespace}/{name}/{version}/advisories.yml",
11851185
purl_type_configs=[PurlTypeConfig.default_config()],
11861186
data_schema_url="",
11871187
documentation_url="",

aboutcode/federated/tests/test_data/all-presets/foo/aboutcode-federated-config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -933,7 +933,7 @@ data_clusters:
933933
data_license: CC-BY-4.0
934934
data_maintainers: []
935935
- data_kind: security_advisories
936-
datafile_path_template: '{/namespace}/{name}/{version}/advisories.json'
936+
datafile_path_template: '{/namespace}/{name}/{version}/advisories.yml'
937937
purl_type_configs:
938938
- purl_type: default
939939
number_of_repos: 1

vulnerabilities/models.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2344,13 +2344,14 @@ def save(self, *args, **kwargs):
23442344
@property
23452345
def pipeline_class(self):
23462346
"""Return the pipeline class."""
2347+
23472348
from vulnerabilities.importers import IMPORTERS_REGISTRY
23482349
from vulnerabilities.improvers import IMPROVERS_REGISTRY
2350+
from vulnerabilities.pipelines.exporters import EXPORTERS_REGISTRY
2351+
2352+
pipeline_registry = IMPORTERS_REGISTRY | IMPROVERS_REGISTRY | EXPORTERS_REGISTRY
23492353

2350-
if self.pipeline_id in IMPROVERS_REGISTRY:
2351-
return IMPROVERS_REGISTRY.get(self.pipeline_id)
2352-
if self.pipeline_id in IMPORTERS_REGISTRY:
2353-
return IMPORTERS_REGISTRY.get(self.pipeline_id)
2354+
return pipeline_registry[self.pipeline_id]
23542355

23552356
@property
23562357
def description(self):

vulnerabilities/pipelines/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ def log(self, message, level=logging.INFO):
141141
class VulnerableCodePipeline(PipelineDefinition, BasePipelineRun):
142142
pipeline_id = None # Unique Pipeline ID
143143

144+
# When set to True, the pipeline is run only once.
145+
# To rerun a one-time pipeline, reset its is_active field to True via a migration.
146+
run_once = False
147+
144148
def on_failure(self):
145149
"""
146150
Tasks to run in the event that pipeline execution fails.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Copyright (c) nexB Inc. and others. All rights reserved.
2+
# VulnerableCode is a trademark of nexB Inc.
3+
# SPDX-License-Identifier: Apache-2.0
4+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
5+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
6+
# See https://aboutcode.org for more information about nexB OSS projects.
7+
#
8+
9+
from vulnerabilities.pipelines.exporters import federate_vulnerabilities
10+
from vulnerabilities.utils import create_registry
11+
12+
# Registry of the exporter pipelines, built by ``create_registry`` from the
# pipeline classes listed here. Add new exporter pipeline classes to this list.
# NOTE(review): presumably a mapping keyed by each class's ``pipeline_id``,
# like the importer and improver registries -- confirm against ``create_registry``.
EXPORTERS_REGISTRY = create_registry(
    [
        federate_vulnerabilities.FederatePackageVulnerabilities,
    ]
)
Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
# Copyright (c) nexB Inc. and others. All rights reserved.
2+
# VulnerableCode is a trademark of nexB Inc.
3+
# SPDX-License-Identifier: Apache-2.0
4+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
5+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
6+
# See https://aboutcode.org for more information about nexB OSS projects.
7+
#
8+
9+
10+
import itertools
11+
import shutil
12+
from operator import attrgetter
13+
from pathlib import Path
14+
15+
import saneyaml
16+
from aboutcode.pipeline import LoopProgress
17+
from django.conf import settings
18+
from django.db.models import Prefetch
19+
20+
from aboutcode.federated import DataFederation
21+
from vulnerabilities.models import AdvisoryV2
22+
from vulnerabilities.models import ImpactedPackage
23+
from vulnerabilities.models import PackageV2
24+
from vulnerabilities.pipelines import VulnerableCodePipeline
25+
from vulnerabilities.pipes import federatedcode
26+
27+
28+
class FederatePackageVulnerabilities(VulnerableCodePipeline):
    """
    Export package vulnerabilities and advisories to FederatedCode.

    The pipeline clones the repository configured in
    ``settings.FEDERATEDCODE_VULNERABILITIES_REPO``, writes one YAML file per
    group of packages sharing type, namespace, name and version, writes one
    YAML file per advisory, and commits and pushes the files in batches.
    """

    pipeline_id = "federate_vulnerabilities_v2"

    @classmethod
    def steps(cls):
        """Return the pipeline steps in execution order."""
        return (
            cls.check_federatedcode_eligibility,
            cls.create_federatedcode_working_dir,
            cls.fetch_federation_config,
            cls.clone_federation_repository,
            cls.publish_package_related_advisories,
            cls.publish_advisories,
            cls.delete_working_dir,
        )

    def check_federatedcode_eligibility(self):
        """Check if FederatedCode is configured."""
        federatedcode.check_federatedcode_configured_and_available(self.log)

    def create_federatedcode_working_dir(self):
        """Create temporary working dir."""
        self.working_path = federatedcode.create_federatedcode_working_dir()

    def fetch_federation_config(self):
        """
        Fetch config for PackageURL Federation.

        Store the ``security_advisories`` data cluster as ``self.data_cluster``.
        """
        data_federation = DataFederation.from_url(
            name="aboutcode-data",
            remote_root_url="https://github.com/aboutcode-data",
        )
        self.data_cluster = data_federation.get_cluster("security_advisories")

    def clone_federation_repository(self):
        """
        Clone ``settings.FEDERATEDCODE_VULNERABILITIES_REPO`` into the
        ``advisories-data`` directory of the working dir and keep the
        resulting repository as ``self.repo``.
        """
        self.repo = federatedcode.clone_repository(
            repo_url=settings.FEDERATEDCODE_VULNERABILITIES_REPO,
            clone_path=self.working_path / "advisories-data",
            logger=self.log,
        )

    def publish_package_related_advisories(self):
        """Publish package advisories relations to FederatedCode"""
        chunk_size = 500

        distinct_packages_count = (
            PackageV2.objects.values("type", "namespace", "name", "version")
            .distinct("type", "namespace", "name", "version")
            .count()
        )
        # ``groupby`` only merges adjacent items: this relies on
        # ``package_prefetched_qs`` ordering by the same four fields.
        grouped_packages = itertools.groupby(
            package_prefetched_qs().iterator(chunk_size=chunk_size),
            key=attrgetter("type", "namespace", "name", "version"),
        )

        self.log(f"Exporting advisory relation for {distinct_packages_count} packages.")
        progress = LoopProgress(
            total_iterations=distinct_packages_count,
            progress_step=5,
            logger=self.log,
        )

        def package_files():
            # Each group is fully consumed before the next one is requested,
            # as required by ``itertools.groupby``.
            for _, packages in progress.iter(grouped_packages):
                purl, package_vulnerabilities = get_package_related_advisory(packages)
                package_repo, datafile_path = self.data_cluster.get_datafile_repo_and_path(purl)
                yield f"packages/{package_repo}/{datafile_path}", package_vulnerabilities

        self._write_and_push_files(
            item_type="package advisory relations",
            files=package_files(),
        )

        self.log(f"Federated {distinct_packages_count} package advisories.")

    def publish_advisories(self):
        """Publish advisory to FederatedCode"""
        chunk_size = 1000
        advisory_qs = advisory_prefetched_qs()
        advisory_count = advisory_qs.count()

        self.log(f"Exporting {advisory_count} advisory.")
        progress = LoopProgress(
            total_iterations=advisory_count,
            progress_step=5,
            logger=self.log,
        )

        def advisory_files():
            for advisory in progress.iter(advisory_qs.iterator(chunk_size=chunk_size)):
                yield f"advisories/{advisory.avid}.yml", serialize_advisory(advisory)

        self._write_and_push_files(
            item_type="advisories",
            files=advisory_files(),
        )

        self.log(f"Successfully federated {advisory_count} advisories.")

    def _write_and_push_files(self, item_type, files, batch_size=2000):
        """
        Write each ``(file_path, data)`` pair of ``files`` as YAML in the
        cloned repository, committing and pushing whenever more than
        ``batch_size`` files are pending, and once more for any remainder.

        ``item_type`` is the label used in the commit messages.
        """
        repo_path = Path(self.repo.working_dir)
        commit_count = 1
        files_to_commit = set()

        for file_path, data in files:
            write_file(repo_path=repo_path, file_path=file_path, data=data)
            files_to_commit.add(file_path)

            if len(files_to_commit) > batch_size:
                if self._commit_batch(item_type, files_to_commit, commit_count):
                    commit_count += 1
                # NOTE(review): the pending batch is dropped whether or not the
                # commit and push succeeded -- confirm this is intended.
                files_to_commit.clear()

        if files_to_commit:
            self._commit_batch(item_type, files_to_commit, commit_count, is_last=True)

    def _commit_batch(self, item_type, files_to_commit, commit_count, is_last=False):
        """
        Commit and push ``files_to_commit`` and return the result of
        ``federatedcode.commit_and_push_changes``.

        The last batch reports ``commit_count`` as the total number of commits
        in its message; earlier batches use the default total.
        """
        if is_last:
            message = self.commit_message(item_type, commit_count, commit_count)
        else:
            message = self.commit_message(item_type, commit_count)

        return federatedcode.commit_and_push_changes(
            commit_message=message,
            repo=self.repo,
            files_to_commit=files_to_commit,
            logger=self.log,
        )

    def delete_working_dir(self):
        """Remove temporary working dir."""
        # ``working_path`` is only set once the working dir step has run.
        working_path = getattr(self, "working_path", None)
        if working_path:
            shutil.rmtree(working_path)

    def on_failure(self):
        """Remove the temporary working dir when the pipeline fails."""
        self.delete_working_dir()

    def commit_message(
        self,
        item_type,
        commit_count,
        total_commit_count="many",
    ):
        """Commit message for pushing package vulnerability."""
        return federatedcode.commit_message(
            item_type=item_type,
            commit_count=commit_count,
            total_commit_count=total_commit_count,
        )
199+
200+
201+
def package_prefetched_qs():
    """
    Return a PackageV2 queryset ordered by type, namespace, name and version.

    Only the purl fields are loaded, and the ``affected_in_impacts`` and
    ``fixed_in_impacts`` relations are prefetched together with the ``avid``
    of their advisory.
    """

    def impacts_with_advisory(relation):
        # Build fresh querysets for every relation rather than sharing one.
        advisory_prefetch = Prefetch(
            "advisory",
            queryset=AdvisoryV2.objects.only("avid"),
        )
        impacts = ImpactedPackage.objects.only("advisory_id").prefetch_related(advisory_prefetch)
        return Prefetch(relation, queryset=impacts)

    packages = PackageV2.objects.order_by("type", "namespace", "name", "version")
    packages = packages.only("package_url", "type", "namespace", "name", "version")
    return packages.prefetch_related(
        impacts_with_advisory("affected_in_impacts"),
        impacts_with_advisory("fixed_in_impacts"),
    )
226+
227+
228+
def get_package_related_advisory(packages):
    """
    Return a ``(purl, package_vulnerabilities)`` tuple for ``packages``.

    ``package_vulnerabilities`` holds one mapping per package with its purl
    and the sorted ``avid`` of the advisories affecting it and fixed by it.
    ``purl`` is the ``package_url`` of the last package of the iterable.

    Raise a ValueError if ``packages`` is empty.
    """
    purl = None
    package_vulnerabilities = []
    for package in packages:
        # Track the purl explicitly instead of relying on the loop variable
        # still being bound after the loop.
        purl = package.package_url
        affected_by_advisories = sorted(
            impact.advisory.avid for impact in package.affected_in_impacts.all()
        )
        fixing_advisories = sorted(
            impact.advisory.avid for impact in package.fixed_in_impacts.all()
        )
        package_vulnerabilities.append(
            {
                "purl": purl,
                "affected_by_advisories": affected_by_advisories,
                "fixing_advisories": fixing_advisories,
            }
        )

    if not package_vulnerabilities:
        raise ValueError("packages must contain at least one package")

    return purl, package_vulnerabilities
244+
245+
246+
def advisory_prefetched_qs():
    """
    Return an AdvisoryV2 queryset prefetching the impacted packages, aliases,
    references, severities and weaknesses of each advisory.
    """
    related_names = (
        "impacted_packages",
        "aliases",
        "references",
        "severities",
        "weaknesses",
    )
    return AdvisoryV2.objects.prefetch_related(*related_names)
254+
255+
256+
def serialize_severity(sev):
    """
    Return a plain data mapping serialized from the ``sev`` severity object.

    ``published_at`` is exported as a string, or as None when it is not set.
    """
    published_at = sev.published_at
    return {
        "score": sev.value,
        "scoring_system": sev.scoring_system,
        "scoring_elements": sev.scoring_elements,
        # Avoid exporting the literal string "None" for a missing date.
        "published_at": str(published_at) if published_at is not None else None,
        "url": sev.url,
    }
264+
265+
266+
def serialize_references(reference):
    """
    Return a plain data mapping with the url, reference type and reference id
    of the ``reference`` object.
    """
    exported_fields = ("url", "reference_type", "reference_id")
    return {field: getattr(reference, field) for field in exported_fields}
272+
273+
274+
def serialize_advisory(advisory):
    """
    Return a plain data mapping serialized from the ``advisory`` object.

    The mapping holds the advisory identifiers, url and summary together with
    its sorted aliases and its impacted packages, severities, weaknesses and
    references.
    """
    alias_names = [entry.alias for entry in advisory.aliases.all()]
    alias_names.sort()

    severity_items = list(map(serialize_severity, advisory.severities.all()))
    cwe_ids = [weakness.cwe for weakness in advisory.weaknesses.all()]
    reference_items = list(map(serialize_references, advisory.references.all()))

    impacted = []
    for impacted_package in advisory.impacted_packages.all():
        impacted.append(
            {
                "purl": impacted_package.base_purl,
                "affected_versions": impacted_package.affecting_vers,
                "fixed_versions": impacted_package.fixed_vers,
            }
        )

    serialized = {}
    serialized["advisory_id"] = advisory.advisory_id
    # NOTE(review): "datasource_id" is populated from ``avid`` -- confirm this
    # is the intended value for that key.
    serialized["datasource_id"] = advisory.avid
    serialized["datasource_url"] = advisory.url
    serialized["aliases"] = alias_names
    serialized["summary"] = advisory.summary
    serialized["impacted_packages"] = impacted
    serialized["severities"] = severity_items
    serialized["weaknesses"] = cwe_ids
    serialized["references"] = reference_items
    return serialized
300+
301+
302+
def write_file(repo_path, file_path, data):
    """
    Dump ``data`` as YAML in the ``file_path`` file under ``repo_path``,
    creating any missing parent directory. ``repo_path`` is a Path.
    """
    destination = repo_path / file_path
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open(mode="w", encoding="utf-8") as output:
        output.write(saneyaml.dump(data))

0 commit comments

Comments
 (0)