# SPDX-License-Identifier: GPL-2.0+
# Copyright 2022 Google LLC
# Copyright (C) 2022 Weidmüller Interface GmbH & Co. KG
# Stefan Herbrechtsmeier <stefan.herbrechtsmeier@weidmueller.com>
#
"""Base class for all bintools

This defines the common functionality for all bintools, including running
the tool, checking its version and fetching it if needed.
"""

import collections
import glob
import importlib
import multiprocessing
import os
import re
import shutil
import tempfile
import urllib.error

from u_boot_pylib import command
from u_boot_pylib import terminal
from u_boot_pylib import tools
from u_boot_pylib import tout

BINMAN_DIR = os.path.dirname(os.path.realpath(__file__))

# Format string for listing bintools, see also the header in list_all()
FORMAT = '%-16.16s %-12.12s %-26.26s %s'

# Cache of imported bintool modules, keyed by module name, to avoid importing
# the same module multiple times
modules = {}

# Possible ways of fetching a tool (FETCH_COUNT is number of ways)
FETCH_ANY, FETCH_BIN, FETCH_BUILD, FETCH_COUNT = range(4)

FETCH_NAMES = {
    FETCH_ANY: 'any method',
    FETCH_BIN: 'binary download',
    FETCH_BUILD: 'build from source'
    }

# Status of tool fetching
FETCHED, FAIL, PRESENT, STATUS_COUNT = range(4)


