blob: 8a67e509e11f4636d2b8e7e8549edcf86e24c014 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import argparse
import datetime
import fnmatch
import os
from collections import defaultdict
import re
import subprocess
import requests # URL checking
import json
import ijson
import certifi
import distutils.version
import time
import gzip
from urllib3 import HTTPSConnectionPool
from urllib3.exceptions import HTTPError
from multiprocessing import Pool
# Parameters of the NVD JSON feeds: one feed file is fetched per year,
# starting at NVD_START_YEAR, from NVD_BASE_URL.
NVD_START_YEAR = 2002
NVD_JSON_VERSION = "1.0"
NVD_BASE_URL = "https://nvd.nist.gov/feeds/json/cve/%s" % NVD_JSON_VERSION

# Matches a "$(eval $(<infra>-package))" line at the start of a .mk line
# and captures <infra> (which may carry a "host-" prefix).
INFRA_RE = re.compile(r"\$\(eval \$\(([a-z-]*)-package\)\)")

# Matches a line holding nothing but an http(s) URL, optionally surrounded
# by whitespace.
URL_RE = re.compile(r"\s*https?://\S*\s*$")

# Status codes used as the first element of Package.latest_version.
RM_API_STATUS_ERROR = 1
RM_API_STATUS_FOUND_BY_DISTRO = 2
RM_API_STATUS_FOUND_BY_PATTERN = 3
RM_API_STATUS_NOT_FOUND = 4

# Used to make multiple requests to the same host. It is global
# because it's used by sub-processes.
http_pool = None
class Package:
    """
    A Buildroot package, identified by its name and the path of its .mk
    file.

    The all_* class attributes are shared by every instance. The set_*()
    methods look them up using the variable prefix returned by pkgvar().
    """
    # Variable prefixes of the packages for which a license is known
    all_licenses = list()
    # Variable prefixes of the packages for which license files are known
    all_license_files = list()
    # Maps a package variable prefix to its version string
    all_versions = dict()
    # Maps a package variable prefix to the list of CVEs it ignores
    all_ignored_cves = dict()

    def __init__(self, name, path):
        self.name = name
        self.path = path
        self.infras = None
        self.has_license = False
        self.has_license_files = False
        self.has_hash = False
        self.patch_count = 0
        self.warnings = 0
        self.current_version = None
        self.url = None
        self.url_status = None
        self.url_worker = None
        self.cves = list()
        self.latest_version = (RM_API_STATUS_ERROR, None, None)

    def pkgvar(self):
        """
        Returns the make variable prefix of the package: its name in
        upper case, with dashes replaced by underscores
        """
        return self.name.upper().replace("-", "_")

    def set_url(self):
        """
        Fills in the .url and .url_status fields

        .url_status is "No Config.in" when the package directory has no
        Config.* file, "Found" when one of them has a line made only of
        a URL, and "Missing" otherwise.
        """
        self.url_status = "No Config.in"
        pkgdir = os.path.dirname(self.path)
        for filename in os.listdir(pkgdir):
            if not fnmatch.fnmatch(filename, 'Config.*'):
                continue
            # The context manager closes the file on every exit path
            with open(os.path.join(pkgdir, filename), "r") as fp:
                for config_line in fp:
                    if URL_RE.match(config_line):
                        self.url = config_line.strip()
                        self.url_status = "Found"
                        return
            self.url_status = "Missing"

    def set_infra(self):
        """
        Fills in the .infras field

        .infras becomes a list of ("host" | "target", infra name) tuples,
        one per "$(eval $(<infra>-package))" line of the .mk file.
        """
        self.infras = list()
        with open(self.path, 'r') as f:
            for line in f:
                match = INFRA_RE.match(line)
                if not match:
                    continue
                infra = match.group(1)
                if infra.startswith("host-"):
                    self.infras.append(("host", infra[5:]))
                else:
                    self.infras.append(("target", infra))

    def set_license(self):
        """
        Fills in the .has_license and .has_license_files fields
        """
        var = self.pkgvar()
        if var in self.all_licenses:
            self.has_license = True
        if var in self.all_license_files:
            self.has_license_files = True

    def set_hash_info(self):
        """
        Fills in the .has_hash field
        """
        # Only swap the file extension: a plain str.replace() would also
        # rewrite any ".mk" found in a directory name.
        hashpath = os.path.splitext(self.path)[0] + ".hash"
        self.has_hash = os.path.exists(hashpath)

    def set_patch_count(self):
        """
        Fills in the .patch_count field
        """
        self.patch_count = 0
        pkgdir = os.path.dirname(self.path)
        for subdir, _, _ in os.walk(pkgdir):
            self.patch_count += len(fnmatch.filter(os.listdir(subdir), '*.patch'))

    def set_current_version(self):
        """
        Fills in the .current_version field
        """
        var = self.pkgvar()
        if var in self.all_versions:
            self.current_version = self.all_versions[var]

    def set_check_package_warnings(self):
        """
        Fills in the .warnings field

        Runs ./utils/check-package on the .mk, .hash, Config.in and
        Config.in.host files of the package directory. .warnings is left
        untouched when no "N warnings generated" line is found.
        """
        cmd = ["./utils/check-package"]
        pkgdir = os.path.dirname(self.path)
        for root, _dirs, files in os.walk(pkgdir):
            for f in files:
                if f.endswith((".mk", ".hash")) or f in ("Config.in", "Config.in.host"):
                    cmd.append(os.path.join(root, f))
        # The summary line is looked up in the standard error output
        o = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[1]
        for line in o.splitlines():
            m = re.match("^([0-9]*) warnings generated", line.decode())
            if m:
                self.warnings = int(m.group(1))
                return

    def is_cve_ignored(self, cve):
        """
        Tells if the CVE is ignored by the package
        """
        return cve in self.all_ignored_cves.get(self.pkgvar(), [])

    def __eq__(self, other):
        # Packages are identified by the path of their .mk file
        if not isinstance(other, Package):
            return NotImplemented
        return self.path == other.path

    def __hash__(self):
        # Kept consistent with __eq__
        return hash(self.path)

    def __lt__(self, other):
        if not isinstance(other, Package):
            return NotImplemented
        return self.path < other.path

    def __str__(self):
        return "%s (path='%s', license='%s', license_files='%s', hash='%s', patches=%d)" % \
            (self.name, self.path, self.has_license, self.has_license_files, self.has_hash, self.patch_count)
