Skip to content

Commit d7ed5f2

Browse files
committed
Add docstrings and use pipeline logger
Signed-off-by: Sampurna Pyne <sampurnapyne1710@gmail.com>
1 parent 6547ea2 commit d7ed5f2

File tree

1 file changed

+19
-21
lines changed

1 file changed

+19
-21
lines changed

vulnerabilities/pipelines/v2_importers/tuxcare_importer.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,13 @@
88
#
99

1010
import json
11-
import logging
1211
from typing import Iterable
1312

1413
from dateutil.parser import parse
1514
from packageurl import PackageURL
1615
from pytz import UTC
16+
from univers.version_range import RANGE_CLASS_BY_SCHEMES
1717
from univers.version_range import AlpineLinuxVersionRange
18-
from univers.version_range import DebianVersionRange
19-
from univers.version_range import GenericVersionRange
20-
from univers.version_range import RpmVersionRange
2118

2219
from vulnerabilities.importer import AdvisoryData
2320
from vulnerabilities.importer import AffectedPackageV2
@@ -26,18 +23,16 @@
2623
from vulnerabilities.severity_systems import GENERIC
2724
from vulnerabilities.utils import fetch_response
2825

29-
logger = logging.getLogger(__name__)
30-
3126
# See https://docs.tuxcare.com/els-for-os/#cve-status-definition
3227
NON_AFFECTED_STATUSES = ["Not Vulnerable"]
3328
AFFECTED_STATUSES = ["Ignored", "Needs Triage", "In Testing", "In Progress", "In Rollout"]
3429
FIXED_STATUSES = ["Released", "Already Fixed"]
3530

3631
VERSION_RANGE_BY_PURL_TYPE = {
37-
"rpm": RpmVersionRange,
38-
"deb": DebianVersionRange,
32+
"rpm": RANGE_CLASS_BY_SCHEMES["rpm"],
33+
"deb": RANGE_CLASS_BY_SCHEMES["deb"],
3934
"apk": AlpineLinuxVersionRange,
40-
"generic": GenericVersionRange,
35+
"generic": RANGE_CLASS_BY_SCHEMES["generic"],
4136
}
4237

4338

@@ -61,14 +56,17 @@ def fetch(self) -> None:
6156
self._grouped = self._group_records_by_cve()
6257

6358
def _group_records_by_cve(self) -> dict:
59+
"""
60+
A single CVE can appear in multiple records across different operating systems, distributions, or package versions. This method groups all records with the same CVE together and skips entries that are invalid or marked as not affected. The result is a dictionary keyed by CVE ID, with each value containing the related records.
61+
"""
6462
grouped = {}
6563
skipped_invalid = 0
6664
skipped_non_affected = 0
6765

6866
for record in self.response:
6967
cve_id = record.get("cve", "").strip()
70-
if not cve_id or not cve_id.startswith("CVE-"):
71-
logger.warning(f"Skipping invalid CVE ID: {cve_id}")
68+
if not cve_id:
69+
self.log(f"Skipping record with empty CVE ID")
7270
skipped_invalid += 1
7371
continue
7472

@@ -78,7 +76,7 @@ def _group_records_by_cve(self) -> dict:
7876
status = record.get("status", "").strip()
7977

8078
if not all([os_name, project_name, version, status]):
81-
logger.warning(f"Skipping {cve_id}: missing required fields")
79+
self.log(f"Skipping {cve_id}: missing required fields")
8280
skipped_invalid += 1
8381
continue
8482

@@ -88,7 +86,7 @@ def _group_records_by_cve(self) -> dict:
8886
continue
8987

9088
if status not in AFFECTED_STATUSES and status not in FIXED_STATUSES:
91-
logger.warning(f"Skipping {cve_id}: unrecognized status '{status}'")
89+
self.log(f"Skipping {cve_id}: unrecognized status '{status}'")
9290
skipped_invalid += 1
9391
continue
9492

@@ -146,7 +144,6 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
146144
severities = []
147145
date_published = None
148146
all_records = []
149-
severity_added = False
150147

151148
for record in records:
152149
os_name = record.get("os_name", "").strip()
@@ -159,16 +156,17 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
159156

160157
purl = self._create_purl(project_name, os_name)
161158
if not purl:
162-
logger.warning(
159+
self.log(
163160
f"Skipping package {project_name} on {os_name} for {cve_id} - unexpected OS type"
164161
)
165162
continue
166163

167-
version_range_class = VERSION_RANGE_BY_PURL_TYPE.get(purl.type, GenericVersionRange)
164+
version_range_class = VERSION_RANGE_BY_PURL_TYPE.get(purl.type)
165+
168166
try:
169167
version_range = version_range_class.from_versions([version])
170168
except ValueError as e:
171-
logger.warning(f"Failed to parse version {version} for {cve_id}: {e}")
169+
self.log(f"Failed to parse version {version} for {cve_id}: {e}")
172170
continue
173171

174172
affected_version_range = None
@@ -187,28 +185,28 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
187185
)
188186
)
189187

190-
if severity and score and not severity_added:
188+
# Severity is per-CVE hence we add it only once
189+
if severity and score and not severities:
191190
severities.append(
192191
VulnerabilitySeverity(
193192
system=GENERIC,
194193
value=score,
195194
scoring_elements=severity,
196195
)
197196
)
198-
severity_added = True
199197

200198
if last_updated:
201199
try:
202200
current_date = parse(last_updated).replace(tzinfo=UTC)
203201
if date_published is None or current_date > date_published:
204202
date_published = current_date
205203
except ValueError as e:
206-
logger.warning(f"Failed to parse date {last_updated} for {cve_id}: {e}")
204+
self.log(f"Failed to parse date {last_updated} for {cve_id}: {e}")
207205

208206
all_records.append(record)
209207

210208
if not affected_packages:
211-
logger.warning(f"Skipping {cve_id} - no valid affected packages")
209+
self.log(f"Skipping {cve_id} - no valid affected packages")
212210
continue
213211

214212
yield AdvisoryData(

0 commit comments

Comments
 (0)