# SPDX-License-Identifier: GPL-2.0+
# Copyright 2022 Google LLC
# Copyright (C) 2022 Weidmüller Interface GmbH & Co. KG
# Stefan Herbrechtsmeier <stefan.herbrechtsmeier@weidmueller.com>
#
"""Base class for all bintools

This defines the common functionality for all bintools, including running
the tool, checking its version and fetching it if needed.
"""

import collections
import glob
import importlib
import multiprocessing
import os
import shutil
import tempfile
import urllib.error

from u_boot_pylib import command
from u_boot_pylib import terminal
from u_boot_pylib import tools
from u_boot_pylib import tout

BINMAN_DIR = os.path.dirname(os.path.realpath(__file__))

# Format string for listing bintools, see also the header in list_all()
FORMAT = '%-16.16s %-12.12s %-26.26s %s'

# List of known modules, to avoid importing the module multiple times
modules = {}

# Possible ways of fetching a tool (FETCH_COUNT is number of ways)
FETCH_ANY, FETCH_BIN, FETCH_BUILD, FETCH_COUNT = range(4)

FETCH_NAMES = {
    FETCH_ANY: 'any method',
    FETCH_BIN: 'binary download',
    FETCH_BUILD: 'build from source'
    }

# Status of tool fetching
FETCHED, FAIL, PRESENT, STATUS_COUNT = range(4)