class CVE:
    """An accessor class for CVE Items in NVD files"""

    def __init__(self, nvd_cve):
        """Initialize a CVE from its NVD JSON representation"""
        self.nvd_cve = nvd_cve

    @staticmethod
    def download_nvd_year(nvd_path, year):
        """
        Make sure the NVD feed of the given year is available in nvd_path,
        downloading it when needed, and return the path of the local
        .json.gz file.

        Raises the exception of requests' raise_for_status() when a
        download fails.
        """
        metaf = "nvdcve-%s-%s.meta" % (NVD_JSON_VERSION, year)
        path_metaf = os.path.join(nvd_path, metaf)
        jsonf_gz = "nvdcve-%s-%s.json.gz" % (NVD_JSON_VERSION, year)
        path_jsonf_gz = os.path.join(nvd_path, jsonf_gz)

        # If the database file is less than a day old, we assume the NVD data
        # locally available is recent enough.
        if os.path.exists(path_jsonf_gz) and os.stat(path_jsonf_gz).st_mtime >= time.time() - 86400:
            return path_jsonf_gz

        # If not, we download the meta file
        url = "%s/%s" % (NVD_BASE_URL, metaf)
        print("Getting %s" % url)
        page_meta = requests.get(url)
        page_meta.raise_for_status()

        # If the meta file already existed, we compare the existing
        # one with the data newly downloaded. If they are different,
        # we need to re-download the database.
        # If the database does not exist locally, we need to redownload it in
        # any case.
        if os.path.exists(path_metaf) and os.path.exists(path_jsonf_gz):
            with open(path_metaf, "r") as f:
                meta_known = f.read()
            if page_meta.text == meta_known:
                return path_jsonf_gz

        # Grab the compressed JSON NVD, and write files to disk
        url = "%s/%s" % (NVD_BASE_URL, jsonf_gz)
        print("Getting %s" % url)
        page_json = requests.get(url)
        page_json.raise_for_status()
        with open(path_jsonf_gz, "wb") as f:
            f.write(page_json.content)
        with open(path_metaf, "w") as f:
            f.write(page_meta.text)
        return path_jsonf_gz

    @classmethod
    def read_nvd_dir(cls, nvd_dir):
        """
        Iterate over all the CVEs contained in NIST Vulnerability Database
        feeds since NVD_START_YEAR. If the files are missing or outdated in
        nvd_dir, a fresh copy will be downloaded, and kept in .json.gz
        """
        for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
            filename = CVE.download_nvd_year(nvd_dir, year)
            # The file is parsed lazily, so read errors only show up while
            # iterating: the iteration itself has to be inside the try.
            try:
                with gzip.GzipFile(filename) as gz:
                    for cve in ijson.items(gz, 'CVE_Items.item'):
                        yield cls(cve['cve'])
            except Exception:
                print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename)
                raise

    def each_product(self):
        """Iterate over each product section of this cve"""
        for vendor in self.nvd_cve['affects']['vendor']['vendor_data']:
            for product in vendor['product']['product_data']:
                yield product

    @property
    def identifier(self):
        """The CVE unique identifier"""
        return self.nvd_cve['CVE_data_meta']['ID']

    @property
    def pkg_names(self):
        """The set of package names referred by this CVE definition"""
        return set(p['product_name'] for p in self.each_product())

    def affects(self, br_pkg):
        """
        True if the Buildroot Package object passed as argument is affected
        by this CVE.

        Every version entry of every product named like the package is
        checked; the package is affected as soon as one entry matches.
        """
        if br_pkg.is_cve_ignored(self.identifier):
            return False

        for product in self.each_product():
            if product['product_name'] != br_pkg.name:
                continue

            for v in product['version']['version_data']:
                if v["version_affected"] == "=":
                    if br_pkg.current_version == v["version_value"]:
                        return True
                elif v["version_affected"] == "<=":
                    pkg_version = distutils.version.LooseVersion(br_pkg.current_version)
                    # No "version" attribute means the string was not parsed
                    if not hasattr(pkg_version, "version"):
                        print("Cannot parse package '%s' version '%s'" % (br_pkg.name, br_pkg.current_version))
                        continue
                    cve_affected_version = distutils.version.LooseVersion(v["version_value"])
                    if not hasattr(cve_affected_version, "version"):
                        print("Cannot parse CVE affected version '%s'" % v["version_value"])
                        continue
                    try:
                        affected = pkg_version <= cve_affected_version
                    except TypeError:
                        # Raised when a numeric component is compared with
                        # an alphabetic one
                        print("Cannot compare package '%s' version '%s' with CVE affected version '%s'" %
                              (br_pkg.name, br_pkg.current_version, v["version_value"]))
                        continue
                    # Do not return on a mismatch: later entries may match
                    if affected:
                        return True
                else:
                    print("version_affected: %s" % v['version_affected'])
        return False
