1# SPDX-License-Identifier: GPL-2.0+
2# Copyright 2022 Google LLC
3# Copyright (C) 2022 Weidmüller Interface GmbH & Co. KG
4# Stefan Herbrechtsmeier <stefan.herbrechtsmeier@weidmueller.com>
5#
6"""Base class for all bintools
7
8This defines the common functionality for all bintools, including running
9the tool, checking its version and fetching it if needed.
10"""
11
12import collections
13import glob
14import importlib
15import multiprocessing
16import os
17import shutil
18import tempfile
19import urllib.error
20
21from u_boot_pylib import command
22from u_boot_pylib import terminal
23from u_boot_pylib import tools
24from u_boot_pylib import tout
25
26BINMAN_DIR = os.path.dirname(os.path.realpath(__file__))
27
28# Format string for listing bintools, see also the header in list_all()
29FORMAT = '%-16.16s %-12.12s %-26.26s %s'
30
31# List of known modules, to avoid importing the module multiple times
32modules = {}
33
34# Possible ways of fetching a tool (FETCH_COUNT is number of ways)
35FETCH_ANY, FETCH_BIN, FETCH_BUILD, FETCH_COUNT = range(4)
36
37FETCH_NAMES = {
38    FETCH_ANY: 'any method',
39    FETCH_BIN: 'binary download',
40    FETCH_BUILD: 'build from source'
41    }
42
43# Status of tool fetching
44FETCHED, FAIL, PRESENT, STATUS_COUNT = range(4)
45
46class Bintool:
47    """Tool which operates on binaries to help produce entry contents
48
49    This is the base class for all bintools
50    """
51    # List of bintools to regard as missing
52    missing_list = []
53
54    # Directory to store tools. Note that this set up by set_tool_dir() which
55    # must be called before this class is used.
56    tooldir = ''
57
58    def __init__(self, name, desc, version_regex=None, version_args='-V'):
59        self.name = name
60        self.desc = desc
61        self.version_regex = version_regex
62        self.version_args = version_args
63
64    @staticmethod
65    def find_bintool_class(btype):
66        """Look up the bintool class for bintool
67
68        Args:
69            byte: Bintool to use, e.g. 'mkimage'
70
71        Returns:
72            The bintool class object if found, else a tuple:
73                module name that could not be found
74                exception received
75        """
76        # Convert something like 'u-boot' to 'u_boot' since we are only
77        # interested in the type.
78        module_name = btype.replace('-', '_')
79        module = modules.get(module_name)
80        class_name = f'Bintool{module_name}'
81
82        # Import the module if we have not already done so
83        if not module:
84            try:
85                module = importlib.import_module('binman.btool.' + module_name)
86            except ImportError as exc:
87                try:
88                    # Deal with classes which must be renamed due to conflicts
89                    # with Python libraries
90                    module = importlib.import_module('binman.btool.btool_' +
91                                                     module_name)
92                except ImportError:
93                    return module_name, exc
94            modules[module_name] = module
95
96        # Look up the expected class name
97        return getattr(module, class_name)
98
99    @staticmethod
100    def create(name):
101        """Create a new bintool object
102
103        Args:
104            name (str): Bintool to create, e.g. 'mkimage'
105
106        Returns:
107            A new object of the correct type (a subclass of Binutil)
108        """
109        cls = Bintool.find_bintool_class(name)
110        if isinstance(cls, tuple):
111            raise ValueError("Cannot import bintool module '%s': %s" % cls)
112
113        # Call its constructor to get the object we want.
114        obj = cls(name)
115        return obj
116
117    @classmethod
118    def set_tool_dir(cls, pathname):
119        """Set the path to use to store and find tools"""
120        cls.tooldir = pathname
121
122    def show(self):
123        """Show a line of information about a bintool"""
124        if self.is_present():
125            version = self.version()
126        else:
127            version = '-'
128        print(FORMAT % (self.name, version, self.desc,
129                        self.get_path() or '(not found)'))
130
131    @classmethod
132    def set_missing_list(cls, missing_list):
133        cls.missing_list = missing_list or []
134
135    @staticmethod
136    def get_tool_list(include_testing=False):
137        """Get a list of the known tools
138
139        Returns:
140            list of str: names of all tools known to binman
141        """
142        files = glob.glob(os.path.join(BINMAN_DIR, 'btool/*'))
143        names = [os.path.splitext(os.path.basename(fname))[0]
144                 for fname in files]
145        names = [name for name in names if name[0] != '_']
146        names = [name[6:] if name.startswith('btool_') else name
147                 for name in names]
148        if include_testing:
149            names.append('_testing')
150        return sorted(names)
151
152    @staticmethod
153    def list_all():
154        """List all the bintools known to binman"""
155        names = Bintool.get_tool_list()
156        print(FORMAT % ('Name', 'Version', 'Description', 'Path'))
157        print(FORMAT % ('-' * 15,'-' * 11, '-' * 25, '-' * 30))
158        for name in names:
159            btool = Bintool.create(name)
160            btool.show()
161
162    def is_present(self):
163        """Check if a bintool is available on the system
164
165        Returns:
166            bool: True if available, False if not
167        """
168        if self.name in self.missing_list:
169            return False
170        return bool(self.get_path())
171
172    def get_path(self):
173        """Get the path of a bintool
174
175        Returns:
176            str: Path to the tool, if available, else None
177        """
178        return tools.tool_find(self.name)
179
180    def fetch_tool(self, method, col, skip_present):
181        """Fetch a single tool
182
183        Args:
184            method (FETCH_...): Method to use
185            col (terminal.Color): Color terminal object
186            skip_present (boo;): Skip fetching if it is already present
187
188        Returns:
189            int: Result of fetch either FETCHED, FAIL, PRESENT
190        """
191        def try_fetch(meth):
192            res = None
193            try:
194                res = self.fetch(meth)
195            except urllib.error.URLError as uerr:
196                message = uerr.reason
197                print(col.build(col.RED, f'- {message}'))
198
199            except ValueError as exc:
200                print(f'Exception: {exc}')
201            return res
202
203        if skip_present and self.is_present():
204            return PRESENT
205        print(col.build(col.YELLOW, 'Fetch: %s' % self.name))
206        if method == FETCH_ANY:
207            for try_method in range(1, FETCH_COUNT):
208                print(f'- trying method: {FETCH_NAMES[try_method]}')
209                result = try_fetch(try_method)
210                if result:
211                    break
212        else:
213            result = try_fetch(method)
214        if not result:
215            return FAIL
216        if result is not True:
217            fname, tmpdir = result
218            dest = os.path.join(self.tooldir, self.name)
219            os.makedirs(self.tooldir, exist_ok=True)
220            print(f"- writing to '{dest}'")
221            shutil.move(fname, dest)
222            if tmpdir:
223                shutil.rmtree(tmpdir)
224        return FETCHED
225
226    @staticmethod
227    def fetch_tools(method, names_to_fetch):
228        """Fetch bintools from a suitable place
229
230        This fetches or builds the requested bintools so that they can be used
231        by binman
232
233        Args:
234            names_to_fetch (list of str): names of bintools to fetch
235
236        Returns:
237            True on success, False on failure
238        """
239        def show_status(color, prompt, names):
240            print(col.build(
241                color, f'{prompt}:%s{len(names):2}: %s' %
242                (' ' * (16 - len(prompt)), ' '.join(names))))
243
244        col = terminal.Color()
245        skip_present = False
246        name_list = names_to_fetch
247        if len(names_to_fetch) == 1 and names_to_fetch[0] in ['all', 'missing']:
248            name_list = Bintool.get_tool_list()
249            if names_to_fetch[0] == 'missing':
250                skip_present = True
251            print(col.build(col.YELLOW,
252                            'Fetching tools:      %s' % ' '.join(name_list)))
253        status = collections.defaultdict(list)
254        for name in name_list:
255            btool = Bintool.create(name)
256            result = btool.fetch_tool(method, col, skip_present)
257            status[result].append(name)
258            if result == FAIL:
259                if method == FETCH_ANY:
260                    print('- failed to fetch with all methods')
261                else:
262                    print(f"- method '{FETCH_NAMES[method]}' is not supported")
263
264        if len(name_list) > 1:
265            if skip_present:
266                show_status(col.GREEN, 'Already present', status[PRESENT])
267            show_status(col.GREEN, 'Tools fetched', status[FETCHED])
268            if status[FAIL]:
269                show_status(col.RED, 'Failures', status[FAIL])
270        return not status[FAIL]
271
272    def run_cmd_result(self, *args, binary=False, raise_on_error=True):
273        """Run the bintool using command-line arguments
274
275        Args:
276            args (list of str): Arguments to provide, in addition to the bintool
277                name
278            binary (bool): True to return output as bytes instead of str
279            raise_on_error (bool): True to raise a ValueError exception if the
280                tool returns a non-zero return code
281
282        Returns:
283            CommandResult: Resulting output from the bintool, or None if the
284                tool is not present
285        """
286        if self.name in self.missing_list:
287            return None
288        name = os.path.expanduser(self.name)  # Expand paths containing ~
289        all_args = (name,) + args
290        env = tools.get_env_with_path()
291        tout.detail(f"bintool: {' '.join(all_args)}")
292        result = command.run_pipe(
293            [all_args], capture=True, capture_stderr=True, env=env,
294            raise_on_error=False, binary=binary)
295
296        if result.return_code:
297            # Return None if the tool was not found. In this case there is no
298            # output from the tool and it does not appear on the path. We still
299            # try to run it (as above) since RunPipe() allows faking the tool's
300            # output
301            if not any([result.stdout, result.stderr, tools.tool_find(name)]):
302                tout.info(f"bintool '{name}' not found")
303                return None
304            if raise_on_error:
305                tout.info(f"bintool '{name}' failed")
306                raise ValueError("Error %d running '%s': %s" %
307                                (result.return_code, ' '.join(all_args),
308                                result.stderr or result.stdout))
309        if result.stdout:
310            tout.debug(result.stdout)
311        if result.stderr:
312            tout.debug(result.stderr)
313        return result
314
315    def run_cmd(self, *args, binary=False):
316        """Run the bintool using command-line arguments
317
318        Args:
319            args (list of str): Arguments to provide, in addition to the bintool
320                name
321            binary (bool): True to return output as bytes instead of str
322
323        Returns:
324            str or bytes: Resulting stdout from the bintool
325        """
326        result = self.run_cmd_result(*args, binary=binary)
327        if result:
328            return result.stdout
329
330    @classmethod
331    def build_from_git(cls, git_repo, make_target, bintool_path, flags=None):
332        """Build a bintool from a git repo
333
334        This clones the repo in a temporary directory, builds it with 'make',
335        then returns the filename of the resulting executable bintool
336
337        Args:
338            git_repo (str): URL of git repo
339            make_target (str): Target to pass to 'make' to build the tool
340            bintool_path (str): Relative path of the tool in the repo, after
341                build is complete
342            flags (list of str): Flags or variables to pass to make, or None
343
344        Returns:
345            tuple:
346                str: Filename of fetched file to copy to a suitable directory
347                str: Name of temp directory to remove, or None
348            or None on error
349        """
350        tmpdir = tempfile.mkdtemp(prefix='binmanf.')
351        print(f"- clone git repo '{git_repo}' to '{tmpdir}'")
352        tools.run('git', 'clone', '--depth', '1', git_repo, tmpdir)
353        print(f"- build target '{make_target}'")
354        cmd = ['make', '-C', tmpdir, '-j', f'{multiprocessing.cpu_count()}',
355               make_target]
356        if flags:
357            cmd += flags
358        tools.run(*cmd)
359        fname = os.path.join(tmpdir, bintool_path)
360        if not os.path.exists(fname):
361            print(f"- File '{fname}' was not produced")
362            return None
363        return fname, tmpdir
364
365    @classmethod
366    def fetch_from_url(cls, url):
367        """Fetch a bintool from a URL
368
369        Args:
370            url (str): URL to fetch from
371
372        Returns:
373            tuple:
374                str: Filename of fetched file to copy to a suitable directory
375                str: Name of temp directory to remove, or None
376        """
377        fname, tmpdir = tools.download(url)
378        tools.run('chmod', 'a+x', fname)
379        return fname, tmpdir
380
381    @classmethod
382    def fetch_from_drive(cls, drive_id):
383        """Fetch a bintool from Google drive
384
385        Args:
386            drive_id (str): ID of file to fetch. For a URL of the form
387            'https://drive.google.com/file/d/xxx/view?usp=sharing' the value
388            passed here should be 'xxx'
389
390        Returns:
391            tuple:
392                str: Filename of fetched file to copy to a suitable directory
393                str: Name of temp directory to remove, or None
394        """
395        url = f'https://drive.google.com/uc?export=download&id={drive_id}'
396        return cls.fetch_from_url(url)
397
398    @classmethod
399    def apt_install(cls, package):
400        """Install a bintool using the 'apt' tool
401
402        This requires use of servo so may request a password
403
404        Args:
405            package (str): Name of package to install
406
407        Returns:
408            True, assuming it completes without error
409        """
410        args = ['sudo', 'apt', 'install', '-y', package]
411        print('- %s' % ' '.join(args))
412        tools.run(*args)
413        return True
414
415    @staticmethod
416    def WriteDocs(modules, test_missing=None):
417        """Write out documentation about the various bintools to stdout
418
419        Args:
420            modules: List of modules to include
421            test_missing: Used for testing. This is a module to report
422                as missing
423        """
424        print('''.. SPDX-License-Identifier: GPL-2.0+
425
426Binman bintool Documentation
427============================
428
429This file describes the bintools (binary tools) supported by binman. Bintools
430are binman's name for external executables that it runs to generate or process
431binaries. It is fairly easy to create new bintools. Just add a new file to the
432'btool' directory. You can use existing bintools as examples.
433
434
435''')
436        modules = sorted(modules)
437        missing = []
438        for name in modules:
439            module = Bintool.find_bintool_class(name)
440            docs = getattr(module, '__doc__')
441            if test_missing == name:
442                docs = None
443            if docs:
444                lines = docs.splitlines()
445                first_line = lines[0]
446                rest = [line[4:] for line in lines[1:]]
447                hdr = 'Bintool: %s: %s' % (name, first_line)
448                print(hdr)
449                print('-' * len(hdr))
450                print('\n'.join(rest))
451                print()
452                print()
453            else:
454                missing.append(name)
455
456        if missing:
457            raise ValueError('Documentation is missing for modules: %s' %
458                             ', '.join(missing))
459
460    # pylint: disable=W0613
461    def fetch(self, method):
462        """Fetch handler for a bintool
463
464        This should be implemented by the base class
465
466        Args:
467            method (FETCH_...): Method to use
468
469        Returns:
470            tuple:
471                str: Filename of fetched file to copy to a suitable directory
472                str: Name of temp directory to remove, or None
473            or True if the file was fetched and already installed
474            or None if no fetch() implementation is available
475
476        Raises:
477            Valuerror: Fetching could not be completed
478        """
479        print(f"No method to fetch bintool '{self.name}'")
480        return False
481
482    def version(self):
483        """Version handler for a bintool
484
485        Returns:
486            str: Version string for this bintool
487        """
488        if self.version_regex is None:
489            return 'unknown'
490
491        import re
492
493        result = self.run_cmd_result(self.version_args)
494        out = result.stdout.strip()
495        if not out:
496            out = result.stderr.strip()
497        if not out:
498            return 'unknown'
499
500        m_version = re.search(self.version_regex, out)
501        return m_version.group(1) if m_version else out
502
503
504class BintoolPacker(Bintool):
505    """Tool which compression / decompression entry contents
506
507    This is a bintools base class for compression / decompression packer
508
509    Properties:
510        name: Name of packer tool
511        compression: Compression type (COMPRESS_...), value of 'name' property
512            if none
513        compress_args: List of positional args provided to tool for compress,
514            ['--compress'] if none
515        decompress_args: List of positional args provided to tool for
516            decompress, ['--decompress'] if none
517        fetch_package: Name of the tool installed using the apt, value of 'name'
518            property if none
519        version_regex: Regular expressions to extract the version from tool
520            version output,  '(v[0-9.]+)' if none
521    """
522    def __init__(self, name, compression=None, compress_args=None,
523                 decompress_args=None, fetch_package=None,
524                 version_regex=r'(v[0-9.]+)', version_args='-V'):
525        desc = '%s compression' % (compression if compression else name)
526        super().__init__(name, desc, version_regex, version_args)
527        if compress_args is None:
528            compress_args = ['--compress']
529        self.compress_args = compress_args
530        if decompress_args is None:
531            decompress_args = ['--decompress']
532        self.decompress_args = decompress_args
533        if fetch_package is None:
534            fetch_package = name
535        self.fetch_package = fetch_package
536
537    def compress(self, indata):
538        """Compress data
539
540        Args:
541            indata (bytes): Data to compress
542
543        Returns:
544            bytes: Compressed data
545        """
546        with tempfile.NamedTemporaryFile(prefix='comp.tmp',
547                                         dir=tools.get_output_dir()) as tmp:
548            tools.write_file(tmp.name, indata)
549            args = self.compress_args + ['--stdout', tmp.name]
550            return self.run_cmd(*args, binary=True)
551
552    def decompress(self, indata):
553        """Decompress data
554
555        Args:
556            indata (bytes): Data to decompress
557
558        Returns:
559            bytes: Decompressed data
560        """
561        with tempfile.NamedTemporaryFile(prefix='decomp.tmp',
562                                         dir=tools.get_output_dir()) as inf:
563            tools.write_file(inf.name, indata)
564            args = self.decompress_args + ['--stdout', inf.name]
565            return self.run_cmd(*args, binary=True)
566
567    def fetch(self, method):
568        """Fetch handler
569
570        This installs the gzip package using the apt utility.
571
572        Args:
573            method (FETCH_...): Method to use
574
575        Returns:
576            True if the file was fetched and now installed, None if a method
577            other than FETCH_BIN was requested
578
579        Raises:
580            Valuerror: Fetching could not be completed
581        """
582        if method != FETCH_BIN:
583            return None
584        return self.apt_install(self.fetch_package)
585