| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # |
| # Enriches the input CycloneDX SBOM with vulnerability information from the NVD |
| # database. |
| # |
| # The NVD database is cloned using a mirror of it and the content is compared |
| # locally. |
| # |
| # Example usage: |
| # $ make show-info | utils/generate-cyclonedx | support/scripts/cve-check --nvd-path dl/buildroot-nvd/ |
| from collections import defaultdict |
| from pathlib import Path |
| from typing import TypedDict |
| import argparse |
| import sys |
| import json |
| |
| import cve as cvecheck |
| |
| |
class Options(TypedDict):
    """Settings that tune the vulnerability analysis; every key is required."""

    # When true, CVEs that do not affect a component are still reported,
    # with a 'resolved' analysis state.
    include_resolved: bool
| |
| |
# Program description handed to argparse in main() (shown in the --help output).
DESCRIPTION = """
Enriches the input CycloneDX SBOM with vulnerability information from the NVD
database.

The NVD database is cloned using a mirror of it and the content is compared
locally.
"""


# Third-level parent of this file; main() uses it to build the default value of
# '--nvd-path' (<brpath>/dl/buildroot-nvd).
# NOTE(review): presumably the top-level directory of the Buildroot tree - confirm.
brpath = Path(__file__).parent.parent.parent
| |
| |
| def cve_api_get_lang_from_list(values, lang="en") -> (str | None): |
| for x in values: |
| if x.get("lang") == lang: |
| return x.get("value") |
| return None |
| |
| |
def nvd_cve_weaknesses_to_cdx(weaknesses) -> list[int]:
    """
    Convert the 'weaknesses' list of an NVD CVE into a CycloneDX 'cwes' list.

    Every English ('en') description of every weakness entry is inspected, so
    an entry that lists several CWEs contributes all of them. Values that are
    not of the form 'CWE-<number>' (e.g. 'NVD-CWE-Other') are skipped, and a
    CWE reported by several sources is emitted only once, in order of first
    appearance.

    See the CycloneDX specification for 'cwes' [1]

    [1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities_items_cwes

    Args:
        weaknesses (list): Weakness entries (dicts), each optionally holding
            a 'description' list of {'lang': ..., 'value': ...} dicts.

    Returns:
        list[int]: Unique CWE identifiers as integers.
    """
    res = []
    seen = set()

    for node in weaknesses:
        for desc in node.get("description") or []:
            if not isinstance(desc, dict) or desc.get("lang") != "en":
                continue

            value = desc.get("value")
            if not isinstance(value, str):
                continue

            cwe = value.removeprefix("CWE-")

            # str.isnumeric() also accepts characters such as superscripts or
            # fractions that int() rejects; only plain ASCII digits are safe.
            if not (cwe.isascii() and cwe.isdecimal()):
                continue

            cwe_id = int(cwe)
            if cwe_id in seen:
                continue
            seen.add(cwe_id)
            res.append(cwe_id)

    return res
| |
| |
def nvd_cve_cvss_to_cdx(metrics):
    """
    Convert the 'metrics' object of an NVD CVE into CycloneDX 'ratings'.

    See the CycloneDX specification for 'ratings' [1]

    [1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities_items_ratings

    Args:
        metrics (dict): Maps an NVD metric key (e.g. 'cvssMetricV31') to a
            list of metric entries, each optionally holding a 'cvssData' dict.

    Returns:
        list[dict]: One rating per metric entry. 'method' is always present
        ('other' for unknown metric keys); 'score', 'severity' and 'vector'
        are only present when the source data provides them.
    """

    KEY_METHOD_DICT = {
        "cvssMetricV40": "CVSSv4",
        "cvssMetricV31": "CVSSv31",
        # The NVD API 2.0 names the CVSS 3.0 metrics 'cvssMetricV30'.
        "cvssMetricV30": "CVSSv3",
        # Kept for backward compatibility with the previous mapping.
        "cvssMetricV3": "CVSSv3",
        "cvssMetricV2": "CVSSv2",
    }

    res = []

    for key, values in metrics.items():
        if not isinstance(values, list):
            continue

        method = KEY_METHOD_DICT.get(key, "other")

        for value in values:
            if not isinstance(value, dict):
                continue

            data = value.get("cvssData") or {}
            rating = {"method": method}

            if "baseScore" in data:
                rating["score"] = data["baseScore"]

            # CVSS v3.x/v4 store the severity inside 'cvssData', whereas CVSS
            # v2 entries carry it next to 'cvssData' on the metric itself.
            severity = data.get("baseSeverity") or value.get("baseSeverity")
            if isinstance(severity, str):
                rating["severity"] = severity.lower()

            if "vectorString" in data:
                rating["vector"] = data["vectorString"]

            res.append(rating)

    return res