def get_pkglist(npackages, package_list):
    """
    Walks the current directory looking for package .mk files and returns
    the matching list of Package objects. Only the .name and .path fields
    of the Package objects are initialized.

    npackages: stop after N packages (no limit if not set)
    package_list: only keep the packages named in this list (all of them
                  if not set)
    """
    useful_subdirs = ("boot", "linux", "package", "toolchain")
    # Regular expressions matched against the path of the .mk file,
    # relative to the current directory
    excluded_patterns = ("boot/common.mk",
                         "linux/linux-ext-.*.mk",
                         "package/freescale-imx/freescale-imx.mk",
                         "package/gcc/gcc.mk",
                         "package/gstreamer/gstreamer.mk",
                         "package/gstreamer1/gstreamer1.mk",
                         "package/gtk2-themes/gtk2-themes.mk",
                         "package/matchbox/matchbox.mk",
                         "package/opengl/opengl.mk",
                         "package/qt5/qt5.mk",
                         "package/x11r7/x11r7.mk",
                         "package/doc-asciidoc.mk",
                         "package/pkg-.*.mk",
                         "package/nvidia-tegra23/nvidia-tegra23.mk",
                         "toolchain/toolchain-external/pkg-toolchain-external.mk",
                         "toolchain/toolchain-external/toolchain-external.mk",
                         "toolchain/toolchain.mk",
                         "toolchain/helpers.mk",
                         "toolchain/toolchain-wrapper.mk")
    found = []
    for root, _dirs, files in os.walk("."):
        components = root.split("/")
        # Only look below the top-level directories that hold packages
        if len(components) < 2 or components[1] not in useful_subdirs:
            continue
        for filename in files:
            if not filename.endswith(".mk"):
                continue
            pkgname = filename[:-len(".mk")]
            if package_list and pkgname not in package_list:
                continue
            pkgpath = os.path.join(root, filename)
            # Drop the leading './' before matching the exclusion patterns
            relpath = pkgpath[2:]
            if any(re.match(pattern, relpath) for pattern in excluded_patterns):
                continue
            found.append(Package(pkgname, pkgpath))
            if npackages and len(found) == npackages:
                return found
    return found