class Bintool:
    """Tool which operates on binaries to help produce entry contents

    This is the base class for all bintools

    Properties:
        name (str): Name of the tool, also used as the executable name
        desc (str): Short description, shown by show()
        version_regex (str): Regular expression with one group which extracts
            the version from the tool's version output, or None if the
            version cannot be determined
        version_args (str): Argument passed to the tool to make it print its
            version
    """
    # List of bintools to regard as missing
    missing_list = []

    # Directory to store tools. Note that this is set up by set_tool_dir()
    # which must be called before this class is used.
    tooldir = ''

    def __init__(self, name, desc, version_regex=None, version_args='-V'):
        self.name = name
        self.desc = desc
        self.version_regex = version_regex
        self.version_args = version_args

    @staticmethod
    def find_bintool_class(btype):
        """Look up the bintool class for bintool

        Args:
            btype (str): Bintool to use, e.g. 'mkimage'

        Returns:
            The bintool class object if found, else a tuple:
                module name that could not be found
                exception received
        """
        # Convert something like 'u-boot' to 'u_boot' since we are only
        # interested in the type.
        module_name = btype.replace('-', '_')
        module = modules.get(module_name)
        class_name = f'Bintool{module_name}'

        # Import the module if we have not already done so
        if not module:
            try:
                module = importlib.import_module('binman.btool.' + module_name)
            except ImportError as exc:
                try:
                    # Deal with classes which must be renamed due to conflicts
                    # with Python libraries
                    module = importlib.import_module('binman.btool.btool_' +
                                                     module_name)
                except ImportError:
                    # Report the exception from the first (unprefixed) attempt
                    return module_name, exc
            modules[module_name] = module

        # Look up the expected class name
        return getattr(module, class_name)

    @staticmethod
    def create(name):
        """Create a new bintool object

        Args:
            name (str): Bintool to create, e.g. 'mkimage'

        Returns:
            A new object of the correct type (a subclass of Bintool)

        Raises:
            ValueError: The module for the bintool could not be imported
        """
        cls = Bintool.find_bintool_class(name)
        if isinstance(cls, tuple):
            raise ValueError("Cannot import bintool module '%s': %s" % cls)

        # Call its constructor to get the object we want.
        obj = cls(name)
        return obj

    @classmethod
    def set_tool_dir(cls, pathname):
        """Set the path to use to store and find tools"""
        cls.tooldir = pathname

    def show(self):
        """Show a line of information about a bintool"""
        if self.is_present():
            version = self.version()
        else:
            version = '-'
        print(FORMAT % (self.name, version, self.desc,
                        self.get_path() or '(not found)'))

    @classmethod
    def set_missing_list(cls, missing_list):
        """Set the list of bintools to regard as missing

        Args:
            missing_list (list of str): Names of bintools to treat as missing,
                or None for an empty list
        """
        cls.missing_list = missing_list or []

    @staticmethod
    def get_tool_list(include_testing=False):
        """Get a list of the known tools

        Args:
            include_testing (bool): True to include the '_testing' bintool in
                the list

        Returns:
            list of str: names of all tools known to binman
        """
        files = glob.glob(os.path.join(BINMAN_DIR, 'btool/*'))
        names = [os.path.splitext(os.path.basename(fname))[0]
                 for fname in files]

        # Drop private entries, i.e. anything starting with an underscore
        names = [name for name in names if name[0] != '_']

        # Modules renamed to avoid conflicts have a 'btool_' prefix; drop it
        names = [name[6:] if name.startswith('btool_') else name
                 for name in names]
        if include_testing:
            names.append('_testing')
        return sorted(names)

    @staticmethod
    def list_all():
        """List all the bintools known to binman"""
        names = Bintool.get_tool_list()
        print(FORMAT % ('Name', 'Version', 'Description', 'Path'))
        print(FORMAT % ('-' * 15, '-' * 11, '-' * 25, '-' * 30))
        for name in names:
            btool = Bintool.create(name)
            btool.show()

    def is_present(self):
        """Check if a bintool is available on the system

        Returns:
            bool: True if available, False if not
        """
        if self.name in self.missing_list:
            return False
        return bool(self.get_path())

    def get_path(self):
        """Get the path of a bintool

        Returns:
            str: Path to the tool, if available, else None
        """
        return tools.tool_find(self.name)

    def fetch_tool(self, method, col, skip_present):
        """Fetch a single tool

        Args:
            method (FETCH_...): Method to use
            col (terminal.Color): Color terminal object
            skip_present (bool): Skip fetching if it is already present

        Returns:
            int: Result of fetch either FETCHED, FAIL, PRESENT
        """
        def try_fetch(meth):
            """Call fetch() for one method, reporting expected failures

            Returns:
                Result of fetch(), or None if it raised URLError or ValueError
            """
            res = None
            try:
                res = self.fetch(meth)
            except urllib.error.URLError as uerr:
                message = uerr.reason
                print(col.build(col.RED, f'- {message}'))

            except ValueError as exc:
                print(f'Exception: {exc}')
            return res

        if skip_present and self.is_present():
            return PRESENT
        print(col.build(col.YELLOW, 'Fetch: %s' % self.name))
        if method == FETCH_ANY:
            # Try each real method in turn, stopping at the first that works
            for try_method in range(1, FETCH_COUNT):
                print(f'- trying method: {FETCH_NAMES[try_method]}')
                result = try_fetch(try_method)
                if result:
                    break
        else:
            result = try_fetch(method)
        if not result:
            return FAIL

        # A result of True means the tool is already installed; otherwise it
        # is a (filename, tmpdir) tuple and the file must be moved into place
        if result is not True:
            fname, tmpdir = result
            dest = os.path.join(self.tooldir, self.name)
            os.makedirs(self.tooldir, exist_ok=True)
            print(f"- writing to '{dest}'")
            shutil.move(fname, dest)
            if tmpdir:
                shutil.rmtree(tmpdir)
        return FETCHED

    @staticmethod
    def fetch_tools(method, names_to_fetch):
        """Fetch bintools from a suitable place

        This fetches or builds the requested bintools so that they can be used
        by binman

        Args:
            method (FETCH_...): Method to use
            names_to_fetch (list of str): names of bintools to fetch. A
                single-element list of 'all' fetches every known tool and
                'missing' fetches only those not already present

        Returns:
            True on success, False on failure
        """
        def show_status(color, prompt, names):
            """Print one summary line: prompt, count and the tool names"""
            print(col.build(
                color, f'{prompt}:%s{len(names):2}: %s' %
                (' ' * (16 - len(prompt)), ' '.join(names))))

        col = terminal.Color()
        skip_present = False
        name_list = names_to_fetch
        if len(names_to_fetch) == 1 and names_to_fetch[0] in ['all', 'missing']:
            name_list = Bintool.get_tool_list()
            if names_to_fetch[0] == 'missing':
                skip_present = True
            print(col.build(col.YELLOW,
                            'Fetching tools:      %s' % ' '.join(name_list)))
        status = collections.defaultdict(list)
        for name in name_list:
            btool = Bintool.create(name)
            result = btool.fetch_tool(method, col, skip_present)
            status[result].append(name)
            if result == FAIL:
                if method == FETCH_ANY:
                    print('- failed to fetch with all methods')
                else:
                    print(f"- method '{FETCH_NAMES[method]}' is not supported")

        # Only show a summary when more than one tool was requested
        if len(name_list) > 1:
            if skip_present:
                show_status(col.GREEN, 'Already present', status[PRESENT])
            show_status(col.GREEN, 'Tools fetched', status[FETCHED])
            if status[FAIL]:
                show_status(col.RED, 'Failures', status[FAIL])
        return not status[FAIL]

    def run_cmd_result(self, *args, binary=False, raise_on_error=True):
        """Run the bintool using command-line arguments

        Args:
            args (list of str): Arguments to provide, in addition to the bintool
                name
            binary (bool): True to return output as bytes instead of str
            raise_on_error (bool): True to raise a ValueError exception if the
                tool returns a non-zero return code

        Returns:
            CommandResult: Resulting output from the bintool, or None if the
                tool is not present

        Raises:
            ValueError: The tool returned a non-zero return code and
                raise_on_error is True
        """
        if self.name in self.missing_list:
            return None
        name = os.path.expanduser(self.name)  # Expand paths containing ~
        all_args = (name,) + args
        env = tools.get_env_with_path()
        tout.detail(f"bintool: {' '.join(all_args)}")
        result = command.run_pipe(
            [all_args], capture=True, capture_stderr=True, env=env,
            raise_on_error=False, binary=binary)

        if result.return_code:
            # Return None if the tool was not found. In this case there is no
            # output from the tool and it does not appear on the path. We still
            # try to run it (as above) since command.run_pipe() allows faking
            # the tool's output
            if not any([result.stdout, result.stderr, tools.tool_find(name)]):
                tout.info(f"bintool '{name}' not found")
                return None
            if raise_on_error:
                tout.info(f"bintool '{name}' failed")
                raise ValueError("Error %d running '%s': %s" %
                                 (result.return_code, ' '.join(all_args),
                                  result.stderr or result.stdout))
        if result.stdout:
            tout.debug(result.stdout)
        if result.stderr:
            tout.debug(result.stderr)
        return result

    def run_cmd(self, *args, binary=False):
        """Run the bintool using command-line arguments

        Args:
            args (list of str): Arguments to provide, in addition to the bintool
                name
            binary (bool): True to return output as bytes instead of str

        Returns:
            str or bytes: Resulting stdout from the bintool, or None if
                run_cmd_result() returned no result (tool not present)
        """
        result = self.run_cmd_result(*args, binary=binary)
        if result:
            return result.stdout
        return None

    @classmethod
    def build_from_git(cls, git_repo, make_target, bintool_path, flags=None):
        """Build a bintool from a git repo

        This clones the repo in a temporary directory, builds it with 'make',
        then returns the filename of the resulting executable bintool

        Args:
            git_repo (str): URL of git repo
            make_target (str): Target to pass to 'make' to build the tool
            bintool_path (str): Relative path of the tool in the repo, after
                build is complete
            flags (list of str): Flags or variables to pass to make, or None

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
            or None on error
        """
        tmpdir = tempfile.mkdtemp(prefix='binmanf.')
        print(f"- clone git repo '{git_repo}' to '{tmpdir}'")
        tools.run('git', 'clone', '--depth', '1', git_repo, tmpdir)
        print(f"- build target '{make_target}'")
        cmd = ['make', '-C', tmpdir, '-j', f'{multiprocessing.cpu_count()}',
               make_target]
        if flags:
            cmd += flags
        tools.run(*cmd)
        fname = os.path.join(tmpdir, bintool_path)
        if not os.path.exists(fname):
            print(f"- File '{fname}' was not produced")
            return None
        return fname, tmpdir

    @classmethod
    def fetch_from_url(cls, url):
        """Fetch a bintool from a URL

        The downloaded file is made executable for all users.

        Args:
            url (str): URL to fetch from

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
        """
        fname, tmpdir = tools.download(url)
        tools.run('chmod', 'a+x', fname)
        return fname, tmpdir

    @classmethod
    def fetch_from_drive(cls, drive_id):
        """Fetch a bintool from Google drive

        Args:
            drive_id (str): ID of file to fetch. For a URL of the form
            'https://drive.google.com/file/d/xxx/view?usp=sharing' the value
            passed here should be 'xxx'

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
        """
        url = f'https://drive.google.com/uc?export=download&id={drive_id}'
        return cls.fetch_from_url(url)

    @classmethod
    def apt_install(cls, package):
        """Install a bintool using the 'apt' tool

        This requires use of sudo so may request a password

        Args:
            package (str): Name of package to install

        Returns:
            True, assuming it completes without error
        """
        args = ['sudo', 'apt', 'install', '-y', package]
        print('- %s' % ' '.join(args))
        tools.run(*args)
        return True

    @staticmethod
    def WriteDocs(modules, test_missing=None):
        """Write out documentation about the various bintools to stdout

        Args:
            modules: List of modules to include
            test_missing: Used for testing. This is a module to report
                as missing

        Raises:
            ValueError: One or more modules have no documentation
        """
        print('''.. SPDX-License-Identifier: GPL-2.0+

Binman bintool Documentation
============================

This file describes the bintools (binary tools) supported by binman. Bintools
are binman's name for external executables that it runs to generate or process
binaries. It is fairly easy to create new bintools. Just add a new file to the
'btool' directory. You can use existing bintools as examples.


''')
        modules = sorted(modules)
        missing = []
        for name in modules:
            btool_cls = Bintool.find_bintool_class(name)

            # An import failure is reported as a tuple. Do not mistake the
            # tuple type's own docstring for bintool documentation.
            if isinstance(btool_cls, tuple) or test_missing == name:
                docs = None
            else:
                docs = getattr(btool_cls, '__doc__')
            if docs:
                lines = docs.splitlines()
                first_line = lines[0]

                # Strip the class-level indentation from the remaining lines
                rest = [line[4:] for line in lines[1:]]
                hdr = 'Bintool: %s: %s' % (name, first_line)
                print(hdr)
                print('-' * len(hdr))
                print('\n'.join(rest))
                print()
                print()
            else:
                missing.append(name)

        if missing:
            raise ValueError('Documentation is missing for modules: %s' %
                             ', '.join(missing))

    # pylint: disable=W0613
    def fetch(self, method):
        """Fetch handler for a bintool

        This should be implemented by subclasses; this base implementation
        just reports that no fetch method exists

        Args:
            method (FETCH_...): Method to use

        Returns:
            tuple:
                str: Filename of fetched file to copy to a suitable directory
                str: Name of temp directory to remove, or None
            or True if the file was fetched and already installed
            or a false value if the method is not available (this base
                implementation returns False)

        Raises:
            ValueError: Fetching could not be completed
        """
        print(f"No method to fetch bintool '{self.name}'")
        return False

    def version(self):
        """Version handler for a bintool

        Returns:
            str: Version string for this bintool, or 'unknown' if it cannot be
                determined
        """
        if self.version_regex is None:
            return 'unknown'

        result = self.run_cmd_result(self.version_args)

        # run_cmd_result() returns None if the tool is missing or not found
        if result is None:
            return 'unknown'
        out = result.stdout.strip()
        if not out:
            # Some tools print their version on stderr instead
            out = result.stderr.strip()
        if not out:
            return 'unknown'

        m_version = re.search(self.version_regex, out)
        return m_version.group(1) if m_version else out