| |
| |
def nvd_cve_references_to_cdx(references):
    """
    Build a list of CycloneDX 'advisories' from NVD CVE 'references'.

    A reference is kept only when it provides both 'url' and 'tags' and its
    'tags' is a non-empty list. The advisory title is the first tag that does
    not contain the word "Advisory"; when every tag contains it, the first
    tag is used.

    Args:
        references (iterable): Reference entries of an NVD CVE.

    Returns:
        list[dict]: One {'title': ..., 'url': ...} dict per kept reference.
    """
    required_keys = {"url", "tags"}
    advisories = []

    for reference in references:
        if not required_keys.issubset(reference):
            continue

        labels = reference["tags"]
        if not (isinstance(labels, list) and labels):
            continue

        for label in labels:
            if "Advisory" not in label:
                title = label
                break
        else:
            # Every tag mentions "Advisory": fall back to the first one.
            title = labels[0]

        advisories.append({"title": title, "url": reference["url"]})

    return advisories
| |
| |
def nvd_cve_to_cdx_vulnerability(nvd_cve):
    """
    Map a CVE object coming from the NVD API onto a CycloneDX vulnerability
    that fits the spec (see [1]).

    'bom-ref', 'id', 'description' and 'source' are always emitted; the
    description is the English one, or an empty string when there is none.
    'published', 'updated', 'cwes', 'ratings' and 'advisories' are emitted
    only when the matching NVD field ('published', 'lastModified',
    'weaknesses', 'metrics', 'references') is present.

    [1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities

    Args:
        nvd_cve (dict): CVE object; must contain an 'id'.

    Returns:
        dict: The CycloneDX vulnerability.
    """
    cve_id = nvd_cve["id"]
    description = cve_api_get_lang_from_list(nvd_cve.get("descriptions", [])) or ""

    vulnerability = {
        "bom-ref": cve_id,
        "id": cve_id,
        "description": description,
        "source": {
            "name": "NVD",
            "url": "https://nvd.nist.gov/",
        },
    }

    # Optional fields are added in the same order as the CycloneDX output
    # has always listed them.
    if "published" in nvd_cve:
        vulnerability["published"] = nvd_cve["published"]
    if "lastModified" in nvd_cve:
        vulnerability["updated"] = nvd_cve["lastModified"]
    if "weaknesses" in nvd_cve:
        vulnerability["cwes"] = nvd_cve_weaknesses_to_cdx(nvd_cve["weaknesses"])
    if "metrics" in nvd_cve:
        vulnerability["ratings"] = nvd_cve_cvss_to_cdx(nvd_cve["metrics"])
    if "references" in nvd_cve:
        vulnerability["advisories"] = nvd_cve_references_to_cdx(nvd_cve["references"])

    return vulnerability