def package_init_make_info():
    """
    Fills in Package.all_licenses, Package.all_license_files,
    Package.all_versions and Package.all_ignored_cves from the output of
    "make printvars".

    Raises subprocess.CalledProcessError if make fails.
    """
    # Fetch all variables at once
    variables = subprocess.check_output(["make", "BR2_HAVE_DOT_CONFIG=y", "-s", "printvars",
                                         "VARS=%_LICENSE %_LICENSE_FILES %_VERSION %_IGNORE_CVES"])
    variable_list = variables.decode().splitlines()

    # We process first the host package VERSION, and then the target
    # package VERSION. This means that if a package exists in both
    # target and host variants, with different values (eg. version
    # numbers (unlikely)), we'll report the target one.
    variable_list = [x[5:] for x in variable_list if x.startswith("HOST_")] + \
        [x for x in variable_list if not x.startswith("HOST_")]

    for l in variable_list:
        # Get variable name and value. Only the first "=" separates them:
        # the value itself may contain "=" characters.
        pkgvar, sep, value = l.partition("=")
        if not sep:
            # Not a "NAME=value" line, nothing to record
            continue

        # Strip the suffix according to the variable
        if pkgvar.endswith("_LICENSE"):
            # If value is "unknown", no license details available
            if value == "unknown":
                continue
            pkgvar = pkgvar[:-8]
            Package.all_licenses.append(pkgvar)

        elif pkgvar.endswith("_LICENSE_FILES"):
            if pkgvar.endswith("_MANIFEST_LICENSE_FILES"):
                continue
            pkgvar = pkgvar[:-14]
            Package.all_license_files.append(pkgvar)

        elif pkgvar.endswith("_VERSION"):
            if pkgvar.endswith("_DL_VERSION"):
                continue
            pkgvar = pkgvar[:-8]
            Package.all_versions[pkgvar] = value

        elif pkgvar.endswith("_IGNORE_CVES"):
            pkgvar = pkgvar[:-12]
            Package.all_ignored_cves[pkgvar] = value.split()
def check_url_status_worker(url, url_status):
    """
    Returns the new URL status of a package: url_status itself when the
    package has no URL ("Missing" or "No Config.in"), otherwise "Ok",
    "Invalid(<HTTP status code>)" or "Invalid(Err)" depending on the
    outcome of a HEAD request on url.
    """
    if url_status in ("Missing", "No Config.in"):
        return url_status
    try:
        status_code = requests.head(url, timeout=30).status_code
        if status_code >= 400:
            return "Invalid(%s)" % str(status_code)
    except requests.exceptions.RequestException:
        return "Invalid(Err)"
    return "Ok"
def check_package_urls(packages):
    """
    Updates the .url_status field of all Package objects by checking
    their URL in parallel worker processes.

    The temporary .url_worker attribute of each package is removed once
    its result has been collected.
    """
    pool = Pool(processes=64)
    try:
        for pkg in packages:
            pkg.url_worker = pool.apply_async(check_url_status_worker, (pkg.url, pkg.url_status))
        for pkg in packages:
            pkg.url_status = pkg.url_worker.get(timeout=3600)
            del pkg.url_worker
    finally:
        # Do not leave worker processes behind if a result raised
        pool.terminate()
def release_monitoring_get_latest_version_by_distro(pool, name):
    """
    Queries "/api/project/Buildroot/<name>" through the connection pool
    and returns a (status, version, id) tuple. The version is None when
    the answer has no 'version' entry.
    """
    try:
        response = pool.request('GET', "/api/project/Buildroot/%s" % name)
    except HTTPError:
        return (RM_API_STATUS_ERROR, None, None)

    if response.status != 200:
        return (RM_API_STATUS_NOT_FOUND, None, None)

    project = json.loads(response.data)
    version = project['version'] if 'version' in project else None
    return (RM_API_STATUS_FOUND_BY_DISTRO, version, project['id'])
def release_monitoring_get_latest_version_by_guess(pool, name):
    """
    Queries "/api/projects/?pattern=<name>" through the connection pool
    and returns a (status, version, id) tuple for the project with the
    lowest id whose name is exactly name and which has a version.
    """
    try:
        response = pool.request('GET', "/api/projects/?pattern=%s" % name)
    except HTTPError:
        return (RM_API_STATUS_ERROR, None, None)

    if response.status != 200:
        return (RM_API_STATUS_NOT_FOUND, None, None)

    candidates = json.loads(response.data)['projects']
    # Look at the projects by increasing id
    for project in sorted(candidates, key=lambda x: x['id']):
        if project['name'] != name or 'version' not in project:
            continue
        return (RM_API_STATUS_FOUND_BY_PATTERN, project['version'], project['id'])

    return (RM_API_STATUS_NOT_FOUND, None, None)
