1# SPDX-License-Identifier: GPL-2.0
2# Copyright (c) 2015 Stephen Warren
3# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
4
5# Common logic to interact with U-Boot via the console. This class provides
6# the interface that tests use to execute U-Boot shell commands and wait for
7# their results. Sub-classes exist to perform board-type-specific setup
8# operations, such as spawning a sub-process for Sandbox, or attaching to the
9# serial console of real hardware.
10
11import multiplexed_log
12import os
13import pytest
14import re
15import sys
16import u_boot_spawn
17
# Regexes for text we expect U-Boot to send to the console.
#
# Each sign-on regex captures (group 1) the banner text from "U-Boot" up to
# the last ')' on the same line; the version is matched as YYYY.MM.
# NOTE(review): the SPL and SPL2 sign-on regexes are currently identical;
# they are only told apart by the order in which they are waited for.
pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_u_boot_spl2_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}[^\r\n]*\\))')
pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ')
pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'')
pattern_error_notification = re.compile('## Error: ')
pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###')

# Indices into each (id, regex) tuple of bad_pattern_defs below.
PAT_ID = 0
PAT_RE = 1

# Table of console output that is treated as an error when it shows up while
# waiting for something else. Each entry is (check ID, compiled regex). The
# check IDs are the keys of ConsoleBase.disable_check_count, and therefore the
# values accepted by ConsoleBase.disable_check(). The order of the entries is
# preserved when ConsoleBase.eval_bad_patterns() builds its parallel lists.
bad_pattern_defs = (
    ('spl_signon', pattern_u_boot_spl_signon),
    ('spl2_signon', pattern_u_boot_spl2_signon),
    ('main_signon', pattern_u_boot_main_signon),
    ('stop_autoboot_prompt', pattern_stop_autoboot_prompt),
    ('unknown_command', pattern_unknown_command),
    ('error_notification', pattern_error_notification),
    ('error_please_reset', pattern_error_please_reset),
)
39
class ConsoleDisableCheck(object):
    """Context manager (for Python's with statement) that switches off one
    console output error check for the duration of the with block.

    Tests use this when they run a command on purpose that is known to trip
    one of the error checks, so that they can verify the error is really
    reported. Instances are created by ConsoleBase::disable_check(); this
    class is not meant to be instantiated directly by tests."""

    def __init__(self, console, check_type):
        self.console = console
        self.check_type = check_type

    def _adjust(self, delta):
        # The per-check counter allows nested disables of the same check; the
        # console rebuilds its active pattern lists after every change.
        counts = self.console.disable_check_count
        counts[self.check_type] = counts[self.check_type] + delta
        self.console.eval_bad_patterns()

    def __enter__(self):
        self._adjust(1)

    def __exit__(self, extype, value, traceback):
        self._adjust(-1)
59
class ConsoleSetupTimeout(object):
    """Context manager (for Python's with statement) that temporarily sets up
    timeout for specific command. This is useful when execution time is greater
    than default 30s.

    The new timeout is applied when the with block is entered and the value
    that was in effect at that moment is restored when the block is left, so
    merely creating the object has no effect on the console and one object
    may be used for several with blocks."""

    def __init__(self, console, timeout):
        """Remember the console's spawn object and the requested timeout.

        Args:
            console: The console whose spawn object (console.p) gets the
                temporary timeout.
            timeout: The timeout to apply inside the with block.
        """
        self.p = console.p
        self.timeout = timeout
        # Refreshed in __enter__(); set here too so the attribute always
        # exists.
        self.orig_timeout = self.p.timeout

    def __enter__(self):
        # Save at entry (not construction) time so that the value restored
        # on exit is the one that was actually replaced.
        self.orig_timeout = self.p.timeout
        self.p.timeout = self.timeout
        return self

    def __exit__(self, extype, value, traceback):
        self.p.timeout = self.orig_timeout