| |
| |
def vuln_append_or_update_affects_if_exists(vulnerabilities, vulnerability):
    """
    Append 'vulnerability' passed as argument to the 'vulnerabilities' argument
    if an entry with the same 'id' doesn't exist yet.
    If the vulnerability already exists, every component reference found in
    the 'affects' of the input is added to the 'affects' list of the existing
    entry (without duplicates), and the remaining metadata of the existing
    entry is refreshed from the input.

    An 'analysis' already present on the existing entry is never overwritten:
    Buildroot ignored vulnerabilities already carry an analysis and need to
    remain as such.

    The 'vulnerability' argument itself is left unmodified. When no entry
    matches, that very object is appended to 'vulnerabilities'.

    Args:
        vulnerabilities (list): The vulnerabilities array reference retrieved
            from the input CycloneDX SBOM. Modified in place.
        vulnerability (dict): Vulnerability to add to the 'vulnerabilities' list.
            Must contain an 'id'.
    """
    # Search if a vulnerability with the same identifier already exists in the
    # SBOM vulnerability list.
    matching_vuln = next(
        (vuln for vuln in vulnerabilities if vuln.get("id") == vulnerability["id"]),
        None
    )

    if matching_vuln is None:
        vulnerabilities.append(vulnerability)
        return

    # Merge the component references: add each 'ref' (bom reference) that is
    # not already present in the 'affects' list of the existing entry.
    affects = matching_vuln.setdefault("affects", [])
    known_refs = {a.get("ref") for a in affects}

    for affect in vulnerability.get("affects", []):
        bom_ref = affect.get("ref")
        if bom_ref is None or bom_ref in known_refs:
            continue
        affects.append({
            "ref": bom_ref
        })
        known_refs.add(bom_ref)

    # 'affects' was merged above and must not be replaced wholesale; an
    # existing 'analysis' takes precedence over the incoming one.
    skipped_keys = {"affects"}
    if matching_vuln.get("analysis") is not None:
        skipped_keys.add("analysis")

    # Update the metadata of the vulnerability with the one downloaded from
    # the database.
    matching_vuln.update(
        {key: value for key, value in vulnerability.items() if key not in skipped_keys}
    )
| |
| |
def check_package_cve_affects(cve: cvecheck.CVE, cpe_product_pkgs, sbom, opt: Options):
    """
    Record in the 'sbom' the components that the given CVE applies to.

    For each product listed by the CVE, every component registered under that
    product is checked with 'cve.affects()'. Components with an unknown status
    are skipped, and so are unaffected ones unless 'include_resolved' is set.
    Each remaining component yields a vulnerability with an 'exploitable' or
    'resolved' analysis state, which is merged into the SBOM 'vulnerabilities'
    list (created if missing).

    Args:
        cve (cvecheck.CVE): CVE entry to evaluate.
        cpe_product_pkgs (dict): Maps a CPE product name to the list of SBOM
            components using it.
        sbom (dict): SBOM whose 'vulnerabilities' list is enriched in place.
        opt (Options): Options for the analysis.
    """
    vulnerabilities = sbom.setdefault("vulnerabilities", [])

    for product in cve.affected_products:
        for component in cpe_product_pkgs.get(product, []):
            status = cve.affects(component["name"], component["version"], component["cpe"])

            ignored = status == cve.CVE_UNKNOWN or (
                status == cve.CVE_DOESNT_AFFECT and not opt["include_resolved"]
            )
            if ignored:
                continue

            # A fresh dict is needed for each component: it may end up stored
            # in the SBOM as is.
            entry = nvd_cve_to_cdx_vulnerability(cve.nvd_cve)
            entry.update(
                analysis={
                    "state": "exploitable" if status == cve.CVE_AFFECTS else "resolved",
                },
                affects=[{"ref": component["bom-ref"]}],
            )

            vuln_append_or_update_affects_if_exists(vulnerabilities, entry)
| |
| |
def check_package_cves(nvd_path: Path, sbom, opt: Options):
    """
    Walk through every entry of the NVD API mirror and compare it with the
    components of the 'sbom'. The 'vulnerabilities' of that 'sbom' are
    enriched with an analysis for each CVE matching one of the components.

    Only components that have both a 'cpe' and a 'version' take part in the
    comparison; they are indexed by the product name of their CPE.

    Args:
        nvd_path (Path): Path of the mirror of the NVD API.
        sbom (dict): Input SBOM containing a set of vulnerabilities that will be enriched.
        opt (Options): Options for the analysis.
    """
    cpe_product_pkgs = defaultdict(list)

    for component in sbom.get("components", []):
        if not (component.get("cpe") and component.get("version")):
            continue
        product = cvecheck.CPE(component["cpe"]).product
        cpe_product_pkgs[product].append(component)

    for cve in cvecheck.CVE.read_nvd_dir(nvd_path):
        check_package_cve_affects(cve, cpe_product_pkgs, sbom, opt)