def check_package_latest_version_worker(name):
    """
    Looks up the latest version of the package called name, first through
    the distro mapping, then by guess if the mapping finds nothing.
    Uses the global http_pool and prints the package name.
    """
    print(name)
    result = release_monitoring_get_latest_version_by_distro(http_pool, name)
    if result[0] != RM_API_STATUS_NOT_FOUND:
        return result
    return release_monitoring_get_latest_version_by_guess(http_pool, name)
def check_package_latest_version(packages):
    """
    Fills in the .latest_version field of all Package objects

    This field has a special format:
      (status, version, id)
    with:
    - status: one of RM_API_STATUS_ERROR,
      RM_API_STATUS_FOUND_BY_DISTRO, RM_API_STATUS_FOUND_BY_PATTERN,
      RM_API_STATUS_NOT_FOUND
    - version: string containing the latest version known by
      release-monitoring.org for this package
    - id: string containing the id of the project corresponding to this
      package, as known by release-monitoring.org
    """
    global http_pool
    # The connection pool has to be set up before the worker processes
    # are created, as they use it through the global variable.
    http_pool = HTTPSConnectionPool('release-monitoring.org', port=443,
                                    cert_reqs='CERT_REQUIRED', ca_certs=certifi.where(),
                                    timeout=30)
    worker_pool = Pool(processes=64)
    try:
        results = worker_pool.map(check_package_latest_version_worker, (pkg.name for pkg in packages))
        for pkg, r in zip(packages, results):
            pkg.latest_version = r
    finally:
        # Always release the workers, and put the global back to its
        # initial value rather than deleting the name.
        worker_pool.terminate()
        http_pool = None
def check_package_cves(nvd_path, packages):
    """
    Appends to the .cves field of each package the identifiers of the
    CVEs affecting it.

    nvd_path: directory holding the NVD files, created if missing
    packages: dictionary mapping package names to Package objects
    """
    if not os.path.isdir(nvd_path):
        os.makedirs(nvd_path)

    for cve in CVE.read_nvd_dir(nvd_path):
        for pkg_name in cve.pkg_names:
            if pkg_name not in packages:
                continue
            pkg = packages[pkg_name]
            if cve.affects(pkg):
                pkg.cves.append(cve.identifier)
def calculate_stats(packages):
    """
    Returns a defaultdict(int) of counters computed over the list of
    Package objects passed as argument.
    """
    stats = defaultdict(int)
    for pkg in packages:
        # If packages have multiple infra, take the first one. For the
        # vast majority of packages, the target and host infra are the
        # same. There are very few packages that use a different infra
        # for the host and target variants.
        if len(pkg.infras) > 0:
            infra_key = "infra-%s" % pkg.infras[0][1]
        else:
            infra_key = "infra-unknown"
        stats[infra_key] += 1

        stats["license" if pkg.has_license else "no-license"] += 1
        stats["license-files" if pkg.has_license_files else "no-license-files"] += 1
        stats["hash" if pkg.has_hash else "no-hash"] += 1

        rm_status = pkg.latest_version[0]
        stats["rmo-mapping" if rm_status == RM_API_STATUS_FOUND_BY_DISTRO else "rmo-no-mapping"] += 1

        latest = pkg.latest_version[1]
        if not latest:
            version_key = "version-unknown"
        elif latest == pkg.current_version:
            version_key = "version-uptodate"
        else:
            version_key = "version-not-uptodate"
        stats[version_key] += 1

        stats["patches"] += pkg.patch_count
        stats["total-cves"] += len(pkg.cves)
        if len(pkg.cves) != 0:
            stats["pkg-cves"] += 1
    return stats
