1# SPDX-License-Identifier: GPL-2.0+
2# Copyright 2022 Google LLC
3# Copyright (C) 2022 Weidmüller Interface GmbH & Co. KG
4# Stefan Herbrechtsmeier <stefan.herbrechtsmeier@weidmueller.com>
5#
6"""Base class for all bintools
7
8This defines the common functionality for all bintools, including running
9the tool, checking its version and fetching it if needed.
10"""
11
12import collections
13import glob
14import importlib
15import multiprocessing
16import os
17import shutil
18import tempfile
19import urllib.error
20
21from u_boot_pylib import command
22from u_boot_pylib import terminal
23from u_boot_pylib import tools
24from u_boot_pylib import tout
25
# Directory containing this file; get_tool_list() looks in the 'btool'
# subdirectory of this to find the available bintools
BINMAN_DIR = os.path.dirname(os.path.realpath(__file__))

# Format string for listing bintools (name, version, description, path); see
# also the header printed by list_all()
FORMAT = '%-16.16s %-12.12s %-26.26s %s'

# Cache of imported bintool modules, keyed by module name (with '-' replaced
# by '_'), to avoid importing a module multiple times
modules = {}

# Possible ways of fetching a tool. FETCH_COUNT is the number of values
# including FETCH_ANY, so the actual methods are range(1, FETCH_COUNT)
FETCH_ANY, FETCH_BIN, FETCH_BUILD, FETCH_COUNT = range(4)

# Human-readable name for each fetch method, used in progress messages
FETCH_NAMES = {
    FETCH_ANY: 'any method',
    FETCH_BIN: 'binary download',
    FETCH_BUILD: 'build from source'
    }

# Status of tool fetching, as returned by Bintool.fetch_tool(). STATUS_COUNT
# is the number of status values
FETCHED, FAIL, PRESENT, STATUS_COUNT = range(4)
45
46class Bintool:
47    """Tool which operates on binaries to help produce entry contents
48
49    This is the base class for all bintools
50    """
51    # List of bintools to regard as missing
52    missing_list = []
53
    # Directory to store tools. Note that this is set up by set_tool_dir(),
    # which must be called before this class is used.
