1# SPDX-License-Identifier: GPL-2.0
2# Copyright (c) 2015 Stephen Warren
3# Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
4
5# Common logic to interact with U-Boot via the console. This class provides
6# the interface that tests use to execute U-Boot shell commands and wait for
7# their results. Sub-classes exist to perform board-type-specific setup
8# operations, such as spawning a sub-process for Sandbox, or attaching to the
9# serial console of real hardware.
10
11import multiplexed_log
12import os
13import pytest
14import re
15import sys
16import spawn
17from spawn import BootFail, Timeout, Unexpected, handle_exception
18
19# Regexes for text we expect U-Boot to send to the console.
20pattern_u_boot_spl_signon = re.compile('(U-Boot SPL \\d{4}\\.\\d{2}[^\r\n]*\\))')
21pattern_u_boot_main_signon = re.compile('(U-Boot \\d{4}\\.\\d{2}[^\r\n]*\\))')
22pattern_stop_autoboot_prompt = re.compile('Hit any key to stop autoboot: ')
23pattern_unknown_command = re.compile('Unknown command \'.*\' - try \'help\'')
24pattern_error_notification = re.compile('## Error: ')
25pattern_error_please_reset = re.compile('### ERROR ### Please RESET the board ###')
26pattern_ready_prompt = re.compile('{lab ready in (.*)s: (.*)}')
27pattern_lab_mode = re.compile('{lab mode.*}')
28
29PAT_ID = 0
30PAT_RE = 1
31
32# Timeout before expecting the console to be ready (in milliseconds)
33TIMEOUT_MS = 30000                  # Standard timeout
34TIMEOUT_CMD_MS = 10000              # Command-echo timeout
35
36# Timeout for board preparation in lab mode. This needs to be enough to build
37# U-Boot, write it to the board and then boot the board. Since this process is
38# under the control of another program (e.g. Labgrid), it will failure sooner
39# if something goes way. So use a very long timeout here to cover all possible
40# situations.
41TIMEOUT_PREPARE_MS = 3 * 60 * 1000
42
43bad_pattern_defs = (
44    ('spl_signon', pattern_u_boot_spl_signon),
45    ('main_signon', pattern_u_boot_main_signon),
46    ('stop_autoboot_prompt', pattern_stop_autoboot_prompt),
47    ('unknown_command', pattern_unknown_command),
48    ('error_notification', pattern_error_notification),
49    ('error_please_reset', pattern_error_please_reset),
50)
51
52class ConsoleDisableCheck(object):
53    """Context manager (for Python's with statement) that temporarily disables
54    the specified console output error check. This is useful when deliberately
55    executing a command that is known to trigger one of the error checks, in
56    order to test that the error condition is actually raised. This class is
57    used internally by ConsoleBase::disable_check(); it is not intended for
58    direct usage."""
59
60    def __init__(self, console, check_type):
61        self.console = console
62        self.check_type = check_type
63
64    def __enter__(self):
65        self.console.disable_check_count[self.check_type] += 1
66        self.console.eval_bad_patterns()
67
68    def __exit__(self, extype, value, traceback):
69        self.console.disable_check_count[self.check_type] -= 1
70        self.console.eval_bad_patterns()
71
72class ConsoleEnableCheck(object):
73    """Context manager (for Python's with statement) that temporarily enables
74    the specified console output error check. This is useful when executing a
75    command that might raise an extra bad pattern, beyond the default bad
76    patterns, in order to validate that the extra bad pattern is actually
77    detected. This class is used internally by ConsoleBase::enable_check(); it
78    is not intended for direct usage."""
79
80    def __init__(self, console, check_type, check_pattern):
81        self.console = console
82        self.check_type = check_type
83        self.check_pattern = check_pattern
84
85    def __enter__(self):
86        global bad_pattern_defs
87        self.default_bad_patterns = bad_pattern_defs
88        bad_pattern_defs += ((self.check_type, self.check_pattern),)
89        self.console.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
90        self.console.eval_bad_patterns()
91
92    def __exit__(self, extype, value, traceback):
93        global bad_pattern_defs
94        bad_pattern_defs = self.default_bad_patterns
95        self.console.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
96        self.console.eval_bad_patterns()
97
98class ConsoleSetupTimeout(object):
99    """Context manager (for Python's with statement) that temporarily sets up
100    timeout for specific command. This is useful when execution time is greater
101    then default 30s."""
102
103    def __init__(self, console, timeout):
104        self.p = console.p
105        self.orig_timeout = self.p.timeout
106        self.p.timeout = timeout
107
108    def __enter__(self):
109        return self
110
111    def __exit__(self, extype, value, traceback):
112        self.p.timeout = self.orig_timeout
113
114class ConsoleBase(object):
115    """The interface through which test functions interact with the U-Boot
116    console. This primarily involves executing shell commands, capturing their
117    results, and checking for common error conditions. Some common utilities
118    are also provided too."""
119
120    def __init__(self, log, config, max_fifo_fill):
121        """Initialize a U-Boot console connection.
122
123        Can only usefully be called by sub-classes.
124
125        Args:
126            log: A multiplexed_log.Logfile object, to which the U-Boot output
127                will be logged.
128            config: A configuration data structure, as built by conftest.py.
129            max_fifo_fill: The maximum number of characters to send to U-Boot
130                command-line before waiting for U-Boot to echo the characters
131                back. For UART-based HW without HW flow control, this value
132                should be set less than the UART RX FIFO size to avoid
133                overflow, assuming that U-Boot can't keep up with full-rate
134                traffic at the baud rate.
135
136        Returns:
137            Nothing.
138        """
139
140        self.log = log
141        self.config = config
142        self.max_fifo_fill = max_fifo_fill
143
144        self.logstream = self.log.get_stream('console', sys.stdout)
145
146        # Array slice removes leading/trailing quotes
147        self.prompt = self.config.buildconfig['config_sys_prompt'][1:-1]
148        self.prompt_compiled = re.compile('^' + re.escape(self.prompt), re.MULTILINE)
149        self.p = None
150        self.disable_check_count = {pat[PAT_ID]: 0 for pat in bad_pattern_defs}
151        self.eval_bad_patterns()
152
153        self.at_prompt = False
154        self.at_prompt_logevt = None
155        self.lab_mode = False
156
157    def get_spawn(self):
158        # This is not called, ssubclass must define this.
159        # Return a value to avoid:
160        #   console_base.py:348:12: E1128: Assigning result of a function
161        #   call, where the function returns None (assignment-from-none)
162        return spawn.Spawn([])
163
164
165    def eval_bad_patterns(self):
166        self.bad_patterns = [pat[PAT_RE] for pat in bad_pattern_defs \
167            if self.disable_check_count[pat[PAT_ID]] == 0]
168        self.bad_pattern_ids = [pat[PAT_ID] for pat in bad_pattern_defs \
169            if self.disable_check_count[pat[PAT_ID]] == 0]
170
171    def close(self):
172        """Terminate the connection to the U-Boot console.
173
174        This function is only useful once all interaction with U-Boot is
175        complete. Once this function is called, data cannot be sent to or
176        received from U-Boot.
177
178        Args:
179            None.
180
181        Returns:
182            Nothing.
183        """
184
185        if self.p:
186            self.log.start_section('Stopping U-Boot')
187            close_type = self.p.close()
188            self.log.info(f'Close type: {close_type}')
189            self.log.end_section('Stopping U-Boot')
190        self.logstream.close()
191
192    def set_lab_mode(self):
193        """Select lab mode
194
195        This tells us that we will get a 'lab ready' message when the board is
196        ready for use. We don't need to look for signon messages.
197        """
198        self.log.info(f'test.py: Lab mode is active')
199        self.p.timeout = TIMEOUT_PREPARE_MS
200        self.lab_mode = True
201
202    def wait_for_boot_prompt(self, loop_num = 1):
203        """Wait for the boot up until command prompt. This is for internal use only.
204        """
205        try:
206            self.log.info('Waiting for U-Boot to be ready')
207            bcfg = self.config.buildconfig
208            config_spl_serial = bcfg.get('config_spl_serial', 'n') == 'y'
209            env_spl_skipped = self.config.env.get('env__spl_skipped', False)
210            env_spl_banner_times = self.config.env.get('env__spl_banner_times', 1)
211
212            while not self.lab_mode and loop_num > 0:
213                loop_num -= 1
214                while config_spl_serial and not env_spl_skipped and env_spl_banner_times > 0:
215                    m = self.p.expect([pattern_u_boot_spl_signon,
216                                       pattern_lab_mode] + self.bad_patterns)
217                    if m == 1:
218                        self.set_lab_mode()
219                        break
220                    elif m != 0:
221                        raise BootFail('Bad pattern found on SPL console: ' +
222                                       self.bad_pattern_ids[m - 1])
223                    env_spl_banner_times -= 1
224
225                if not self.lab_mode:
226                    m = self.p.expect([pattern_u_boot_main_signon,
227                                       pattern_lab_mode] + self.bad_patterns)
228                    if m == 1:
229                        self.set_lab_mode()
230                    elif m != 0:
231                        raise BootFail('Bad pattern found on console: ' +
232                                       self.bad_pattern_ids[m - 1])
233            if not self.lab_mode:
234                self.u_boot_version_string = self.p.after
235            while True:
236                m = self.p.expect([self.prompt_compiled, pattern_ready_prompt,
237                    pattern_stop_autoboot_prompt] + self.bad_patterns)
238                if m == 0:
239                    self.log.info(f'Found ready prompt {m}')
240                    break
241                elif m == 1:
242                    m = pattern_ready_prompt.search(self.p.after)
243                    self.u_boot_version_string = m.group(2)
244                    self.log.info(f'Lab: Board is ready')
245                    self.p.timeout = TIMEOUT_MS
246                    break
247                if m == 2:
248                    self.log.info(f'Found autoboot prompt {m}')
249                    self.p.send(' ')
250                    continue
251                if not self.lab_mode:
252                    raise BootFail('Missing prompt / ready message on console: ' +
253                                   self.bad_pattern_ids[m - 3])
254            self.log.info(f'U-Boot is ready')
255
256        finally:
257            self.log.timestamp()
258
259    def run_command(self, cmd, wait_for_echo=True, send_nl=True,
260            wait_for_prompt=True, wait_for_reboot=False):
261        """Execute a command via the U-Boot console.
262
263        The command is always sent to U-Boot.
264
265        U-Boot echoes any command back to its output, and this function
266        typically waits for that to occur. The wait can be disabled by setting
267        wait_for_echo=False, which is useful e.g. when sending CTRL-C to
268        interrupt a long-running command such as "ums".
269
270        Command execution is typically triggered by sending a newline
271        character. This can be disabled by setting send_nl=False, which is
272        also useful when sending CTRL-C.
273
274        This function typically waits for the command to finish executing, and
275        returns the console output that it generated. This can be disabled by
276        setting wait_for_prompt=False, which is useful when invoking a long-
277        running command such as "ums".
278
279        Args:
280            cmd: The command to send.
281            wait_for_echo: Boolean indicating whether to wait for U-Boot to
282                echo the command text back to its output.
283            send_nl: Boolean indicating whether to send a newline character
284                after the command string.
285            wait_for_prompt: Boolean indicating whether to wait for the
286                command prompt to be sent by U-Boot. This typically occurs
287                immediately after the command has been executed.
288            wait_for_reboot: Boolean indication whether to wait for the
289                reboot U-Boot. If this sets True, wait_for_prompt must also
290                be True.
291
292        Returns:
293            If wait_for_prompt == False:
294                Nothing.
295            Else:
296                The output from U-Boot during command execution. In other
297                words, the text U-Boot emitted between the point it echod the
298                command string and emitted the subsequent command prompts.
299        """
300
301        if self.at_prompt and \
302                self.at_prompt_logevt != self.logstream.logfile.cur_evt:
303            self.logstream.write(self.prompt, implicit=True)
304
305        try:
306            self.at_prompt = False
307            if not self.p:
308                raise BootFail(
309                    f"Lab failure: Connection lost when sending command '{cmd}'")
310
311            if send_nl:
312                cmd += '\n'
313            rem = cmd  # Remaining to be sent
314            with self.temporary_timeout(TIMEOUT_CMD_MS):
315                while rem:
316                    # Limit max outstanding data, so UART FIFOs don't overflow
317                    chunk = rem[:self.max_fifo_fill]
318                    rem = rem[self.max_fifo_fill:]
319                    self.p.send(chunk)
320                    if not wait_for_echo:
321                        continue
322                    chunk = re.escape(chunk)
323                    chunk = chunk.replace('\\\n', '[\r\n]')
324                    m = self.p.expect([chunk] + self.bad_patterns)
325                    if m != 0:
326                        self.at_prompt = False
327                        raise BootFail(f"Failed to get echo on console (cmd '{cmd}':rem '{rem}'): " +
328                                        self.bad_pattern_ids[m - 1])
329            if not wait_for_prompt:
330                return
331            if wait_for_reboot:
332                self.wait_for_boot_prompt()
333            else:
334                m = self.p.expect([self.prompt_compiled] + self.bad_patterns)
335                if m != 0:
336                    self.at_prompt = False
337                    raise BootFail('Missing prompt on console: ' +
338                                    self.bad_pattern_ids[m - 1])
339            self.at_prompt = True
340            self.at_prompt_logevt = self.logstream.logfile.cur_evt
341            # Only strip \r\n; space/TAB might be significant if testing
342            # indentation.
343            return self.p.before.strip('\r\n')
344        except Timeout as exc:
345            handle_exception(self.config, self, self.log, exc,
346                             f"Lab failure: Timeout executing '{cmd}'", True)
347            raise
348        except BootFail as exc:
349            handle_exception(self.config, self, self.log, exc,
350                             f"'Boot fail '{cmd}'",
351                             True, self.get_spawn_output())
352            raise
353        finally:
354            self.log.timestamp()
355
356    def run_command_list(self, cmds):
357        """Run a list of commands.
358
359        This is a helper function to call run_command() with default arguments
360        for each command in a list.
361
362        Args:
363            cmd: List of commands (each a string).
364        Returns:
365            A list of output strings from each command, one element for each
366            command.
367        """
368        output = []
369        for cmd in cmds:
370            output.append(self.run_command(cmd))
371        return output
372
373    def send(self, msg):
374        """Send characters without waiting for echo, etc."""
375        self.run_command(msg, wait_for_prompt=False, wait_for_echo=False,
376                         send_nl=False)
377
378    def ctrl(self, char):
379        """Send a CTRL- character to U-Boot.
380
381        This is useful in order to stop execution of long-running synchronous
382        commands such as "ums".
383
384        Args:
385            char (str): Character to send, e.g. 'C' to send Ctrl-C
386        """
387        self.log.action(f'Sending Ctrl-{char}')
388        self.send(chr(ord(char) - ord('@')))
389
390    def ctrlc(self):
391        """Send a CTRL-C character to U-Boot.
392
393        This is useful in order to stop execution of long-running synchronous
394        commands such as "ums".
395        """
396        self.ctrl('C')
397
398    def wait_for(self, text):
399        """Wait for a pattern to be emitted by U-Boot.
400
401        This is useful when a long-running command such as "dfu" is executing,
402        and it periodically emits some text that should show up at a specific
403        location in the log file.
404
405        Args:
406            text: The text to wait for; either a string (containing raw text,
407                not a regular expression) or an re object.
408
409        Returns:
410            Nothing.
411        """
412
413        if type(text) == type(''):
414            text = re.escape(text)
415        m = self.p.expect([text] + self.bad_patterns)
416        if m != 0:
417            raise Unexpected(
418                "Unexpected pattern found on console (exp '{text}': " +
419                self.bad_pattern_ids[m - 1])
420
421    def drain_console(self):
422        """Read from and log the U-Boot console for a short time.
423
424        U-Boot's console output is only logged when the test code actively
425        waits for U-Boot to emit specific data. There are cases where tests
426        can fail without doing this. For example, if a test asks U-Boot to
427        enable USB device mode, then polls until a host-side device node
428        exists. In such a case, it is useful to log U-Boot's console output
429        in case U-Boot printed clues as to why the host-side even did not
430        occur. This function will do that.
431
432        Args:
433            None.
434
435        Returns:
436            Nothing.
437        """
438
439        # If we are already not connected to U-Boot, there's nothing to drain.
440        # This should only happen when a previous call to run_command() or
441        # wait_for() failed (and hence the output has already been logged), or
442        # the system is shutting down.
443        if not self.p:
444            return
445
446        orig_timeout = self.p.timeout
447        try:
448            # Drain the log for a relatively short time.
449            self.p.timeout = 1000
450            # Wait for something U-Boot will likely never send. This will
451            # cause the console output to be read and logged.
452            self.p.expect(['This should never match U-Boot output'])
453        except:
454            # We expect a timeout, since U-Boot won't print what we waited
455            # for. Squash it when it happens.
456            #
457            # Squash any other exception too. This function is only used to
458            # drain (and log) the U-Boot console output after a failed test.
459            # The U-Boot process will be restarted, or target board reset, once
460            # this function returns. So, we don't care about detecting any
461            # additional errors, so they're squashed so that the rest of the
462            # post-test-failure cleanup code can continue operation, and
463            # correctly terminate any log sections, etc.
464            pass
465        finally:
466            self.p.timeout = orig_timeout
467
468    def ensure_spawned(self, expect_reset=False):
469        """Ensure a connection to a correctly running U-Boot instance.
470
471        This may require spawning a new Sandbox process or resetting target
472        hardware, as defined by the implementation sub-class.
473
474        This is an internal function and should not be called directly.
475
476        Args:
477            expect_reset: Boolean indication whether this boot is expected
478                to be reset while the 1st boot process after main boot before
479                prompt. False by default.
480
481        Returns:
482            Nothing.
483        """
484
485        if self.p:
486            # Reset the console timeout value as some tests may change
487            # its default value during the execution
488            if not self.config.gdbserver:
489                self.p.timeout = TIMEOUT_MS
490            return
491        try:
492            self.log.start_section('Starting U-Boot')
493            self.at_prompt = False
494            self.p = self.get_spawn()
495            # Real targets can take a long time to scroll large amounts of
496            # text if LCD is enabled. This value may need tweaking in the
497            # future, possibly per-test to be optimal. This works for 'help'
498            # on board 'seaboard'.
499            if not self.config.gdbserver:
500                self.p.timeout = TIMEOUT_MS
501            self.p.logfile_read = self.logstream
502            if self.config.use_running_system:
503                # Send an empty command to set up the 'expect' logic. This has
504                # the side effect of ensuring that there was no partial command
505                # line entered
506                self.run_command(' ')
507            else:
508                if expect_reset:
509                    loop_num = 2
510                else:
511                    loop_num = 1
512                self.wait_for_boot_prompt(loop_num = loop_num)
513            self.at_prompt = True
514            self.at_prompt_logevt = self.logstream.logfile.cur_evt
515        except Exception as ex:
516            self.log.error(str(ex))
517            self.cleanup_spawn()
518            raise
519        finally:
520            self.log.timestamp()
521            self.log.end_section('Starting U-Boot')
522
523    def cleanup_spawn(self):
524        """Shut down all interaction with the U-Boot instance.
525
526        This is used when an error is detected prior to re-establishing a
527        connection with a fresh U-Boot instance.
528
529        This is an internal function and should not be called directly.
530
531        Args:
532            None.
533
534        Returns:
535            Nothing.
536        """
537
538        try:
539            if self.p:
540                self.p.close()
541        except:
542            pass
543        self.p = None
544
545    def restart_uboot(self, expect_reset=False):
546        """Shut down and restart U-Boot."""
547        self.cleanup_spawn()
548        self.ensure_spawned(expect_reset)
549
550    def get_spawn_output(self):
551        """Return the start-up output from U-Boot
552
553        Returns:
554            The output produced by ensure_spawed(), as a string.
555        """
556        if self.p:
557            return self.p.get_expect_output()
558        return None
559
560    def validate_version_string_in_text(self, text):
561        """Assert that a command's output includes the U-Boot signon message.
562
563        This is primarily useful for validating the "version" command without
564        duplicating the signon text regex in a test function.
565
566        Args:
567            text: The command output text to check.
568
569        Returns:
570            Nothing. An exception is raised if the validation fails.
571        """
572
573        assert(self.u_boot_version_string in text)
574
575    def disable_check(self, check_type):
576        """Temporarily disable an error check of U-Boot's output.
577
578        Create a new context manager (for use with the "with" statement) which
579        temporarily disables a particular console output error check.
580
581        Args:
582            check_type: The type of error-check to disable. Valid values may
583            be found in self.disable_check_count above.
584
585        Returns:
586            A context manager object.
587        """
588
589        return ConsoleDisableCheck(self, check_type)
590
591    def enable_check(self, check_type, check_pattern):
592        """Temporarily enable an error check of U-Boot's output.
593
594        Create a new context manager (for use with the "with" statement) which
595        temporarily enables a particular console output error check. The
596        arguments form a new element of bad_pattern_defs defined above.
597
598        Args:
599            check_type: The type of error-check or bad pattern to enable.
600            check_pattern: The regexes for text error pattern or bad pattern
601                to be checked.
602
603        Returns:
604            A context manager object.
605        """
606
607        return ConsoleEnableCheck(self, check_type, check_pattern)
608
609    def temporary_timeout(self, timeout):
610        """Temporarily set up different timeout for commands.
611
612        Create a new context manager (for use with the "with" statement) which
613        temporarily change timeout.
614
615        Args:
616            timeout: Time in milliseconds.
617
618        Returns:
619            A context manager object.
620        """
621
622        return ConsoleSetupTimeout(self, timeout)
623