1#!/usr/bin/env python3 2 3# Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com> 4# Copyright (C) 2020 by Gregory CLEMENT <gregory.clement@bootlin.com> 5# 6# This program is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 2 of the License, or 9# (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14# General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program; if not, write to the Free Software 18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 19 20import datetime 21import os 22import distutils.version 23import json 24import subprocess 25import sys 26import operator 27 28sys.path.append('utils/') 29 30NVD_START_YEAR = 1999 31NVD_BASE_URL = "https://github.com/fkie-cad/nvd-json-data-feeds/" 32 33ops = { 34 '>=': operator.ge, 35 '>': operator.gt, 36 '<=': operator.le, 37 '<': operator.lt, 38 '=': operator.eq 39} 40 41 42# Check if two CPE IDs match each other 43def cpe_matches(cpe1, cpe2): 44 cpe1_elems = cpe1.split(":") 45 cpe2_elems = cpe2.split(":") 46 47 remains = filter(lambda x: x[0] not in ["*", "-"] and x[1] not in ["*", "-"] and x[0] != x[1], 48 zip(cpe1_elems, cpe2_elems)) 49 return len(list(remains)) == 0 50 51 52def cpe_product(cpe): 53 return cpe.split(':')[4] 54 55 56def cpe_version(cpe): 57 return cpe.split(':')[5] 58 59 60class CVE: 61 """An accessor class for CVE Items in NVD files""" 62 CVE_AFFECTS = 1 63 CVE_DOESNT_AFFECT = 2 64 CVE_UNKNOWN = 3 65 66 def __init__(self, nvd_cve): 67 """Initialize a CVE from its NVD JSON representation""" 68 self.nvd_cve = nvd_cve 69 70 @staticmethod 71 def download_nvd(nvd_git_dir): 72 print(f"Updating from {NVD_BASE_URL}") 73 if os.path.exists(nvd_git_dir): 74 subprocess.check_call( 75 ["git", "pull"], 76 cwd=nvd_git_dir, 77 stdout=subprocess.DEVNULL, 78 stderr=subprocess.DEVNULL, 79 ) 80 else: 81 # Create the directory and its parents; git 82 # happily clones into an empty directory. 83 os.makedirs(nvd_git_dir) 84 subprocess.check_call( 85 ["git", "clone", NVD_BASE_URL, nvd_git_dir], 86 stdout=subprocess.DEVNULL, 87 stderr=subprocess.DEVNULL, 88 ) 89 90 @staticmethod 91 def sort_id(cve_ids): 92 def cve_key(cve_id): 93 year, id_ = cve_id.split('-')[1:] 94 return (int(year), int(id_)) 95 return sorted(cve_ids, key=cve_key) 96 97 @classmethod 98 def read_nvd_dir(cls, nvd_dir): 99 """ 100 Iterate over all the CVEs contained in NIST Vulnerability Database 101 feeds since NVD_START_YEAR. If the files are missing or outdated in 102 nvd_dir, a fresh copy will be downloaded, and kept in .json.gz 103 """ 104 nvd_git_dir = os.path.join(nvd_dir, "git") 105 CVE.download_nvd(nvd_git_dir) 106 for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1): 107 for dirpath, _, filenames in os.walk(os.path.join(nvd_git_dir, f"CVE-{year}")): 108 for filename in filenames: 109 if filename[-5:] != ".json": 110 continue 111 with open(os.path.join(dirpath, filename), "rb") as f: 112 yield cls(json.load(f)) 113 114 def each_product(self): 115 """Iterate over each product section of this cve""" 116 for vendor in self.nvd_cve['cve']['affects']['vendor']['vendor_data']: 117 for product in vendor['product']['product_data']: 118 yield product 119 120 def parse_node(self, node): 121 """ 122 Parse the node inside the configurations section to extract the 123 cpe information usefull to know if a product is affected by 124 the CVE. Actually only the product name and the version 125 descriptor are needed, but we also provide the vendor name. 126 """ 127 128 # The node containing the cpe entries matching the CVE can also 129 # contain sub-nodes, so we need to manage it. 130 for child in node.get('children', ()): 131 for parsed_node in self.parse_node(child): 132 yield parsed_node 133 134 for cpe in node.get('cpeMatch', ()): 135 if not cpe['vulnerable']: 136 return 137 product = cpe_product(cpe['criteria']) 138 version = cpe_version(cpe['criteria']) 139 # ignore when product is '-', which means N/A 140 if product == '-': 141 return 142 op_start = '' 143 op_end = '' 144 v_start = '' 145 v_end = '' 146 147 if version != '*' and version != '-': 148 # Version is defined, this is a '=' match 149 op_start = '=' 150 v_start = version 151 else: 152 # Parse start version, end version and operators 153 if 'versionStartIncluding' in cpe: 154 op_start = '>=' 155 v_start = cpe['versionStartIncluding'] 156 157 if 'versionStartExcluding' in cpe: 158 op_start = '>' 159 v_start = cpe['versionStartExcluding'] 160 161 if 'versionEndIncluding' in cpe: 162 op_end = '<=' 163 v_end = cpe['versionEndIncluding'] 164 165 if 'versionEndExcluding' in cpe: 166 op_end = '<' 167 v_end = cpe['versionEndExcluding'] 168 169 yield { 170 'id': cpe['criteria'], 171 'v_start': v_start, 172 'op_start': op_start, 173 'v_end': v_end, 174 'op_end': op_end 175 } 176 177 def each_cpe(self): 178 for nodes in self.nvd_cve.get('configurations', []): 179 for node in nodes['nodes']: 180 for cpe in self.parse_node(node): 181 yield cpe 182 183 @property 184 def identifier(self): 185 """The CVE unique identifier""" 186 return self.nvd_cve['id'] 187 188 @property 189 def affected_products(self): 190 """The set of CPE products referred by this CVE definition""" 191 return set(cpe_product(p['id']) for p in self.each_cpe()) 192 193 def affects(self, name, version, cve_ignore_list, cpeid=None): 194 """ 195 True if the Buildroot Package object passed as argument is affected 196 by this CVE. 197 """ 198 if self.identifier in cve_ignore_list: 199 return self.CVE_DOESNT_AFFECT 200 201 pkg_version = distutils.version.LooseVersion(version) 202 if not hasattr(pkg_version, "version"): 203 print("Cannot parse package '%s' version '%s'" % (name, version)) 204 pkg_version = None 205 206 # if we don't have a cpeid, build one based on name and version 207 if not cpeid: 208 cpeid = "cpe:2.3:*:*:%s:%s:*:*:*:*:*:*:*" % (name, version) 209 # if we have a cpeid, use its version instead of the package 210 # version, as they might be different due to 211 # <pkg>_CPE_ID_VERSION 212 else: 213 pkg_version = distutils.version.LooseVersion(cpe_version(cpeid)) 214 215 for cpe in self.each_cpe(): 216 if not cpe_matches(cpe['id'], cpeid): 217 continue 218 if not cpe['v_start'] and not cpe['v_end']: 219 return self.CVE_AFFECTS 220 if not pkg_version: 221 continue 222 223 if cpe['v_start']: 224 try: 225 cve_affected_version = distutils.version.LooseVersion(cpe['v_start']) 226 inrange = ops.get(cpe['op_start'])(pkg_version, cve_affected_version) 227 except TypeError: 228 return self.CVE_UNKNOWN 229 230 # current package version is before v_start, so we're 231 # not affected by the CVE 232 if not inrange: 233 continue 234 235 if cpe['v_end']: 236 try: 237 cve_affected_version = distutils.version.LooseVersion(cpe['v_end']) 238 inrange = ops.get(cpe['op_end'])(pkg_version, cve_affected_version) 239 except TypeError: 240 return self.CVE_UNKNOWN 241 242 # current package version is after v_end, so we're 243 # not affected by the CVE 244 if not inrange: 245 continue 246 247 # We're in the version range affected by this CVE 248 return self.CVE_AFFECTS 249 250 return self.CVE_DOESNT_AFFECT 251