1# SPDX-License-Identifier: GPL-2.0+
2"""
3Shell command ease-ups for Python
4
5Copyright (c) 2011 The Chromium OS Authors.
6"""
7
8import subprocess
9
10from u_boot_pylib import cros_subprocess
11
12# This permits interception of RunPipe for test purposes. If it is set to
13# a function, then that function is called with the pipe list being
14# executed. Otherwise, it is assumed to be a CommandResult object, and is
15# returned as the result for every run_pipe() call.
16# When this value is None, commands are executed as normal.
17TEST_RESULT = None
18
19
20class CommandExc(Exception):
21    """Reports an exception to the caller"""
22    def __init__(self, msg, result):
23        """Set up a new exception object
24
25        Args:
26            result (CommandResult): Execution result so far
27        """
28        super().__init__(msg)
29        self.result = result
30
31
32class CommandResult:
33    """A class which captures the result of executing a command.
34
35    Members:
36        stdout (bytes): stdout obtained from command, as a string
37        stderr (bytes): stderr obtained from command, as a string
38        combined (bytes): stdout and stderr interleaved
39        return_code (int): Return code from command
40        exception (Exception): Exception received, or None if all ok
41        output (str or None): Returns output as a single line if requested
42    """
43    def __init__(self, stdout='', stderr='', combined='', return_code=0,
44                 exception=None):
45        self.stdout = stdout
46        self.stderr = stderr
47        self.combined = combined
48        self.return_code = return_code
49        self.exception = exception
50        self.output = None
51
52    def to_output(self, binary):
53        """Converts binary output to its final form
54
55        Args:
56            binary (bool): True to report binary output, False to use strings
57        Returns:
58            self
59        """
60        if not binary:
61            self.stdout = self.stdout.decode('utf-8')
62            self.stderr = self.stderr.decode('utf-8')
63            self.combined = self.combined.decode('utf-8')
64        return self
65
66
67def run_pipe(pipe_list, infile=None, outfile=None, capture=False,
68             capture_stderr=False, oneline=False, raise_on_error=True, cwd=None,
69             binary=False, output_func=None, **kwargs):
70    """
71    Perform a command pipeline, with optional input/output filenames.
72
73    Args:
74        pipe_list (list of list): List of command lines to execute. Each command
75            line is piped into the next, and is itself a list of strings. For
76            example [ ['ls', '.git'] ['wc'] ] will pipe the output of
77            'ls .git' into 'wc'.
78        infile (str): File to provide stdin to the pipeline
79        outfile (str): File to store stdout
80        capture (bool): True to capture output
81        capture_stderr (bool): True to capture stderr
82        oneline (bool): True to strip newline chars from output
83        raise_on_error (bool): True to raise on an error, False to return it in
84            the CommandResult
85        cwd (str or None): Directory to run the command in
86        binary (bool): True to report binary output, False to use strings
87        output_func (function): Output function to call with each output
88            fragment (if it returns True the function terminates)
89        **kwargs: Additional keyword arguments to cros_subprocess.Popen()
90    Returns:
91        CommandResult object
92    Raises:
93        CommandExc if an exception happens
94    """
95    if TEST_RESULT:
96        if hasattr(TEST_RESULT, '__call__'):
97            # pylint: disable=E1102
98            result = TEST_RESULT(pipe_list=pipe_list)
99            if result:
100                return result
101        else:
102            return TEST_RESULT
103        # No result: fall through to normal processing
104    result = CommandResult(b'', b'', b'')
105    last_pipe = None
106    pipeline = list(pipe_list)
107    user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list])
108    kwargs['stdout'] = None
109    kwargs['stderr'] = None
110    while pipeline:
111        cmd = pipeline.pop(0)
112        if last_pipe is not None:
113            kwargs['stdin'] = last_pipe.stdout
114        elif infile:
115            kwargs['stdin'] = open(infile, 'rb')
116        if pipeline or capture:
117            kwargs['stdout'] = cros_subprocess.PIPE
118        elif outfile:
119            kwargs['stdout'] = open(outfile, 'wb')
120        if capture_stderr:
121            kwargs['stderr'] = cros_subprocess.PIPE
122
123        try:
124            last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs)
125        except Exception as err:
126            result.exception = err
127            if raise_on_error:
128                raise CommandExc(f"Error running '{user_pipestr}': {err}",
129                                 result) from err
130            result.return_code = 255
131            return result.to_output(binary)
132
133    if capture:
134        result.stdout, result.stderr, result.combined = (
135                last_pipe.communicate_filter(output_func))
136        if result.stdout and oneline:
137            result.output = result.stdout.rstrip(b'\r\n')
138    result.return_code = last_pipe.wait()
139    if raise_on_error and result.return_code:
140        raise CommandExc(f"Error running '{user_pipestr}'", result)
141    return result.to_output(binary)
142
143
144def output(*cmd, **kwargs):
145    """Run a command and return its output
146
147    Args:
148        *cmd (list of str): Command to run
149        **kwargs (dict of args): Extra arguments to pass in
150
151    Returns:
152        str: command output
153    """
154    kwargs['raise_on_error'] = kwargs.get('raise_on_error', True)
155    return run_pipe([cmd], capture=True, **kwargs).stdout
156
157
158def output_one_line(*cmd, **kwargs):
159    """Run a command and output it as a single-line string
160
161    The command is expected to produce a single line of output
162
163    Args:
164        *cmd (list of str): Command to run
165        **kwargs (dict of args): Extra arguments to pass in
166
167    Returns:
168        str: output of command with all newlines removed
169    """
170    raise_on_error = kwargs.pop('raise_on_error', True)
171    result = run_pipe([cmd], capture=True, oneline=True,
172                      raise_on_error=raise_on_error, **kwargs).stdout.strip()
173    return result
174
175
176def run(*cmd, **kwargs):
177    """Run a command
178
179    Note that you must add 'capture' to kwargs to obtain non-empty output
180
181    Args:
182        *cmd (list of str): Command to run
183        **kwargs (dict of args): Extra arguments to pass in
184
185    Returns:
186        str: output of command
187    """
188    return run_pipe([cmd], **kwargs).stdout
189
190
191def run_one(*cmd, **kwargs):
192    """Run a single command
193
194    Note that you must add 'capture' to kwargs to obtain non-empty output
195
196    Args:
197        *cmd (list of str): Command to run
198        **kwargs (dict of args): Extra arguments to pass in
199
200    Returns:
201        CommandResult: output of command
202    """
203    return run_pipe([cmd], **kwargs)
204
205
206def run_list(cmd, **kwargs):
207    """Run a command and return its output
208
209    Args:
210        cmd (list of str): Command to run
211
212    Returns:
213        str: output of command
214        **kwargs (dict of args): Extra arguments to pass in
215    """
216    return run_pipe([cmd], capture=True, **kwargs).stdout
217
218
219def stop_all():
220    """Stop all subprocesses initiated with cros_subprocess"""
221    cros_subprocess.stay_alive = False
222