class BintoolPacker(Bintool):
    """Tool which compresses / decompresses entry contents

    This is a bintools base class for compression / decompression packer

    Properties:
        name: Name of packer tool
        compression: Compression type (COMPRESS_...), value of 'name' property
            if none
        compress_args: List of positional args provided to tool for compress,
            ['--compress'] if none
        decompress_args: List of positional args provided to tool for
            decompress, ['--decompress'] if none
        fetch_package: Name of the tool installed using the apt, value of 'name'
            property if none
        version_regex: Regular expressions to extract the version from tool
            version output,  '(v[0-9.]+)' if none
    """
    def __init__(self, name, compression=None, compress_args=None,
                 decompress_args=None, fetch_package=None,
                 version_regex=r'(v[0-9.]+)', version_args='-V'):
        desc = '%s compression' % (compression if compression else name)
        super().__init__(name, desc, version_regex, version_args)
        if compress_args is None:
            compress_args = ['--compress']
        self.compress_args = compress_args
        if decompress_args is None:
            decompress_args = ['--decompress']
        self.decompress_args = decompress_args
        if fetch_package is None:
            fetch_package = name
        self.fetch_package = fetch_package

    def compress(self, indata):
        """Compress data

        The data is written to a temporary file in the output directory and
        the tool is asked to write the compressed result to stdout.

        Args:
            indata (bytes): Data to compress

        Returns:
            bytes: Compressed data
        """
        with tempfile.NamedTemporaryFile(prefix='comp.tmp',
                                         dir=tools.get_output_dir()) as tmp:
            tools.write_file(tmp.name, indata)
            args = self.compress_args + ['--stdout', tmp.name]
            return self.run_cmd(*args, binary=True)

    def decompress(self, indata):
        """Decompress data

        The data is written to a temporary file in the output directory and
        the tool is asked to write the decompressed result to stdout.

        Args:
            indata (bytes): Data to decompress

        Returns:
            bytes: Decompressed data
        """
        with tempfile.NamedTemporaryFile(prefix='decomp.tmp',
                                         dir=tools.get_output_dir()) as inf:
            tools.write_file(inf.name, indata)
            args = self.decompress_args + ['--stdout', inf.name]
            return self.run_cmd(*args, binary=True)

    def fetch(self, method):
        """Fetch handler

        This installs the package named by the fetch_package property using
        the apt utility.

        Args:
            method (FETCH_...): Method to use

        Returns:
            True if the file was fetched and now installed, None if a method
            other than FETCH_BIN was requested

        Raises:
            ValueError: Fetching could not be completed
        """
        if method != FETCH_BIN:
            return None
        return self.apt_install(self.fetch_package)