# Start of the HTML page. It opens the <html> and <body> elements that
# html_footer closes.
html_header = """
<html>
<head>
<script src=\"https://www.kryogenix.org/code/browser/sorttable/sorttable.js\"></script>
<style type=\"text/css\">
table {
width: 100%;
}
td {
border: 1px solid black;
}
td.centered {
text-align: center;
}
td.wrong {
background: #ff9a69;
}
td.correct {
background: #d2ffc4;
}
td.nopatches {
background: #d2ffc4;
}
td.somepatches {
background: #ffd870;
}
td.lotsofpatches {
background: #ff9a69;
}
td.good_url {
background: #d2ffc4;
}
td.missing_url {
background: #ffd870;
}
td.invalid_url {
background: #ff9a69;
}
td.version-good {
background: #d2ffc4;
}
td.version-needs-update {
background: #ff9a69;
}
td.version-unknown {
background: #ffd870;
}
td.version-error {
background: #ccc;
}
</style>
<title>Statistics of Buildroot packages</title>
</head>
<body>
<a href=\"#results\">Results</a><br/>
<p id=\"sortable_hint\"></p>
"""

# End of the HTML page. The script shows the sorting hint only when the
# sorttable script referenced by html_header has been loaded.
html_footer = """
</body>
<script>
if (typeof sorttable === \"object\") {
document.getElementById(\"sortable_hint\").innerHTML =
\"hint: the table can be sorted by clicking the column headers\"
}
</script>
</html>
"""
def infra_str(infra_list):
    """
    Returns the HTML text describing a list of (kind, infra name) tuples,
    or "Unknown" if the list is empty. Only the first two entries are
    taken into account.
    """
    if not infra_list:
        return "Unknown"

    first = infra_list[0]
    kind, infra = first[0], first[1]
    if len(infra_list) == 1:
        return "<b>%s</b><br/>%s" % (infra, kind)

    second = infra_list[1]
    if infra == second[1]:
        # Same infrastructure used for both entries
        return "<b>%s</b><br/>%s + %s" % (infra, kind, second[0])
    return "<b>%s</b> (%s)<br/><b>%s</b> (%s)" % (infra, kind, second[1], second[0])
def boolean_str(b):
    """Returns "Yes" if b is true, "No" otherwise"""
    return "Yes" if b else "No"