class Bintool:
    """Tool which operates on binaries to help produce entry contents

    This is the base class for all bintools

    Properties:
        name (str): Name of the tool, also used as the executable name
        desc (str): Short description, shown by show()
        version_regex (str): Regular expression used to extract the version
            from the tool's version output, or None if not supported
        version_args (str): Argument passed to the tool to obtain its version
    """
    # List of bintools to regard as missing
    missing_list = []

    # Directory to store tools. Note that this is set up by set_tool_dir()
    # which must be called before this class is used.
    tooldir = ''

    # Flag to run 'apt-get update -y' once on first use of apt_install()
    apt_updated = False

    def __init__(self, name, desc, version_regex=None, version_args='-V'):
        self.name = name
        self.desc = desc
        self.version_regex = version_regex
        self.version_args = version_args

    @staticmethod
    def find_bintool_class(btype):
        """Look up the bintool class for bintool

        Args:
            btype (str): Bintool to use, e.g. 'mkimage'

        Returns:
            The bintool class object if found, else a tuple:
                module name that could not be found
                exception received
        """
        # Convert something like 'u-boot' to 'u_boot' since we are only
        # interested in the type.
        module_name = btype.replace('-', '_')
        module = modules.get(module_name)
        class_name = f'Bintool{module_name}'

        # Import the module if we have not already done so
        if not module:
            try:
                module = importlib.import_module('binman.btool.' + module_name)
            except ImportError as exc:
                try:
                    # Deal with classes which must be renamed due to conflicts
                    # with Python libraries
                    module = importlib.import_module('binman.btool.btool_' +
                                                     module_name)
                except ImportError:
                    # Report the exception from the first (preferred) name
                    return module_name, exc
            modules[module_name] = module

        # Look up the expected class name
        return getattr(module, class_name)

    @staticmethod
    def create(name):
        """Create a new bintool object

        Args:
            name (str): Bintool to create, e.g. 'mkimage'

        Returns:
            A new object of the correct type (a subclass of Bintool)

        Raises:
            ValueError: The module for the bintool could not be imported
        """
        cls = Bintool.find_bintool_class(name)
        if isinstance(cls, tuple):
            raise ValueError("Cannot import bintool module '%s': %s" % cls)

        # Call its constructor to get the object we want.
        obj = cls(name)
        return obj

    @classmethod
    def set_tool_dir(cls, pathname):
        """Set the path to use to store and find tools"""
        cls.tooldir = pathname

    def show(self):
        """Show a line of information about a bintool"""
        if self.is_present():
            version = self.version()
        else:
            version = '-'
        print(FORMAT % (self.name, version, self.desc,
                        self.get_path() or '(not found)'))

    @classmethod
    def set_missing_list(cls, missing_list):
        """Set the list of bintools to regard as missing

        Args:
            missing_list (list of str): Names of bintools to treat as missing,
                or None for an empty list
        """
        cls.missing_list = missing_list or []

    @staticmethod
    def get_tool_list(include_testing=False):
        """Get a list of the known tools

        Args:
            include_testing (bool): True to include the '_testing' bintool

        Returns:
            list of str: names of all tools known to binman
        """
        files = glob.glob(os.path.join(BINMAN_DIR, 'btool/*'))
        names = [os.path.splitext(os.path.basename(fname))[0]
                 for fname in files]
        # Drop private entries (anything starting with an underscore)
        names = [name for name in names if name[0] != '_']
        # Modules renamed to avoid Python-library conflicts have a 'btool_'
        # prefix, which is not part of the tool name
        names = [name[6:] if name.startswith('btool_') else name
                 for name in names]
        if include_testing:
            names.append('_testing')
        return sorted(names)

    @staticmethod
    def list_all():
        """List all the bintools known to binman"""
        names = Bintool.get_tool_list()
        print(FORMAT % ('Name', 'Version', 'Description', 'Path'))
        print(FORMAT % ('-' * 15,'-' * 11, '-' * 25, '-' * 30))
        for name in names:
            btool = Bintool.create(name)
            btool.show()

    def is_present(self):
        """Check if a bintool is available on the system

        Returns:
            bool: True if available, False if not
        """
        if self.name in self.missing_list:
            return False
        return bool(self.get_path())

    def get_path(self):
        """Get the path of a bintool

        Returns:
            str: Path to the tool, if available, else None
        """
        return tools.tool_find(self.name)

    def fetch_tool(self, method, col, skip_present):
        """Fetch a single tool

        Args:
            method (FETCH_...): Method to use
            col (terminal.Color): Color terminal object
            skip_present (bool): Skip fetching if it is already present

        Returns:
            int: Result of fetch either FETCHED, FAIL, PRESENT
        """
        def try_fetch(meth):
            """Try one fetch method, reporting (not raising) known errors"""
            res = None
            try:
                res = self.fetch(meth)
            except urllib.error.URLError as uerr:
                message = uerr.reason
                print(col.build(col.RED, f'- {message}'))

            except ValueError as exc:
                print(f'Exception: {exc}')
            return res

        if skip_present and self.is_present():
            return PRESENT
        print(col.build(col.YELLOW, 'Fetch: %s' % self.name))
        if method == FETCH_ANY:
            # Try each real method in turn (skipping FETCH_ANY itself)
            for try_method in range(1, FETCH_COUNT):
                print(f'- trying method: {FETCH_NAMES[try_method]}')
                result = try_fetch(try_method)
                if result:
                    break
        else:
            result = try_fetch(method)
        if not result:
            return FAIL
        # A result of True means the tool is already installed; otherwise we
        # have a file which must be moved into the tool directory
        if result is not True:
            fname, tmpdir = result
            dest = os.path.join(self.tooldir, self.name)
            os.makedirs(self.tooldir, exist_ok=True)
            print(f"- writing to '{dest}'")
            shutil.move(fname, dest)
            if tmpdir:
                shutil.rmtree(tmpdir)
        return FETCHED

    @staticmethod
    def fetch_tools(method, names_to_fetch):
        """Fetch bintools from a suitable place

        This fetches or builds the requested bintools so that they can be used
        by binman

        Args:
            method (FETCH_...): Method to use
            names_to_fetch (list of str): names of bintools to fetch; the
                single-element lists ['all'] and ['missing'] select every
                known tool, with 'missing' skipping those already present

        Returns:
            True on success, False on failure
        """
        def show_status(color, prompt, names):
            """Print a summary line: prompt, count and the tool names"""
            print(col.build(
                color, f'{prompt}:%s{len(names):2}: %s' %
                (' ' * (16 - len(prompt)), ' '.join(names))))

        col = terminal.Color()
        skip_present = False
        name_list = names_to_fetch
        if len(names_to_fetch) == 1 and names_to_fetch[0] in ['all', 'missing']:
            name_list = Bintool.get_tool_list()
            if names_to_fetch[0] == 'missing':
                skip_present = True
            print(col.build(col.YELLOW,
                            'Fetching tools: %s' % ' '.join(name_list)))
        # Maps a status (FETCHED, FAIL, PRESENT) to the list of tool names
        status = collections.defaultdict(list)
        for name in name_list:
            btool = Bintool.create(name)
            result = btool.fetch_tool(method, col, skip_present)
            status[result].append(name)
            if result == FAIL:
                if method == FETCH_ANY:
                    print('- failed to fetch with all methods')
                else:
                    print(f"- method '{FETCH_NAMES[method]}' is not supported")

        if len(name_list) > 1:
            if skip_present:
                show_status(col.GREEN, 'Already present', status[PRESENT])
            show_status(col.GREEN, 'Tools fetched', status[FETCHED])
            if status[FAIL]:
                show_status(col.RED, 'Failures', status[FAIL])
        return not status[FAIL]

    def run_cmd_result(self, *args, binary=False, raise_on_error=True):
        """Run the bintool using command-line arguments

        Args:
            args (list of str): Arguments to provide, in addition to the bintool
                name
            binary (bool): True to return output as bytes instead of str
            raise_on_error (bool): True to raise a ValueError exception if the
                tool returns a non-zero return code

        Returns:
            CommandResult: Resulting output from the bintool, or None if the
                tool is not present

        Raises:
            ValueError: The tool returned a non-zero return code and
                raise_on_error is True
        """
        if self.name in self.missing_list:
            return None
        name = os.path.expanduser(self.name)  # Expand paths containing ~
        all_args = (name,) + args
        env = tools.get_env_with_path()
        tout.debug(f"bintool: {' '.join(all_args)}")
        result = command.run_pipe(
            [all_args], capture=True, capture_stderr=True, env=env,
            raise_on_error=False, binary=binary)

        if result.return_code:
            # Return None if the tool was not found. In this case there is no
            # output from the tool and it does not appear on the path. We still
            # try to run it (as above) since run_pipe() allows faking the tool's
            # output
            if not any([result.stdout, result.stderr, tools.tool_find(name)]):
                tout.info(f"bintool '{name}' not found")
                return None
            if raise_on_error:
                tout.info(f"bintool '{name}' failed")
                raise ValueError("Error %d running '%s': %s" %
                                 (result.return_code, ' '.join(all_args),
                                  result.stderr or result.stdout))
        if result.stdout:
            tout.debug(result.stdout)
        if result.stderr:
            tout.debug(result.stderr)
        return result

    def run_cmd(self, *args, binary=False):
        """Run the bintool using command-line arguments

        Args:
            args (list of str): Arguments to provide, in addition to the bintool
                name
            binary (bool): True to return output as bytes instead of str

        Returns:
            str or bytes: Resulting stdout from the bintool, or None if the
                tool is not present
        """
        result = self.run_cmd_result(*args, binary=binary)
        if result:
            return result.stdout
        return None

    @classmethod
    def build_from_git(cls, git_repo, make_targets, bintool_path,
                       flags=None, git_branch=None, make_path=None):
        """Build a bintool from a git repo

        This clones the repo in a temporary directory, builds it with 'make',
        then returns the filename of the resulting executable bintool

        Args:
            git_repo (str): URL of git repo
            make_targets (list of str): List of targets to pass to 'make' to build
                the tool
            bintool_path (str): Relative path of the tool in the repo, after
                build is complete
            flags (list of str): Flags or variables to pass to make, or None
            git_branch (str): Branch of git repo, or None to use the default
            make_path (str): Relative path inside git repo containing the
                Makefile, or None

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
            or None on error
        """
        tmpdir = tempfile.mkdtemp(prefix='binmanf.')
        print(f"- clone git repo '{git_repo}' to '{tmpdir}'")
        if git_branch:
            tools.run('git', 'clone', '--depth', '1', '--branch', git_branch,
                      git_repo, tmpdir)
        else:
            tools.run('git', 'clone', '--depth', '1', git_repo, tmpdir)
        for target in make_targets:
            print(f"- build target '{target}'")
            makedir = tmpdir
            if make_path:
                makedir = os.path.join(tmpdir, make_path)
            cmd = ['make', '-C', makedir, '-j', f'{multiprocessing.cpu_count()}',
                   target]
            if flags:
                cmd += flags
            tools.run(*cmd)

        fname = os.path.join(tmpdir, bintool_path)
        if not os.path.exists(fname):
            print(f"- File '{fname}' was not produced")
            return None
        return fname, tmpdir

    @classmethod
    def fetch_from_url(cls, url):
        """Fetch a bintool from a URL

        Args:
            url (str): URL to fetch from

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
        """
        fname, tmpdir = tools.download(url)
        # The downloaded file must be executable to be usable as a tool
        tools.run('chmod', 'a+x', fname)
        return fname, tmpdir

    @classmethod
    def fetch_from_drive(cls, drive_id):
        """Fetch a bintool from Google drive

        Args:
            drive_id (str): ID of file to fetch. For a URL of the form
            'https://drive.google.com/file/d/xxx/view?usp=sharing' the value
            passed here should be 'xxx'

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
        """
        url = f'https://drive.google.com/uc?export=download&id={drive_id}'
        return cls.fetch_from_url(url)

    @classmethod
    def apt_install(cls, package):
        """Install a bintool using the 'apt' tool

        This requires use of sudo so may request a password

        Args:
            package (str): Name of package to install

        Returns:
            True, assuming it completes without error
        """
        # Only update the package lists once per run
        if not cls.apt_updated:
            args = ['sudo', 'apt-get', 'update', '-y']
            print('- %s' % ' '.join(args))
            tools.run(*args)
            cls.apt_updated = True
        args = ['sudo', 'apt-get', 'install', '-y', package]
        print('- %s' % ' '.join(args))
        tools.run(*args)
        return True

    @staticmethod
    def WriteDocs(modules, test_missing=None):
        """Write out documentation about the various bintools to stdout

        Args:
            modules: List of modules to include
            test_missing: Used for testing. This is a module to report
                as missing

        Raises:
            ValueError: A bintool module could not be imported, or one or
                more bintools have no documentation
        """
        print('''.. SPDX-License-Identifier: GPL-2.0+

Binman bintool Documentation
============================

This file describes the bintools (binary tools) supported by binman. Bintools
are binman's name for external executables that it runs to generate or process
binaries. It is fairly easy to create new bintools. Just add a new file to the
'btool' directory. You can use existing bintools as examples.


''')
        modules = sorted(modules)
        missing = []
        for name in modules:
            module = Bintool.find_bintool_class(name)
            if isinstance(module, tuple):
                # Import failed: report it as create() does, rather than
                # printing the docstring of the builtin tuple type
                raise ValueError(
                    "Cannot import bintool module '%s': %s" % module)
            docs = getattr(module, '__doc__')
            if test_missing == name:
                docs = None
            if docs:
                lines = docs.splitlines()
                first_line = lines[0]
                # Remove the class-level docstring indentation
                rest = [line[4:] for line in lines[1:]]
                hdr = 'Bintool: %s: %s' % (name, first_line)
                print(hdr)
                print('-' * len(hdr))
                print('\n'.join(rest))
                print()
                print()
            else:
                missing.append(name)

        if missing:
            raise ValueError('Documentation is missing for modules: %s' %
                             ', '.join(missing))

    # pylint: disable=W0613
    def fetch(self, method):
        """Fetch handler for a bintool

        This should be implemented by subclasses; this default implementation
        just reports that no fetch method is available

        Args:
            method (FETCH_...): Method to use

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
            or True if the file was fetched and already installed
            or False if no fetch() implementation is available

        Raises:
            ValueError: Fetching could not be completed
        """
        print(f"No method to fetch bintool '{self.name}'")
        return False

    def version(self):
        """Version handler for a bintool

        Returns:
            str: Version string for this bintool, or 'unknown' if it cannot
                be determined (no version_regex, tool not present or no
                output from the tool)
        """
        if self.version_regex is None:
            return 'unknown'

        import re

        result = self.run_cmd_result(self.version_args)
        if result is None:
            # run_cmd_result() returns None when the tool is missing, so
            # there is no output to parse
            return 'unknown'
        out = result.stdout.strip()
        if not out:
            # Some tools print their version on stderr
            out = result.stderr.strip()
        if not out:
            return 'unknown'

        m_version = re.search(self.version_regex, out)
        return m_version.group(1) if m_version else out