75
class ConsoleBase(object):
    """The interface through which test functions interact with the U-Boot
    console. This primarily involves executing shell commands, capturing their
    results, and checking for common error conditions. Some common utilities
    are also provided."""

    # Console timeout, in milliseconds, applied whenever a connection is
    # (re-)validated or established, unless running under gdbserver.
    _DEFAULT_TIMEOUT_MS = 30000

    # Console timeout, in milliseconds, used by drain_console().
    _DRAIN_TIMEOUT_MS = 1000

    def __init__(self, log, config, max_fifo_fill):
        """Initialize a U-Boot console connection.

        Can only usefully be called by sub-classes.

        Args:
            log: A multiplexed_log.Logfile object, to which the U-Boot output
                will be logged.
            config: A configuration data structure, as built by conftest.py.
            max_fifo_fill: The maximum number of characters to send to U-Boot
                command-line before waiting for U-Boot to echo the characters
                back. For UART-based HW without HW flow control, this value
                should be set less than the UART RX FIFO size to avoid
                overflow, assuming that U-Boot can't keep up with full-rate
                traffic at the baud rate.

        Returns:
            Nothing.
        """

        self.log = log
        self.config = config
        self.max_fifo_fill = max_fifo_fill

        self.logstream = self.log.get_stream('console', sys.stdout)

        # Array slice removes leading/trailing quotes
        self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1]
        self.prompt_compiled = re.compile('^' + re.escape(self.prompt), re.MULTILINE)
        self.p = None
        self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
        self.eval_bad_patterns()

        self.at_prompt = False
        self.at_prompt_logevt = None

    def get_spawn(self):
        """Create the spawn object used to talk to U-Boot.

        Returns:
            A u_boot_spawn.Spawn object.
        """
        # This is not called, subclass must define this.
        # Return a value to avoid:
        #   u_boot_console_base.py:348:12: E1128: Assigning result of a function
        #   call, where the function returns None (assignment-from-none)
        return u_boot_spawn.Spawn([])

    def eval_bad_patterns(self):
        """Rebuild the lists of currently enabled bad patterns.

        self.bad_patterns and self.bad_pattern_ids are parallel lists holding
        the regex and the ID of every entry of bad_pattern_defs whose disable
        count is zero. Must be called whenever self.disable_check_count
        changes.

        Args:
            None.

        Returns:
            Nothing.
        """

        enabled = [pat for pat in bad_pattern_defs
                   if self.disable_check_count[pat[PAT_ID]] == 0]
        # These must stay lists: they are concatenated with lists of "good"
        # patterns before being handed to expect().
        self.bad_patterns = [pat[PAT_RE] for pat in enabled]
        self.bad_pattern_ids = [pat[PAT_ID] for pat in enabled]

    def _expect(self, good_patterns,
                err_prefix='Bad pattern found on console: '):
        """Wait for one of good_patterns, failing on any enabled bad pattern.

        Args:
            good_patterns: List of acceptable patterns (strings or compiled
                regexes).
            err_prefix: Text placed in front of the bad pattern ID in the
                message of the raised exception.

        Returns:
            The index within good_patterns of the pattern that matched.

        Raises:
            Exception: One of self.bad_patterns matched instead.
        """

        idx = self.p.expect(good_patterns + self.bad_patterns)
        num_good = len(good_patterns)
        if idx >= num_good:
            raise Exception(err_prefix + self.bad_pattern_ids[idx - num_good])
        return idx

    def close(self):
        """Terminate the connection to the U-Boot console.

        This function is only useful once all interaction with U-Boot is
        complete. Once this function is called, data cannot be sent to or
        received from U-Boot.

        Args:
            None.

        Returns:
            Nothing.
        """

        try:
            if self.p:
                self.p.close()
        finally:
            # Close the log stream even if closing the spawn object failed.
            self.logstream.close()

    def wait_for_boot_prompt(self, loop_num=1):
        """Wait for the boot up until command prompt. This is for internal use only.

        Args:
            loop_num: Number of times the sign-on message sequence is
                expected before the prompt appears.

        Returns:
            Nothing.

        Raises:
            Exception: A bad pattern was seen on the console. The error is
                logged and the spawn object cleaned up before re-raising.
        """
        try:
            bcfg = self.config.buildconfig
            config_spl = bcfg.get('config_spl', 'n') == 'y'
            config_spl_serial = bcfg.get('config_spl_serial', 'n') == 'y'
            env_spl_skipped = self.config.env.get('env__spl_skipped', False)
            env_spl2_skipped = self.config.env.get('env__spl2_skipped', True)

            while loop_num > 0:
                loop_num -= 1
                if config_spl and config_spl_serial and not env_spl_skipped:
                    self._expect([pattern_u_boot_spl_signon],
                                 'Bad pattern found on SPL console: ')
                if not env_spl2_skipped:
                    self._expect([pattern_u_boot_spl2_signon],
                                 'Bad pattern found on SPL2 console: ')
                self._expect([pattern_u_boot_main_signon])
            self.u_boot_version_string = self.p.after
            while True:
                m = self._expect([self.prompt_compiled,
                                  pattern_stop_autoboot_prompt])
                if m == 0:
                    break
                # Autoboot countdown seen: send a key to stop it, then keep
                # waiting for the prompt.
                self.p.send(' ')

        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise
        finally:
            self.log.timestamp()

    def run_command(self, cmd, wait_for_echo=True, send_nl=True,
            wait_for_prompt=True, wait_for_reboot=False):
        """Execute a command via the U-Boot console.

        The command is always sent to U-Boot.

        U-Boot echoes any command back to its output, and this function
        typically waits for that to occur. The wait can be disabled by setting
        wait_for_echo=False, which is useful e.g. when sending CTRL-C to
        interrupt a long-running command such as "ums".

        Command execution is typically triggered by sending a newline
        character. This can be disabled by setting send_nl=False, which is
        also useful when sending CTRL-C.

        This function typically waits for the command to finish executing, and
        returns the console output that it generated. This can be disabled by
        setting wait_for_prompt=False, which is useful when invoking a long-
        running command such as "ums".

        Args:
            cmd: The command to send.
            wait_for_echo: Boolean indicating whether to wait for U-Boot to
                echo the command text back to its output.
            send_nl: Boolean indicating whether to send a newline character
                after the command string.
            wait_for_prompt: Boolean indicating whether to wait for the
                command prompt to be sent by U-Boot. This typically occurs
                immediately after the command has been executed.
            wait_for_reboot: Boolean indicating whether to wait for U-Boot to
                reboot. If this is set to True, wait_for_prompt must also
                be True.

        Returns:
            If wait_for_prompt == False:
                Nothing.
            Else:
                The output from U-Boot during command execution. In other
                words, the text U-Boot emitted between the point it echoed the
                command string and emitted the subsequent command prompts.

        Raises:
            Exception: A bad pattern was seen on the console, or any other
                error occurred. The error is logged and the spawn object
                cleaned up before re-raising.
        """

        if self.at_prompt and \
                self.at_prompt_logevt != self.logstream.logfile.cur_evt:
            self.logstream.write(self.prompt, implicit=True)

        try:
            self.at_prompt = False
            if send_nl:
                cmd += '\n'
            # A missing or non-positive limit would make the slicing below
            # never consume cmd; treat it as "no limit" instead.
            fifo_fill = self.max_fifo_fill
            if not fifo_fill or fifo_fill < 0:
                fifo_fill = len(cmd)
            while cmd:
                # Limit max outstanding data, so UART FIFOs don't overflow
                chunk = cmd[:fifo_fill]
                cmd = cmd[fifo_fill:]
                self.p.send(chunk)
                if not wait_for_echo:
                    continue
                # The echoed newline may come back as either CR or LF.
                chunk = re.escape(chunk)
                chunk = chunk.replace('\\\n', '[\r\n]')
                self._expect([chunk])
            if not wait_for_prompt:
                return
            if wait_for_reboot:
                self.wait_for_boot_prompt()
            else:
                self._expect([self.prompt_compiled])
            self.at_prompt = True
            self.at_prompt_logevt = self.logstream.logfile.cur_evt
            # Only strip \r\n; space/TAB might be significant if testing
            # indentation.
            return self.p.before.strip('\r\n')
        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise
        finally:
            self.log.timestamp()

    def run_command_list(self, cmds):
        """Run a list of commands.

        This is a helper function to call run_command() with default arguments
        for each command in a list.

        Args:
            cmds: List of commands (each a string).
        Returns:
            A list of output strings from each command, one element for each
            command.
        """
        return [self.run_command(cmd) for cmd in cmds]

    def ctrlc(self):
        """Send a CTRL-C character to U-Boot.

        This is useful in order to stop execution of long-running synchronous
        commands such as "ums".

        Args:
            None.

        Returns:
            Nothing.
        """

        self.log.action('Sending Ctrl-C')
        self.run_command(chr(3), wait_for_echo=False, send_nl=False)

    def wait_for(self, text):
        """Wait for a pattern to be emitted by U-Boot.

        This is useful when a long-running command such as "dfu" is executing,
        and it periodically emits some text that should show up at a specific
        location in the log file.

        Args:
            text: The text to wait for; either a string (containing raw text,
                not a regular expression) or an re object.

        Returns:
            Nothing.

        Raises:
            Exception: A bad pattern was seen on the console first.
        """

        if isinstance(text, str):
            text = re.escape(text)
        self._expect([text])

    def drain_console(self):
        """Read from and log the U-Boot console for a short time.

        U-Boot's console output is only logged when the test code actively
        waits for U-Boot to emit specific data. There are cases where tests
        can fail without doing this. For example, if a test asks U-Boot to
        enable USB device mode, then polls until a host-side device node
        exists. In such a case, it is useful to log U-Boot's console output
        in case U-Boot printed clues as to why the host-side event did not
        occur. This function will do that.

        Args:
            None.

        Returns:
            Nothing.
        """

        # If we are already not connected to U-Boot, there's nothing to drain.
        # This should only happen when a previous call to run_command() or
        # wait_for() failed (and hence the output has already been logged), or
        # the system is shutting down.
        if not self.p:
            return

        orig_timeout = self.p.timeout
        try:
            # Drain the log for a relatively short time.
            self.p.timeout = self._DRAIN_TIMEOUT_MS
            # Wait for something U-Boot will likely never send. This will
            # cause the console output to be read and logged.
            self.p.expect(['This should never match U-Boot output'])
        except Exception:
            # We expect a timeout, since U-Boot won't print what we waited
            # for. Squash it when it happens.
            #
            # Squash any other error too. This function is only used to
            # drain (and log) the U-Boot console output after a failed test.
            # The U-Boot process will be restarted, or target board reset, once
            # this function returns. So, we don't care about detecting any
            # additional errors, so they're squashed so that the rest of the
            # post-test-failure cleanup code can continue operation, and
            # correctly terminate any log sections, etc.
            #
            # Interpreter-level requests such as KeyboardInterrupt are not
            # errors and are deliberately allowed to propagate.
            pass
        finally:
            self.p.timeout = orig_timeout

    def ensure_spawned(self, expect_reset=False):
        """Ensure a connection to a correctly running U-Boot instance.

        This may require spawning a new Sandbox process or resetting target
        hardware, as defined by the implementation sub-class.

        This is an internal function and should not be called directly.

        Args:
            expect_reset: Boolean indication whether this boot is expected
                to be reset while the 1st boot process after main boot before
                prompt. False by default.

        Returns:
            Nothing.
        """

        if self.p:
            # Reset the console timeout value as some tests may change
            # its default value during the execution
            if not self.config.gdbserver:
                self.p.timeout = self._DEFAULT_TIMEOUT_MS
            return
        try:
            self.log.start_section('Starting U-Boot')
            self.at_prompt = False
            self.p = self.get_spawn()
            # Real targets can take a long time to scroll large amounts of
            # text if LCD is enabled. This value may need tweaking in the
            # future, possibly per-test to be optimal. This works for 'help'
            # on board 'seaboard'.
            if not self.config.gdbserver:
                self.p.timeout = self._DEFAULT_TIMEOUT_MS
            self.p.logfile_read = self.logstream
            # An expected reset means the sign-on sequence shows up twice.
            loop_num = 2 if expect_reset else 1
            self.wait_for_boot_prompt(loop_num=loop_num)
            self.at_prompt = True
            self.at_prompt_logevt = self.logstream.logfile.cur_evt
        except Exception as ex:
            self.log.error(str(ex))
            self.cleanup_spawn()
            raise
        finally:
            self.log.timestamp()
            self.log.end_section('Starting U-Boot')

    def cleanup_spawn(self):
        """Shut down all interaction with the U-Boot instance.

        This is used when an error is detected prior to re-establishing a
        connection with a fresh U-Boot instance.

        This is an internal function and should not be called directly.

        Args:
            None.

        Returns:
            Nothing.
        """

        try:
            if self.p:
                self.p.close()
        except Exception:
            # Best effort: the connection is being thrown away anyway.
            pass
        finally:
            self.p = None

    def restart_uboot(self, expect_reset=False):
        """Shut down and restart U-Boot.

        Args:
            expect_reset: Passed through to ensure_spawned().

        Returns:
            Nothing.
        """
        self.cleanup_spawn()
        self.ensure_spawned(expect_reset)

    def get_spawn_output(self):
        """Return the start-up output from U-Boot

        Returns:
            The output produced by ensure_spawned(), as a string, or None if
            there is currently no connection to U-Boot.
        """
        if self.p:
            return self.p.get_expect_output()
        return None

    def validate_version_string_in_text(self, text):
        """Assert that a command's output includes the U-Boot signon message.

        This is primarily useful for validating the "version" command without
        duplicating the signon text regex in a test function.

        Args:
            text: The command output text to check.

        Returns:
            Nothing. An exception is raised if the validation fails.
        """

        assert self.u_boot_version_string in text

    def disable_check(self, check_type):
        """Temporarily disable an error check of U-Boot's output.

        Create a new context manager (for use with the "with" statement) which
        temporarily disables a particular console output error check.

        Args:
            check_type: The type of error-check to disable. Valid values may
            be found in self.disable_check_count above.

        Returns:
            A context manager object.
        """

        return ConsoleDisableCheck(self, check_type)

    def temporary_timeout(self, timeout):
        """Temporarily set up different timeout for commands.

        Create a new context manager (for use with the "with" statement) which
        temporarily changes the timeout.

        Args:
            timeout: Time in milliseconds.

        Returns:
            A context manager object.
        """

        return ConsoleSetupTimeout(self, timeout)
516