1# Copyright (c) 2022 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import argparse
6import os
7import sys
8import textwrap
9from pathlib import Path
10from urllib.parse import urlparse
11
12from west.commands import WestCommand
13from zephyr_ext_common import ZEPHYR_BASE
14
15sys.path.append(os.fspath(Path(__file__).parent.parent))
16import zephyr_module
17
18
19class Blobs(WestCommand):
20
21    DEFAULT_LIST_FMT = '{module} {status} {path} {type} {abspath}'
22
23    def __init__(self):
24        super().__init__(
25            'blobs',
26            # Keep this in sync with the string in west-commands.yml.
27            'work with binary blobs',
28            'Work with binary blobs',
29            accepts_unknown_args=False)
30
31    def do_add_parser(self, parser_adder):
32        parser = parser_adder.add_parser(
33            self.name,
34            help=self.help,
35            formatter_class=argparse.RawDescriptionHelpFormatter,
36            description=self.description,
37            epilog=textwrap.dedent(f'''\
38            FORMAT STRINGS
39            --------------
40
41            Blobs are listed using a Python 3 format string. Arguments
42            to the format string are accessed by name.
43
44            The default format string is:
45
46            "{self.DEFAULT_LIST_FMT}"
47
48            The following arguments are available:
49
50            - module: name of the module that contains this blob
51            - abspath: blob absolute path
52            - status: short status (A: present, M: hash failure, D: not present)
53            - path: blob local path from <module>/zephyr/blobs/
54            - sha256: blob SHA256 hash in hex
55            - type: type of blob
56            - version: version string
57            - license_path: path to the license file for the blob
58            - license-abspath: absolute path to the license file for the blob
59            - click-through: need license click-through or not
60            - uri: URI to the remote location of the blob
61            - description: blob text description
62            - doc-url: URL to the documentation for this blob
63            '''))
64
65        # Remember to update west-completion.bash if you add or remove
66        # flags
67        parser.add_argument('subcmd', nargs=1,
68                            choices=['list', 'fetch', 'clean'],
69                            help='sub-command to execute')
70
71        parser.add_argument('modules', metavar='MODULE', nargs='*',
72                            help='''zephyr modules to operate on;
73                            all modules will be used if not given''')
74
75        group = parser.add_argument_group('west blob list options')
76        group.add_argument('-f', '--format',
77                            help='''format string to use to list each blob;
78                                    see FORMAT STRINGS below''')
79
80        group = parser.add_argument_group('west blobs fetch options')
81        group.add_argument('-a', '--auto-accept', action='store_true',
82                            help='''auto accept license if the fetching needs click-through''')
83
84        return parser
85
86    def get_blobs(self, args):
87        blobs = []
88        modules = args.modules
89        all_modules = zephyr_module.parse_modules(ZEPHYR_BASE, self.manifest)
90        all_names = [m.meta.get('name', None) for m in all_modules]
91
92        unknown = set(modules) - set(all_names)
93
94        if len(unknown):
95            self.die(f'Unknown module(s): {unknown}')
96
97        for module in all_modules:
98            # Filter by module
99            module_name = module.meta.get('name', None)
100            if len(modules) and module_name not in modules:
101                continue
102
103            blobs += zephyr_module.process_blobs(module.project, module.meta)
104
105        return blobs
106
107    def list(self, args):
108        blobs = self.get_blobs(args)
109        fmt = args.format or self.DEFAULT_LIST_FMT
110        for blob in blobs:
111            self.inf(fmt.format(**blob))
112
113    def ensure_folder(self, path):
114        path.parent.mkdir(parents=True, exist_ok=True)
115
116    def fetch_blob(self, url, path):
117        scheme = urlparse(url).scheme
118        self.dbg(f'Fetching {path} with {scheme}')
119        import fetchers
120        fetcher = fetchers.get_fetcher_cls(scheme)
121
122        self.dbg(f'Found fetcher: {fetcher}')
123        inst = fetcher()
124        self.ensure_folder(path)
125        inst.fetch(url, path)
126
127    # Compare the checksum of a file we've just downloaded
128    # to the digest in blob metadata, warn user if they differ.
129    def verify_blob(self, blob) -> bool:
130        self.dbg('Verifying blob {module}: {abspath}'.format(**blob))
131
132        status = zephyr_module.get_blob_status(blob['abspath'], blob['sha256'])
133        if status == zephyr_module.BLOB_OUTDATED:
134            self.err(textwrap.dedent(
135                f'''\
136                The checksum of the downloaded file does not match that
137                in the blob metadata:
138                - if it is not certain that the download was successful,
139                  try running 'west blobs fetch {blob['module']}'
140                  to re-download the file
141                - if the error persists, please consider contacting
142                  the maintainers of the module so that they can check
143                  the corresponding blob metadata
144
145                Module: {blob['module']}
146                Blob:   {blob['path']}
147                URL:    {blob['url']}
148                Info:   {blob['description']}'''))
149            return False
150        return True
151
152    def fetch(self, args):
153        bad_checksum_count = 0
154        blobs = self.get_blobs(args)
155        for blob in blobs:
156            if blob['status'] == zephyr_module.BLOB_PRESENT:
157                self.dbg('Blob {module}: {abspath} is up to date'.format(**blob))
158                continue
159            self.inf('Fetching blob {module}: {abspath}'.format(**blob))
160
161            if blob['click-through'] and not args.auto_accept:
162                while True:
163                    user_input = input("For this blob, need to read and accept "
164                                       "license to continue. Read it?\n"
165                                       "Please type 'y' or 'n' and press enter to confirm: ")
166                    if user_input.upper() == "Y" or user_input.upper() == "N":
167                        break
168
169                if user_input.upper() != "Y":
170                    self.wrn('Skip fetching this blob.')
171                    continue
172
173                with open(blob['license-abspath']) as license_file:
174                    license_content = license_file.read()
175                    print(license_content)
176
177                while True:
178                    user_input = input("Accept license to continue?\n"
179                                       "Please type 'y' or 'n' and press enter to confirm: ")
180                    if user_input.upper() == "Y" or user_input.upper() == "N":
181                        break
182
183                if user_input.upper() != "Y":
184                    self.wrn('Skip fetching this blob.')
185                    continue
186
187            self.fetch_blob(blob['url'], blob['abspath'])
188            if not self.verify_blob(blob):
189                bad_checksum_count += 1
190
191        if bad_checksum_count:
192            self.err(f"{bad_checksum_count} blobs have bad checksums")
193            sys.exit(os.EX_DATAERR)
194
195    def clean(self, args):
196        blobs = self.get_blobs(args)
197        for blob in blobs:
198            if blob['status'] == zephyr_module.BLOB_NOT_PRESENT:
199                self.dbg('Blob {module}: {abspath} not in filesystem'.format(**blob))
200                continue
201            self.inf('Deleting blob {module}: {status} {abspath}'.format(**blob))
202            blob['abspath'].unlink()
203
204    def do_run(self, args, _):
205        self.dbg(f'subcmd: \'{args.subcmd[0]}\' modules: {args.modules}')
206
207        subcmd = getattr(self, args.subcmd[0])
208
209        if args.subcmd[0] != 'list' and args.format is not None:
210            self.die('unexpected --format argument; this is a "west blobs list" option')
211
212        subcmd(args)
213