|
| 1 | +import logging |
| 2 | +from typing import Iterable |
| 3 | +from typing import List |
| 4 | + |
| 5 | +import requests |
| 6 | +from django.db.models import Q |
| 7 | +from django.db.models.query import QuerySet |
| 8 | +from packageurl import PackageURL |
| 9 | +from univers.versions import GentooVersion |
| 10 | + |
| 11 | +from vulnerabilities.importer import AdvisoryData |
| 12 | +from vulnerabilities.importer import AffectedPackage |
| 13 | +from vulnerabilities.importer import UnMergeablePackageError |
| 14 | +from vulnerabilities.importers.gentoo import GentooImporter |
| 15 | +from vulnerabilities.improver import MAX_CONFIDENCE |
| 16 | +from vulnerabilities.improver import Improver |
| 17 | +from vulnerabilities.improver import Inference |
| 18 | +from vulnerabilities.models import Advisory |
| 19 | +from vulnerabilities.utils import AffectedPackage as LegacyAffectedPackage |
| 20 | +from vulnerabilities.utils import get_affected_packages_by_patched_package |
| 21 | +from vulnerabilities.utils import nearest_patched_package |
| 22 | +from vulnerabilities.utils import resolve_version_range |
| 23 | + |
| 24 | +logger = logging.getLogger(__name__) |
| 25 | + |
| 26 | +GENTOO_PACKAGES_API_URL = "https://packages.gentoo.org/packages/{category}/{name}.json" |
| 27 | + |
| 28 | + |
| 29 | +def fetch_gentoo_package_versions(category, name): |
| 30 | + """ |
| 31 | + Fetch all known versions of a Gentoo package from the packages.gentoo.org API. |
| 32 | + Return a list of version strings. |
| 33 | + """ |
| 34 | + url = GENTOO_PACKAGES_API_URL.format(category=category, name=name) |
| 35 | + try: |
| 36 | + response = requests.get(url, timeout=30) |
| 37 | + response.raise_for_status() |
| 38 | + data = response.json() |
| 39 | + versions = [] |
| 40 | + for version_info in data.get("versions", []): |
| 41 | + version = version_info.get("version") |
| 42 | + if version: |
| 43 | + versions.append(version) |
| 44 | + return versions |
| 45 | + except Exception as e: |
| 46 | + logger.error(f"Error fetching Gentoo versions for {category}/{name}: {e}") |
| 47 | + return [] |
| 48 | + |
| 49 | + |
| 50 | +def get_revision_versions(base_version_str, all_versions): |
| 51 | + """ |
| 52 | + Given a base version string (e.g., "1.2.3") and a list of all versions, |
| 53 | + return a sorted list of all revisions of that base version. |
| 54 | +
|
| 55 | + For example, if base_version_str is "1.2.3" and all_versions contains |
| 56 | + ["1.2.3", "1.2.3-r1", "1.2.3-r2", "1.2.4"], returns ["1.2.3", "1.2.3-r1", "1.2.3-r2"]. |
| 57 | +
|
| 58 | + Gentoo revision versions share the same base version but differ in revision |
| 59 | + suffix (-r0, -r1, -r2, etc.). A version without -rN is equivalent to -r0. |
| 60 | + """ |
| 61 | + base = base_version_str.split("-r")[0] |
| 62 | + matching = [] |
| 63 | + for v in all_versions: |
| 64 | + v_base = v.split("-r")[0] |
| 65 | + if v_base == base: |
| 66 | + try: |
| 67 | + matching.append((GentooVersion(v), v)) |
| 68 | + except Exception: |
| 69 | + continue |
| 70 | + matching.sort(key=lambda x: x[0]) |
| 71 | + return [v for _, v in matching] |
| 72 | + |
| 73 | + |
| 74 | +def get_last_revision(base_version_str, all_versions): |
| 75 | + """ |
| 76 | + Given a base version string and a list of all known versions, |
| 77 | + find the last (highest) revision of that base version. |
| 78 | +
|
| 79 | + This is needed because Gentoo revision operators (rge, rle, rgt) apply only |
| 80 | + to revisions of a specific version, not to all subsequent versions. |
| 81 | + To create a bounded range, we use the last known revision as the upper bound. |
| 82 | + """ |
| 83 | + revisions = get_revision_versions(base_version_str, all_versions) |
| 84 | + if revisions: |
| 85 | + return revisions[-1] |
| 86 | + return None |
| 87 | + |
| 88 | + |
| 89 | +class GentooBasicImprover(Improver): |
| 90 | + """ |
| 91 | + Improve Gentoo advisory data by fetching all known versions of affected |
| 92 | + ebuild packages from the packages.gentoo.org API and resolving which |
| 93 | + versions fall within the affected version ranges. |
| 94 | +
|
| 95 | + This handles Gentoo's non-standard versioning by working with actual |
| 96 | + published versions rather than relying solely on version range arithmetic. |
| 97 | + """ |
| 98 | + |
| 99 | + @property |
| 100 | + def interesting_advisories(self) -> QuerySet: |
| 101 | + return Advisory.objects.filter( |
| 102 | + Q(created_by=GentooImporter.qualified_name) |
| 103 | + | Q(created_by="gentoo_importer_v2") |
| 104 | + ).paginated() |
| 105 | + |
| 106 | + def get_package_versions(self, package_url: PackageURL) -> List[str]: |
| 107 | + """ |
| 108 | + Fetch all known versions for a Gentoo ebuild package. |
| 109 | + """ |
| 110 | + if package_url.type != "ebuild": |
| 111 | + return [] |
| 112 | + category = package_url.namespace |
| 113 | + name = package_url.name |
| 114 | + if not category or not name: |
| 115 | + return [] |
| 116 | + return fetch_gentoo_package_versions(category, name) |
| 117 | + |
| 118 | + def get_inferences(self, advisory_data: AdvisoryData) -> Iterable[Inference]: |
| 119 | + if not advisory_data.affected_packages: |
| 120 | + return |
| 121 | + |
| 122 | + try: |
| 123 | + purl, affected_version_ranges, fixed_versions = AffectedPackage.merge( |
| 124 | + advisory_data.affected_packages |
| 125 | + ) |
| 126 | + except UnMergeablePackageError: |
| 127 | + logger.error( |
| 128 | + f"GentooBasicImprover: Cannot merge with different purls: " |
| 129 | + f"{advisory_data.affected_packages!r}" |
| 130 | + ) |
| 131 | + for affected_package in advisory_data.affected_packages: |
| 132 | + yield from self._process_single_package(affected_package, advisory_data) |
| 133 | + return |
| 134 | + |
| 135 | + pkg_type = purl.type |
| 136 | + pkg_namespace = purl.namespace |
| 137 | + pkg_name = purl.name |
| 138 | + |
| 139 | + fixed_purls = [ |
| 140 | + PackageURL( |
| 141 | + type=pkg_type, |
| 142 | + namespace=pkg_namespace, |
| 143 | + name=pkg_name, |
| 144 | + version=str(version), |
| 145 | + ) |
| 146 | + for version in fixed_versions |
| 147 | + ] |
| 148 | + |
| 149 | + if not affected_version_ranges: |
| 150 | + for fixed_purl in fixed_purls: |
| 151 | + yield Inference.from_advisory_data( |
| 152 | + advisory_data, |
| 153 | + confidence=MAX_CONFIDENCE, |
| 154 | + affected_purls=[], |
| 155 | + fixed_purl=fixed_purl, |
| 156 | + ) |
| 157 | + return |
| 158 | + |
| 159 | + valid_versions = self.get_package_versions(purl) |
| 160 | + if not valid_versions: |
| 161 | + return |
| 162 | + |
| 163 | + for affected_version_range in affected_version_ranges: |
| 164 | + yield from self._generate_inferences( |
| 165 | + affected_version_range=affected_version_range, |
| 166 | + pkg_type=pkg_type, |
| 167 | + pkg_namespace=pkg_namespace, |
| 168 | + pkg_name=pkg_name, |
| 169 | + valid_versions=valid_versions, |
| 170 | + advisory_data=advisory_data, |
| 171 | + ) |
| 172 | + |
| 173 | + def _process_single_package(self, affected_package, advisory_data): |
| 174 | + """Process a single affected package that could not be merged.""" |
| 175 | + purl = affected_package.package |
| 176 | + affected_version_range = affected_package.affected_version_range |
| 177 | + fixed_version = affected_package.fixed_version |
| 178 | + |
| 179 | + if not affected_version_range and fixed_version: |
| 180 | + yield Inference.from_advisory_data( |
| 181 | + advisory_data, |
| 182 | + confidence=MAX_CONFIDENCE, |
| 183 | + affected_purls=[], |
| 184 | + fixed_purl=PackageURL( |
| 185 | + type=purl.type, |
| 186 | + namespace=purl.namespace, |
| 187 | + name=purl.name, |
| 188 | + version=str(fixed_version), |
| 189 | + ), |
| 190 | + ) |
| 191 | + return |
| 192 | + |
| 193 | + valid_versions = self.get_package_versions(purl) |
| 194 | + if not valid_versions: |
| 195 | + return |
| 196 | + |
| 197 | + if affected_version_range: |
| 198 | + yield from self._generate_inferences( |
| 199 | + affected_version_range=affected_version_range, |
| 200 | + pkg_type=purl.type, |
| 201 | + pkg_namespace=purl.namespace, |
| 202 | + pkg_name=purl.name, |
| 203 | + valid_versions=valid_versions, |
| 204 | + advisory_data=advisory_data, |
| 205 | + ) |
| 206 | + |
| 207 | + def _generate_inferences( |
| 208 | + self, |
| 209 | + affected_version_range, |
| 210 | + pkg_type, |
| 211 | + pkg_namespace, |
| 212 | + pkg_name, |
| 213 | + valid_versions, |
| 214 | + advisory_data, |
| 215 | + ): |
| 216 | + """ |
| 217 | + Generate Inferences by resolving the affected_version_range |
| 218 | + against the list of known valid_versions. |
| 219 | + """ |
| 220 | + aff_vers, unaff_vers = resolve_version_range( |
| 221 | + affected_version_range=affected_version_range, |
| 222 | + ignorable_versions=[], |
| 223 | + package_versions=valid_versions, |
| 224 | + ) |
| 225 | + |
| 226 | + affected_purls = [ |
| 227 | + PackageURL(type=pkg_type, namespace=pkg_namespace, name=pkg_name, version=v) |
| 228 | + for v in aff_vers |
| 229 | + ] |
| 230 | + |
| 231 | + unaffected_purls = [ |
| 232 | + PackageURL(type=pkg_type, namespace=pkg_namespace, name=pkg_name, version=v) |
| 233 | + for v in unaff_vers |
| 234 | + ] |
| 235 | + |
| 236 | + affected_packages: List[LegacyAffectedPackage] = nearest_patched_package( |
| 237 | + vulnerable_packages=affected_purls, resolved_packages=unaffected_purls |
| 238 | + ) |
| 239 | + |
| 240 | + for ( |
| 241 | + fixed_package, |
| 242 | + affected_purls, |
| 243 | + ) in get_affected_packages_by_patched_package(affected_packages).items(): |
| 244 | + yield Inference.from_advisory_data( |
| 245 | + advisory_data, |
| 246 | + confidence=MAX_CONFIDENCE, |
| 247 | + affected_purls=affected_purls, |
| 248 | + fixed_purl=fixed_package, |
| 249 | + ) |
0 commit comments