1#!/usr/bin/env python3
2
3# Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
4# Copyright (C) 2020 by Gregory CLEMENT <gregory.clement@bootlin.com>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program; if not, write to the Free Software
18# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
20import datetime
21import os
22import distutils.version
23import json
24import subprocess
25import sys
26import operator
27
# Make modules located under utils/ importable. The entry is a relative
# path, so it is presumably resolved against the current working
# directory -- TODO confirm how this file is meant to be invoked.
sys.path.append('utils/')

# First year whose CVE-<year> directory is scanned by CVE.read_nvd_dir()
NVD_START_YEAR = 1999
# Git repository cloned (and later pulled) by CVE.download_nvd()
NVD_BASE_URL = "https://github.com/fkie-cad/nvd-json-data-feeds/"

# Maps the operator strings produced by CVE.parse_node() ('op_start' and
# 'op_end') to the comparison functions applied by CVE.affects().
ops = {
    '>=': operator.ge,
    '>': operator.gt,
    '<=': operator.le,
    '<': operator.lt,
    '=': operator.eq
}
40
41
# Tell whether two CPE IDs are compatible with each other
def cpe_matches(cpe1, cpe2):
    """
    Compare two CPE IDs field by field (fields are separated by ':').

    A field holding '*' or '-' on either side is compatible with
    anything; every other pair of fields must be equal. When the IDs do
    not have the same number of fields, only the leading fields they
    have in common are compared.
    """
    wildcards = ("*", "-")
    return all(
        left in wildcards or right in wildcards or left == right
        for left, right in zip(cpe1.split(":"), cpe2.split(":"))
    )
50
51
def cpe_product(cpe):
    """Return the product field of a CPE ID (fifth ':'-separated field)."""
    fields = cpe.split(':')
    return fields[4]
54
55
def cpe_version(cpe):
    """Return the version field of a CPE ID (sixth ':'-separated field)."""
    fields = cpe.split(':')
    return fields[5]