| |
| |
def enrich_vulnerabilities(nvd_path: Path, sbom):
    """
    Refresh the vulnerabilities already listed in the 'sbom' with the content
    of the NVD API mirror. No component analysis is done.

    Entries without an 'id', or whose 'id' does not start with "CVE-" (case
    insensitive), are left untouched. A warning is printed on stderr for each
    CVE that cannot be found in the mirror.

    Args:
        nvd_path (Path): Path of the mirror of the NVD API.
        sbom (dict): Input SBOM containing a set of vulnerabilities that will be enriched.
    """
    vulnerabilities = sbom.setdefault("vulnerabilities", [])

    for entry in vulnerabilities:
        vuln_id = entry.get("id")
        is_cve = vuln_id is not None and vuln_id.upper().startswith("CVE-")
        if not is_cve:
            continue

        cve = cvecheck.CVE.read_nvd_entry(nvd_path, vuln_id)
        if cve is None:
            print(f"Warning: '{vuln_id}' doesn't exist in NVD database.", file=sys.stderr)
            continue

        vuln_append_or_update_affects_if_exists(
            vulnerabilities,
            nvd_cve_to_cdx_vulnerability(cve.nvd_cve),
        )
| |
| |
def main():
    """
    Command line entry point.

    Reads a CycloneDX SBOM (from '--in-file', or stdin when it is not a
    terminal), optionally updates the local NVD mirror, then either enriches
    the vulnerabilities already present ('--enrich-only') or runs the full
    component analysis. The resulting SBOM is written as indented JSON to
    '--out-file' (stdout by default).

    Prints the help and exits with status 1 when no input is available.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION)
    parser.add_argument(
        "-i", "--in-file",
        nargs="?",
        type=argparse.FileType("r"),
        # Without an explicit file, only read stdin when something is piped in.
        default=(None if sys.stdin.isatty() else sys.stdin),
    )
    parser.add_argument(
        "-o", "--out-file",
        nargs="?",
        type=argparse.FileType("w"),
        default=sys.stdout,
    )
    parser.add_argument(
        '--nvd-path',
        dest='nvd_path',
        default=brpath / 'dl' / 'buildroot-nvd',
        help='Path to the local NVD database',
        type=lambda p: Path(p).expanduser().resolve(),
    )
    parser.add_argument(
        "--enrich-only",
        default=False,
        action='store_true',
        help=(
            "Only update metadata for the vulnerabilities currently present "
            "in the input CycloneDX SBOM. Don't do an analysis."
        ),
    )
    parser.add_argument(
        "--include-resolved",
        default=False,
        action='store_true',
        help=(
            "Add vulnerabilities already 'resolved' that don't affect a "
            "component to the output CycloneDX vulnerabilities analysis."
        ),
    )
    parser.add_argument(
        "--no-nvd-update",
        default=False,
        action='store_true',
        help="Doesn't update the NVD database.",
    )

    args = parser.parse_args()

    if args.in_file is None or args.nvd_path is None:
        parser.print_help()
        sys.exit(1)

    sbom = json.load(args.in_file)
    opt = Options(include_resolved=args.include_resolved)

    nvd_path = args.nvd_path
    nvd_path.mkdir(parents=True, exist_ok=True)
    if not args.no_nvd_update:
        cvecheck.CVE.download_nvd(nvd_path)

    if args.enrich_only:
        enrich_vulnerabilities(nvd_path, sbom)
    else:
        check_package_cves(nvd_path, sbom, opt)

    # print() appends the trailing newline after the JSON document.
    print(json.dumps(sbom, indent=2), file=args.out_file)
| |
| |
# Run the command line interface only when executed as a script, not when
# imported as a module.
if __name__ == "__main__":
    main()