class BintoolPacker(Bintool):
    """Tool which compresses / decompresses entry contents

    This is a bintools base class for compression / decompression packer

    Properties:
        name: Name of packer tool
        compression: Compression type (COMPRESS_...), value of 'name' property
            if none
        compress_args: List of positional args provided to tool for compress,
            ['--compress'] if none
        decompress_args: List of positional args provided to tool for
            decompress, ['--decompress'] if none
        fetch_package: Name of the tool installed using the apt, value of 'name'
            property if none
        version_regex: Regular expressions to extract the version from tool
            version output,  '(v[0-9.]+)' if none
    """
    def __init__(self, name, compression=None, compress_args=None,
                 decompress_args=None, fetch_package=None,
                 version_regex=r'(v[0-9.]+)', version_args='-V'):
        desc = '%s compression' % (compression if compression else name)
        super().__init__(name, desc, version_regex, version_args)
        if compress_args is None:
            compress_args = ['--compress']
        self.compress_args = compress_args
        if decompress_args is None:
            decompress_args = ['--decompress']
        self.decompress_args = decompress_args
        if fetch_package is None:
            fetch_package = name
        self.fetch_package = fetch_package

    def compress(self, indata):
        """Compress data

        Args:
            indata (bytes): Data to compress

        Returns:
            bytes: Compressed data
        """
        with tempfile.NamedTemporaryFile(prefix='comp.tmp',
                                         dir=tools.get_output_dir()) as tmp:
            tools.write_file(tmp.name, indata)
            args = self.compress_args + ['--stdout', tmp.name]
            return self.run_cmd(*args, binary=True)

    def decompress(self, indata):
        """Decompress data

        Args:
            indata (bytes): Data to decompress

        Returns:
            bytes: Decompressed data
        """
        with tempfile.NamedTemporaryFile(prefix='decomp.tmp',
                                         dir=tools.get_output_dir()) as inf:
            tools.write_file(inf.name, indata)
            args = self.decompress_args + ['--stdout', inf.name]
            return self.run_cmd(*args, binary=True)

    def fetch(self, method):
        """Fetch handler

        This installs the package named by the fetch_package property using
        the apt utility.

        Args:
            method (FETCH_...): Method to use

        Returns:
            True if the file was fetched and now installed, None if a method
            other than FETCH_BIN was requested

        Raises:
            ValueError: Fetching could not be completed
        """
        if method != FETCH_BIN:
            return None
        return self.apt_install(self.fetch_package)