58
59
class CVE:
    """An accessor class for CVE Items in NVD files

    Each instance wraps the decoded JSON of a single CVE record and
    exposes its identifier, the CPE entries it lists as vulnerable, and
    a check telling whether a given package version is affected.
    """
    # Possible return values of affects()
    CVE_AFFECTS = 1
    CVE_DOESNT_AFFECT = 2
    CVE_UNKNOWN = 3

    def __init__(self, nvd_cve):
        """Initialize a CVE from its NVD JSON representation"""
        self.nvd_cve = nvd_cve

    @staticmethod
    def download_nvd(nvd_git_dir):
        """
        Bring the local copy of the NVD feeds repository up to date.

        If nvd_git_dir already holds a git checkout it is updated with
        'git pull'; otherwise NVD_BASE_URL is cloned into it. The output
        of git is discarded. Raises subprocess.CalledProcessError when
        the git command fails.
        """
        print(f"Updating from {NVD_BASE_URL}")
        # Look for the .git entry rather than the directory itself: the
        # directory is created before cloning, so a failed first clone
        # leaves it behind empty, and running 'git pull' there would
        # either fail or act on an enclosing repository.
        if os.path.exists(os.path.join(nvd_git_dir, ".git")):
            subprocess.check_call(
                ["git", "pull"],
                cwd=nvd_git_dir,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        else:
            # Create the directory and its parents; git
            # happily clones into an empty directory.
            os.makedirs(nvd_git_dir, exist_ok=True)
            subprocess.check_call(
                ["git", "clone", NVD_BASE_URL, nvd_git_dir],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

    @staticmethod
    def sort_id(cve_ids):
        """
        Return the given CVE identifiers as a list sorted by year, then
        by number. Identifiers are expected to have exactly three
        '-'-separated parts, the last two being integers (for example
        'CVE-2020-1234'); anything else raises ValueError.
        """
        def cve_key(cve_id):
            year, id_ = cve_id.split('-')[1:]
            return (int(year), int(id_))
        return sorted(cve_ids, key=cve_key)

    @classmethod
    def read_nvd_dir(cls, nvd_dir):
        """
        Iterate over all the CVEs contained in the NVD JSON data feeds,
        from NVD_START_YEAR up to the current year. The feeds are kept
        as a git checkout in <nvd_dir>/git, which is cloned or updated
        before anything is read. One instance of this class is yielded
        for every .json file found below the CVE-<year> directories.
        """
        nvd_git_dir = os.path.join(nvd_dir, "git")
        cls.download_nvd(nvd_git_dir)
        for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
            year_dir = os.path.join(nvd_git_dir, f"CVE-{year}")
            for dirpath, _, filenames in os.walk(year_dir):
                for filename in filenames:
                    if not filename.endswith(".json"):
                        continue
                    with open(os.path.join(dirpath, filename), "rb") as f:
                        yield cls(json.load(f))

    def each_product(self):
        """Iterate over each product section of this cve"""
        # NOTE(review): this reads nvd_cve['cve']['affects'], whereas the
        # other accessors of this class read top-level keys ('id',
        # 'configurations'). Confirm this layout still exists in the
        # feed being used.
        for vendor in self.nvd_cve['cve']['affects']['vendor']['vendor_data']:
            for product in vendor['product']['product_data']:
                yield product

    def parse_node(self, node):
        """
        Parse a node of the configurations section and yield, for each
        vulnerable CPE entry, a dict with the following keys:

          'id':       the CPE criteria string
          'v_start':  lower bound of the affected versions, or ''
          'op_start': operator for v_start ('=', '>=' or '>'), or ''
          'v_end':    upper bound of the affected versions, or ''
          'op_end':   operator for v_end ('<=' or '<'), or ''

        Entries of child nodes are yielded before the entries of the
        node itself.
        """

        # The node containing the cpe entries matching the CVE can also
        # contain sub-nodes, so we need to manage it.
        for child in node.get('children', ()):
            yield from self.parse_node(child)

        for cpe in node.get('cpeMatch', ()):
            # Skip only the entry at hand: one entry that is not
            # vulnerable must not hide the entries that follow it.
            if not cpe['vulnerable']:
                continue
            product = cpe_product(cpe['criteria'])
            version = cpe_version(cpe['criteria'])
            # ignore when product is '-', which means N/A
            if product == '-':
                continue
            op_start = ''
            op_end = ''
            v_start = ''
            v_end = ''

            if version != '*' and version != '-':
                # Version is defined, this is a '=' match
                op_start = '='
                v_start = version
            else:
                # Parse start version, end version and operators. When
                # both the Including and the Excluding variant of a
                # bound are present, the Excluding one wins.
                if 'versionStartIncluding' in cpe:
                    op_start = '>='
                    v_start = cpe['versionStartIncluding']

                if 'versionStartExcluding' in cpe:
                    op_start = '>'
                    v_start = cpe['versionStartExcluding']

                if 'versionEndIncluding' in cpe:
                    op_end = '<='
                    v_end = cpe['versionEndIncluding']

                if 'versionEndExcluding' in cpe:
                    op_end = '<'
                    v_end = cpe['versionEndExcluding']

            yield {
                'id': cpe['criteria'],
                'v_start': v_start,
                'op_start': op_start,
                'v_end': v_end,
                'op_end': op_end
            }

    def each_cpe(self):
        """
        Iterate over the vulnerable CPE entries of every node of the
        'configurations' section, in the format yielded by parse_node().
        Nothing is yielded when the CVE has no such section.
        """
        for config in self.nvd_cve.get('configurations', []):
            for node in config['nodes']:
                yield from self.parse_node(node)

    @property
    def identifier(self):
        """The CVE unique identifier"""
        return self.nvd_cve['id']

    @property
    def affected_products(self):
        """The set of CPE products referred by this CVE definition"""
        return {cpe_product(cpe['id']) for cpe in self.each_cpe()}

    @staticmethod
    def _version_in_bound(pkg_version, op, bound):
        """
        Apply the comparison named by op between pkg_version and the
        version string bound. Raises TypeError when the two versions
        cannot be compared.
        """
        bound_version = distutils.version.LooseVersion(bound)
        return ops.get(op)(pkg_version, bound_version)

    def affects(self, name, version, cve_ignore_list, cpeid=None):
        """
        Tell whether a package is affected by this CVE.

        name:            package name, used to build a CPE ID when none
                         is given, and in the diagnostic message
        version:         package version
        cve_ignore_list: CVE identifiers to be reported as not affecting
        cpeid:           optional CPE ID of the package; when given, its
                         version field is used instead of 'version'

        Returns CVE_AFFECTS, CVE_DOESNT_AFFECT, or CVE_UNKNOWN when the
        package version cannot be compared with a version of the CVE.
        """
        if self.identifier in cve_ignore_list:
            return self.CVE_DOESNT_AFFECT

        if cpeid:
            # if we have a cpeid, use its version instead of the package
            # version, as they might be different due to
            # <pkg>_CPE_ID_VERSION
            version = cpe_version(cpeid)
        else:
            # if we don't have a cpeid, build one based on name and version
            cpeid = "cpe:2.3:*:*:%s:%s:*:*:*:*:*:*:*" % (name, version)

        # LooseVersion leaves the 'version' attribute unset when it is
        # given nothing to parse; check the version that is actually
        # going to be compared, whichever source it came from.
        pkg_version = distutils.version.LooseVersion(version)
        if not hasattr(pkg_version, "version"):
            print("Cannot parse package '%s' version '%s'" % (name, version))
            pkg_version = None

        for cpe in self.each_cpe():
            if not cpe_matches(cpe['id'], cpeid):
                continue
            # No version constraint at all: every version is affected
            if not cpe['v_start'] and not cpe['v_end']:
                return self.CVE_AFFECTS
            if not pkg_version:
                continue

            try:
                # current package version is before v_start, so we're
                # not affected by the CVE
                if cpe['v_start'] and not self._version_in_bound(
                        pkg_version, cpe['op_start'], cpe['v_start']):
                    continue

                # current package version is after v_end, so we're
                # not affected by the CVE
                if cpe['v_end'] and not self._version_in_bound(
                        pkg_version, cpe['op_end'], cpe['v_end']):
                    continue
            except TypeError:
                return self.CVE_UNKNOWN

            # We're in the version range affected by this CVE
            return self.CVE_AFFECTS

        return self.CVE_DOESNT_AFFECT
251