56    tooldir = ''
57
58    # Flag to run 'apt-get update -y' once on first use of apt_install()
59    apt_updated = False
60
61    def __init__(self, name, desc, version_regex=None, version_args='-V'):
62        self.name = name
63        self.desc = desc
64        self.version_regex = version_regex
65        self.version_args = version_args
66
67    @staticmethod
68    def find_bintool_class(btype):
69        """Look up the bintool class for bintool
70
71        Args:
72            byte: Bintool to use, e.g. 'mkimage'
73
74        Returns:
75            The bintool class object if found, else a tuple:
76                module name that could not be found
77                exception received
78        """
79        # Convert something like 'u-boot' to 'u_boot' since we are only
80        # interested in the type.
81        module_name = btype.replace('-', '_')
82        module = modules.get(module_name)
83        class_name = f'Bintool{module_name}'
84
85        # Import the module if we have not already done so
86        if not module:
87            try:
88                module = importlib.import_module('binman.btool.' + module_name)
89            except ImportError as exc:
90                try:
91                    # Deal with classes which must be renamed due to conflicts
92                    # with Python libraries
93                    module = importlib.import_module('binman.btool.btool_' +
94                                                     module_name)
95                except ImportError:
96                    return module_name, exc
97            modules[module_name] = module
98
99        # Look up the expected class name
100        return getattr(module, class_name)
101
102    @staticmethod
103    def create(name):
104        """Create a new bintool object
105
106        Args:
107            name (str): Bintool to create, e.g. 'mkimage'
108
109        Returns:
110            A new object of the correct type (a subclass of Binutil)
111        """
112        cls = Bintool.find_bintool_class(name)
113        if isinstance(cls, tuple):
114            raise ValueError("Cannot import bintool module '%s': %s" % cls)
115
116        # Call its constructor to get the object we want.
117        obj = cls(name)
118        return obj
119
120    @classmethod
121    def set_tool_dir(cls, pathname):
122        """Set the path to use to store and find tools"""
123        cls.tooldir = pathname
124
125    def show(self):
126        """Show a line of information about a bintool"""
127        if self.is_present():
128            version = self.version()
129        else:
130            version = '-'
131        print(FORMAT % (self.name, version, self.desc,
132                        self.get_path() or '(not found)'))
133
134    @classmethod
135    def set_missing_list(cls, missing_list):
136        cls.missing_list = missing_list or []
137
138    @staticmethod
139    def get_tool_list(include_testing=False):
140        """Get a list of the known tools
141
142        Returns:
143            list of str: names of all tools known to binman
144        """
145        files = glob.glob(os.path.join(BINMAN_DIR, 'btool/*'))
146        names = [os.path.splitext(os.path.basename(fname))[0]
147                 for fname in files]
148        names = [name for name in names if name[0] != '_']
149        names = [name[6:] if name.startswith('btool_') else name
150                 for name in names]
151        if include_testing:
152            names.append('_testing')
153        return sorted(names)
154
155    @staticmethod
156    def list_all():
157        """List all the bintools known to binman"""
158        names = Bintool.get_tool_list()
159        print(FORMAT % ('Name', 'Version', 'Description', 'Path'))
160        print(FORMAT % ('-' * 15,'-' * 11, '-' * 25, '-' * 30))
161        for name in names:
162            btool = Bintool.create(name)
163            btool.show()
164
165    def is_present(self):
166        """Check if a bintool is available on the system
167
168        Returns:
169            bool: True if available, False if not
170        """
171        if self.name in self.missing_list:
172            return False
173        return bool(self.get_path())
174
175    def get_path(self):
176        """Get the path of a bintool
177
178        Returns:
179            str: Path to the tool, if available, else None
180        """
181        return tools.tool_find(self.name)
182
183    def fetch_tool(self, method, col, skip_present):
184        """Fetch a single tool
185
186        Args:
187            method (FETCH_...): Method to use
188            col (terminal.Color): Color terminal object
189            skip_present (boo;): Skip fetching if it is already present
190
191        Returns:
192            int: Result of fetch either FETCHED, FAIL, PRESENT
193        """
194        def try_fetch(meth):
195            res = None
196            try:
197                res = self.fetch(meth)
198            except urllib.error.URLError as uerr:
199                message = uerr.reason
200                print(col.build(col.RED, f'- {message}'))
201
202            except ValueError as exc:
203                print(f'Exception: {exc}')
204            return res
205
206        if skip_present and self.is_present():
207            return PRESENT
208        print(col.build(col.YELLOW, 'Fetch: %s' % self.name))
209        if method == FETCH_ANY:
210            for try_method in range(1, FETCH_COUNT):
211                print(f'- trying method: {FETCH_NAMES[try_method]}')
212                result = try_fetch(try_method)
213                if result:
214                    break
215        else:
216            result = try_fetch(method)
217        if not result:
218            return FAIL
219        if result is not True:
220            fname, tmpdir = result
221            dest = os.path.join(self.tooldir, self.name)
222            os.makedirs(self.tooldir, exist_ok=True)
223            print(f"- writing to '{dest}'")
224            shutil.move(fname, dest)
225            if tmpdir:
226                shutil.rmtree(tmpdir)
227        return FETCHED
228
229    @staticmethod
230    def fetch_tools(method, names_to_fetch):
231        """Fetch bintools from a suitable place
232
233        This fetches or builds the requested bintools so that they can be used
234        by binman
235
236        Args:
237            names_to_fetch (list of str): names of bintools to fetch
238
239        Returns:
240            True on success, False on failure
241        """
242        def show_status(color, prompt, names):
243            print(col.build(
244                color, f'{prompt}:%s{len(names):2}: %s' %
245                (' ' * (16 - len(prompt)), ' '.join(names))))
246
247        col = terminal.Color()
248        skip_present = False
249        name_list = names_to_fetch
250        if len(names_to_fetch) == 1 and names_to_fetch[0] in ['all', 'missing']:
251            name_list = Bintool.get_tool_list()
252            if names_to_fetch[0] == 'missing':
253                skip_present = True
254            print(col.build(col.YELLOW,
255                            'Fetching tools:      %s' % ' '.join(name_list)))
256        status = collections.defaultdict(list)
257        for name in name_list:
258            btool = Bintool.create(name)
259            result = btool.fetch_tool(method, col, skip_present)
260            status[result].append(name)
261            if result == FAIL:
262                if method == FETCH_ANY:
263                    print('- failed to fetch with all methods')
264                else:
265                    print(f"- method '{FETCH_NAMES[method]}' is not supported")
266
267        if len(name_list) > 1:
268            if skip_present:
269                show_status(col.GREEN, 'Already present', status[PRESENT])
270            show_status(col.GREEN, 'Tools fetched', status[FETCHED])
271            if status[FAIL]:
272                show_status(col.RED, 'Failures', status[FAIL])
273        return not status[FAIL]
274
275    def run_cmd_result(self, *args, binary=False, raise_on_error=True):
276        """Run the bintool using command-line arguments
277
278        Args:
279            args (list of str): Arguments to provide, in addition to the bintool
280                name
281            binary (bool): True to return output as bytes instead of str
282            raise_on_error (bool): True to raise a ValueError exception if the
283                tool returns a non-zero return code
284
285        Returns:
286            CommandResult: Resulting output from the bintool, or None if the
287                tool is not present
288        """
289        if self.name in self.missing_list:
290            return None
291        name = os.path.expanduser(self.name)  # Expand paths containing ~
292        all_args = (name,) + args
293        env = tools.get_env_with_path()
294        tout.debug(f"bintool: {' '.join(all_args)}")
295        result = command.run_pipe(
296            [all_args], capture=True, capture_stderr=True, env=env,
297            raise_on_error=False, binary=binary)
298
299        if result.return_code:
300            # Return None if the tool was not found. In this case there is no
301            # output from the tool and it does not appear on the path. We still
302            # try to run it (as above) since RunPipe() allows faking the tool's
303            # output
304            if not any([result.stdout, result.stderr, tools.tool_find(name)]):
305                tout.info(f"bintool '{name}' not found")
306                return None
307            if raise_on_error:
308                tout.info(f"bintool '{name}' failed")
309                raise ValueError("Error %d running '%s': %s" %
310                                (result.return_code, ' '.join(all_args),
311                                result.stderr or result.stdout))
312        if result.stdout:
313            tout.debug(result.stdout)
314        if result.stderr:
315            tout.debug(result.stderr)
316        return result
317
318    def run_cmd(self, *args, binary=False):
319        """Run the bintool using command-line arguments
320
321        Args:
322            args (list of str): Arguments to provide, in addition to the bintool
323                name
324            binary (bool): True to return output as bytes instead of str
325
326        Returns:
327            str or bytes: Resulting stdout from the bintool
328        """
329        result = self.run_cmd_result(*args, binary=binary)
330        if result:
331            return result.stdout
332
333    @classmethod
334    def build_from_git(cls, git_repo, make_targets, bintool_path,
335            flags=None, git_branch=None, make_path=None):
336        """Build a bintool from a git repo
337
338        This clones the repo in a temporary directory, builds it with 'make',
339        then returns the filename of the resulting executable bintool
340
341        Args:
342            git_repo (str): URL of git repo
343            make_targets (list of str): List of targets to pass to 'make' to build
344                the tool
345            bintool_path (str): Relative path of the tool in the repo, after
346                build is complete
347            flags (list of str): Flags or variables to pass to make, or None
348            git_branch (str): Branch of git repo, or None to use the default
349            make_path (str): Relative path inside git repo containing the
350                Makefile, or None
351
352        Returns:
353            tuple:
354                str: Filename of fetched file to copy to a suitable directory
355                str: Name of temp directory to remove, or None
356            or None on error
357        """
358        tmpdir = tempfile.mkdtemp(prefix='binmanf.')
359        print(f"- clone git repo '{git_repo}' to '{tmpdir}'")
360        if git_branch:
361            tools.run('git', 'clone', '--depth', '1', '--branch', git_branch,
362                      git_repo, tmpdir)
363        else:
364            tools.run('git', 'clone', '--depth', '1', git_repo, tmpdir)
365        for target in make_targets:
366            print(f"- build target '{target}'")
367            makedir = tmpdir
368            if make_path:
369                makedir = os.path.join(tmpdir, make_path)
370            cmd = ['make', '-C', makedir, '-j', f'{multiprocessing.cpu_count()}',
371                   target]
372            if flags:
373                cmd += flags
374            tools.run(*cmd)
375
376        fname = os.path.join(tmpdir, bintool_path)
377        if not os.path.exists(fname):
378            print(f"- File '{fname}' was not produced")
379            return None
380        return fname, tmpdir
381
382    @classmethod
383    def fetch_from_url(cls, url):
384        """Fetch a bintool from a URL
385
386        Args:
387            url (str): URL to fetch from
388
389        Returns:
390            tuple:
391                str: Filename of fetched file to copy to a suitable directory
392                str: Name of temp directory to remove, or None
393        """
394        fname, tmpdir = tools.download(url)
395        tools.run('chmod', 'a+x', fname)
396        return fname, tmpdir
397
398    @classmethod
399    def fetch_from_drive(cls, drive_id):
400        """Fetch a bintool from Google drive
401
402        Args:
403            drive_id (str): ID of file to fetch. For a URL of the form
404            'https://drive.google.com/file/d/xxx/view?usp=sharing' the value
405            passed here should be 'xxx'
406
407        Returns:
408            tuple:
409                str: Filename of fetched file to copy to a suitable directory
410                str: Name of temp directory to remove, or None
411        """
412        url = f'https://drive.google.com/uc?export=download&id={drive_id}'
413        return cls.fetch_from_url(url)
414
415    @classmethod
416    def apt_install(cls, package):
417        """Install a bintool using the 'apt' tool
418
419        This requires use of servo so may request a password
420
421        Args:
422            package (str): Name of package to install
423
424        Returns:
425            True, assuming it completes without error
426        """
427        if not cls.apt_updated:
428            args = ['sudo', 'apt-get', 'update', '-y']
429            print('- %s' % ' '.join(args))
430            tools.run(*args)
431            cls.apt_updated = True
432        args = ['sudo', 'apt-get', 'install', '-y', package]
433        print('- %s' % ' '.join(args))
434        tools.run(*args)
435        return True
436
437    @staticmethod
438    def WriteDocs(modules, test_missing=None):
439        """Write out documentation about the various bintools to stdout
440
441        Args:
442            modules: List of modules to include
443            test_missing: Used for testing. This is a module to report
444                as missing
445        """
446        print('''.. SPDX-License-Identifier: GPL-2.0+
447
448Binman bintool Documentation
449============================
450
451This file describes the bintools (binary tools) supported by binman. Bintools
452are binman's name for external executables that it runs to generate or process
453binaries. It is fairly easy to create new bintools. Just add a new file to the
454'btool' directory. You can use existing bintools as examples.
455
456
457''')
458        modules = sorted(modules)
459        missing = []
460        for name in modules:
461            module = Bintool.find_bintool_class(name)
462            docs = getattr(module, '__doc__')
463            if test_missing == name:
464                docs = None
465            if docs:
466                lines = docs.splitlines()
467                first_line = lines[0]
468                rest = [line[4:] for line in lines[1:]]
469                hdr = 'Bintool: %s: %s' % (name, first_line)
470                print(hdr)
471                print('-' * len(hdr))
472                print('\n'.join(rest))
473                print()
474                print()
475            else:
476                missing.append(name)
477
478        if missing:
479            raise ValueError('Documentation is missing for modules: %s' %
480                             ', '.join(missing))
481
482    # pylint: disable=W0613
483    def fetch(self, method):
484        """Fetch handler for a bintool
485
486        This should be implemented by the base class
487
488        Args:
489            method (FETCH_...): Method to use
490
491        Returns:
492            tuple:
493                str: Filename of fetched file to copy to a suitable directory
494                str: Name of temp directory to remove, or None
495            or True if the file was fetched and already installed
496            or None if no fetch() implementation is available
497
498        Raises:
499            Valuerror: Fetching could not be completed
500        """
501        print(f"No method to fetch bintool '{self.name}'")
502        return False
503
504    def version(self):
505        """Version handler for a bintool
506
507        Returns:
508            str: Version string for this bintool
509        """
510        if self.version_regex is None:
511            return 'unknown'
512
513        import re
514
515        result = self.run_cmd_result(self.version_args)
516        out = result.stdout.strip()
517        if not out:
518            out = result.stderr.strip()
519        if not out:
520            return 'unknown'
521
522        m_version = re.search(self.version_regex, out)
523        return m_version.group(1) if m_version else out
524
525
class BintoolPacker(Bintool):
    """Tool which compresses / decompresses entry contents

    This is a bintool base class for compression / decompression packers

    Properties:
        name: Name of packer tool
        compression: Compression type (COMPRESS_...), value of 'name' property
            if none
        compress_args: List of positional args provided to tool for compress,
            ['--compress'] if none
        decompress_args: List of positional args provided to tool for
            decompress, ['--decompress'] if none
        fetch_package: Name of the package to install with apt, value of
            'name' property if none
        version_regex: Regular expression to extract the version from the
            tool's version output, '(v[0-9.]+)' if none
    """
    def __init__(self, name, compression=None, compress_args=None,
                 decompress_args=None, fetch_package=None,
                 version_regex=r'(v[0-9.]+)', version_args='-V'):
        """Set up a new packer, filling in a default for anything left out

        Args:
            name (str): Name of packer tool
            compression (str): Compression type, used in the description, or
                None to use the name
            compress_args (list of str): Args provided to the tool to
                compress, or None for ['--compress']
            decompress_args (list of str): Args provided to the tool to
                decompress, or None for ['--decompress']
            fetch_package (str): Name of the package to install with apt, or
                None to use the name
            version_regex (str): Regular expression to extract the version
                from the tool's version output
            version_args (str): Argument which makes the tool report its
                version
        """
        super().__init__(name, '%s compression' % (compression or name),
                         version_regex, version_args)
        self.compress_args = (['--compress'] if compress_args is None
                              else compress_args)
        self.decompress_args = (['--decompress'] if decompress_args is None
                                else decompress_args)
        self.fetch_package = name if fetch_package is None else fetch_package

    def _run_on_temp_file(self, prefix, tool_args, indata):
        """Write data to a temporary file and run the tool on that file

        Args:
            prefix (str): Prefix for the name of the temporary file
            tool_args (list of str): Args to provide before '--stdout' and
                the filename
            indata (bytes): Data to write to the temporary file

        Returns:
            bytes: stdout from the tool, as returned by run_cmd()
        """
        with tempfile.NamedTemporaryFile(prefix=prefix,
                                         dir=tools.get_output_dir()) as tmp:
            tools.write_file(tmp.name, indata)
            args = tool_args + ['--stdout', tmp.name]
            return self.run_cmd(*args, binary=True)

    def compress(self, indata):
        """Compress data

        Args:
            indata (bytes): Data to compress

        Returns:
            bytes: Compressed data
        """
        return self._run_on_temp_file('comp.tmp', self.compress_args, indata)

    def decompress(self, indata):
        """Decompress data

        Args:
            indata (bytes): Data to decompress

        Returns:
            bytes: Decompressed data
        """
        return self._run_on_temp_file('decomp.tmp', self.decompress_args,
                                      indata)

    def fetch(self, method):
        """Fetch handler

        This installs the package named by the 'fetch_package' property using
        the apt utility.

        Args:
            method (FETCH_...): Method to use

        Returns:
            True if the file was fetched and now installed, None if a method
            other than FETCH_BIN was requested

        Raises:
            ValueError: Fetching could not be completed
        """
        if method == FETCH_BIN:
            return self.apt_install(self.fetch_package)
        return None
607