def dump_html_pkg(f, pkg):
    """
    Writes to f the HTML table row describing the package pkg.

    f: file object opened for writing text
    pkg: Package object with all its fields filled in
    """
    f.write(" <tr>\n")
    # pkg.path[2:] strips the initial './'
    f.write(" <td>%s</td>\n" % pkg.path[2:])

    # Patch count
    td_class = ["centered"]
    if pkg.patch_count == 0:
        td_class.append("nopatches")
    elif pkg.patch_count < 5:
        td_class.append("somepatches")
    else:
        td_class.append("lotsofpatches")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), str(pkg.patch_count)))

    # Infrastructure
    infra = infra_str(pkg.infras)
    td_class = ["centered"]
    if infra == "Unknown":
        td_class.append("wrong")
    else:
        td_class.append("correct")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), infra))

    # License
    td_class = ["centered"]
    if pkg.has_license:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.has_license)))

    # License files
    td_class = ["centered"]
    if pkg.has_license_files:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.has_license_files)))

    # Hash
    td_class = ["centered"]
    if pkg.has_hash:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.has_hash)))

    # Current version. It stays None for packages without a known
    # version, which must not be passed to len().
    if pkg.current_version is None:
        current_version = "Unknown"
    elif len(pkg.current_version) > 20:
        current_version = pkg.current_version[:20] + "..."
    else:
        current_version = pkg.current_version
    f.write(" <td class=\"centered\">%s</td>\n" % current_version)

    # Latest version. Start from a fresh class list so that the cell does
    # not inherit the class of the hash cell, and pick exactly one of the
    # version-* classes.
    td_class = ["centered"]
    if pkg.latest_version[0] == RM_API_STATUS_ERROR:
        td_class.append("version-error")
    elif pkg.latest_version[1] is None:
        td_class.append("version-unknown")
    elif pkg.latest_version[1] != pkg.current_version:
        td_class.append("version-needs-update")
    else:
        td_class.append("version-good")

    if pkg.latest_version[0] == RM_API_STATUS_ERROR:
        latest_version_text = "<b>Error</b>"
    elif pkg.latest_version[0] == RM_API_STATUS_NOT_FOUND:
        latest_version_text = "<b>Not found</b>"
    else:
        if pkg.latest_version[1] is None:
            latest_version_text = "<b>Found, but no version</b>"
        else:
            latest_version_text = "<a href=\"https://release-monitoring.org/project/%s\"><b>%s</b></a>" % \
                (pkg.latest_version[2], str(pkg.latest_version[1]))

        latest_version_text += "<br/>"

        if pkg.latest_version[0] == RM_API_STATUS_FOUND_BY_DISTRO:
            latest_version_text += "found by <a href=\"https://release-monitoring.org/distro/Buildroot/\">distro</a>"
        else:
            latest_version_text += "found by guess"

    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), latest_version_text))

    # Warnings
    td_class = ["centered"]
    if pkg.warnings == 0:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">%d</td>\n" %
            (" ".join(td_class), pkg.warnings))

    # URL status. The href value is quoted so that characters of the URL
    # cannot end the attribute early.
    td_class = ["centered"]
    url_str = pkg.url_status
    if pkg.url_status == "Missing" or pkg.url_status == "No Config.in":
        td_class.append("missing_url")
    elif pkg.url_status.startswith("Invalid"):
        td_class.append("invalid_url")
        url_str = "<a href=\"%s\">%s</a>" % (pkg.url, pkg.url_status)
    else:
        td_class.append("good_url")
        url_str = "<a href=\"%s\">Link</a>" % pkg.url
    f.write(" <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), url_str))

    # CVEs
    td_class = ["centered"]
    if len(pkg.cves) == 0:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write(" <td class=\"%s\">\n" % " ".join(td_class))
    for cve in pkg.cves:
        # Each link is closed before the line break
        f.write(" <a href=\"https://security-tracker.debian.org/tracker/%s\">%s</a><br/>\n" % (cve, cve))
    f.write(" </td>\n")

    f.write(" </tr>\n")
def dump_html_all_pkgs(f, packages):
    """
    Writes to f the sortable HTML table of all packages: a header row,
    then one row per package, in sorted order.
    """
    centered_columns = ("Patch count", "Infrastructure", "License",
                        "License files", "Hash file", "Current version",
                        "Latest version", "Warnings", "Upstream URL", "CVEs")
    # The empty first and last items give the leading and trailing newlines
    header = ["", "<table class=\"sortable\">", "<tr>", "<td>Package</td>"]
    header.extend("<td class=\"centered\">%s</td>" % title for title in centered_columns)
    header.extend(["</tr>", ""])
    f.write("\n".join(header))

    for pkg in sorted(packages):
        dump_html_pkg(f, pkg)
    f.write("</table>")
def dump_html_stats(f, stats):
    """
    Writes to f the HTML table of the statistics: one row per "infra-*"
    entry of stats, followed by a fixed list of counters.
    """
    f.write("<a id=\"results\"></a>\n")
    f.write("<table>\n")

    infra_names = [key[6:] for key in stats.keys() if key.startswith("infra-")]
    for name in infra_names:
        f.write(" <tr><td>Packages using the <i>%s</i> infrastructure</td><td>%s</td></tr>\n" %
                (name, stats["infra-%s" % name]))

    # (row template, key in stats), in output order
    rows = (
        (" <tr><td>Packages having license information</td><td>%s</td></tr>\n", "license"),
        (" <tr><td>Packages not having license information</td><td>%s</td></tr>\n", "no-license"),
        (" <tr><td>Packages having license files information</td><td>%s</td></tr>\n", "license-files"),
        (" <tr><td>Packages not having license files information</td><td>%s</td></tr>\n", "no-license-files"),
        (" <tr><td>Packages having a hash file</td><td>%s</td></tr>\n", "hash"),
        (" <tr><td>Packages not having a hash file</td><td>%s</td></tr>\n", "no-hash"),
        (" <tr><td>Total number of patches</td><td>%s</td></tr>\n", "patches"),
        ("<tr><td>Packages having a mapping on <i>release-monitoring.org</i></td><td>%s</td></tr>\n", "rmo-mapping"),
        ("<tr><td>Packages lacking a mapping on <i>release-monitoring.org</i></td><td>%s</td></tr>\n", "rmo-no-mapping"),
        ("<tr><td>Packages that are up-to-date</td><td>%s</td></tr>\n", "version-uptodate"),
        ("<tr><td>Packages that are not up-to-date</td><td>%s</td></tr>\n", "version-not-uptodate"),
        ("<tr><td>Packages with no known upstream version</td><td>%s</td></tr>\n", "version-unknown"),
        ("<tr><td>Packages affected by CVEs</td><td>%s</td></tr>\n", "pkg-cves"),
        ("<tr><td>Total number of CVEs affecting all packages</td><td>%s</td></tr>\n", "total-cves"),
    )
    for template, key in rows:
        f.write(template % stats[key])

    f.write("</table>\n")
def dump_html_gen_info(f, date, commit):
    """
    Writes to f the paragraph telling when, and from which git commit,
    the page was generated.
    """
    # Updated on Mon Feb 19 08:12:08 CET 2018, Git commit aa77030b8f5e41f1c53eb1c1ad664b8c814ba032
    paragraph = "<p><i>Updated on %s, git commit %s</i></p>\n" % (str(date), commit)
    f.write(paragraph)
def dump_html(packages, stats, date, commit, output):
    """
    Writes the HTML report to the file named output: header, table of
    packages, table of statistics, generation information and footer.
    """
    with open(output, 'w') as html_file:
        html_file.write(html_header)
        dump_html_all_pkgs(html_file, packages)
        dump_html_stats(html_file, stats)
        dump_html_gen_info(html_file, date, commit)
        html_file.write(html_footer)
def dump_json(packages, stats, date, commit, output):
    """
    Writes the JSON report to the file named output. It holds the
    packages (indexed by name), the statistics, the commit and the date.
    """
    # Format packages as a dictionary instead of a list.
    # Exclude the local fields that do not contain real data.
    excluded_fields = ['url_worker', 'name']
    pkgs = {}
    for pkg in packages:
        pkgs[pkg.name] = dict((field, value)
                              for field, value in pkg.__dict__.items()
                              if field not in excluded_fields)

    # Aggregate infrastructures into a single dict entry
    statistics = {}
    infra_stats = {}
    for key, value in stats.items():
        if key.startswith('infra-'):
            infra_stats[key[6:]] = value
        else:
            statistics[key] = value
    statistics['infra'] = infra_stats

    # The actual structure to dump, add commit and date to it
    final = {'packages': pkgs,
             'stats': statistics,
             'commit': commit,
             'date': str(date)}

    with open(output, 'w') as json_file:
        json.dump(final, json_file, indent=2, separators=(',', ': '))
        json_file.write('\n')
def parse_args():
    """
    Parses the command line and returns the resulting namespace. Exits
    with an error unless at least one of --html or --json is given.
    """
    parser = argparse.ArgumentParser()

    output_group = parser.add_argument_group('output', 'Output file(s)')
    output_group.add_argument('--html', dest='html', action='store',
                              help='HTML output file')
    output_group.add_argument('--json', dest='json', action='store',
                              help='JSON output file')

    # -n and -p cannot be used together
    selection = parser.add_mutually_exclusive_group()
    selection.add_argument('-n', dest='npackages', type=int, action='store',
                           help='Number of packages')
    selection.add_argument('-p', dest='packages', action='store',
                           help='List of packages (comma separated)')

    parser.add_argument('--nvd-path', dest='nvd_path',
                        help='Path to the local NVD database')

    args = parser.parse_args()
    if not (args.html or args.json):
        parser.error('at least one of --html or --json (or both) is required')
    return args
def __main__():
    """
    Entry point of the script: builds the list of packages, collects
    their details, then writes the HTML and/or JSON reports requested on
    the command line.
    """
    args = parse_args()
    if args.packages:
        package_list = args.packages.split(",")
    else:
        package_list = None
    date = datetime.datetime.utcnow()
    commit = subprocess.check_output(['git', 'rev-parse',
                                      'HEAD']).splitlines()[0].decode()
    print("Build package list ...")
    packages = get_pkglist(args.npackages, package_list)
    print("Getting package make info ...")
    package_init_make_info()
    print("Getting package details ...")
    for pkg in packages:
        pkg.set_infra()
        pkg.set_license()
        pkg.set_hash_info()
        pkg.set_patch_count()
        pkg.set_check_package_warnings()
        pkg.set_current_version()
        pkg.set_url()
    print("Checking URL status")
    check_package_urls(packages)
    print("Getting latest versions ...")
    check_package_latest_version(packages)
    # CVEs are only checked when a NVD directory is given
    if args.nvd_path:
        print("Checking packages CVEs")
        check_package_cves(args.nvd_path, {p.name: p for p in packages})
    print("Calculate stats")
    stats = calculate_stats(packages)
    if args.html:
        print("Write HTML")
        dump_html(packages, stats, date, commit, args.html)
    if args.json:
        print("Write JSON")
        dump_json(packages, stats, date, commit, args.json)


# Only run when executed as a script: worker processes that re-import this
# module (multiprocessing "spawn" start method) must not start the whole
# run again.
if __name__ == "__main__":
    __main__()