1import os
2import sys
3import signal
4import subprocess
5from subprocess import PIPE, STDOUT
6import io
7import time
8import select
9
10
def wait_for_output(fp: io.TextIOBase, predicate, timeout: float):
    """Collect lines from *fp* until *predicate* accepts one or time runs out.

    Args:
        fp: Text stream with a pollable file descriptor. Every wake-up
            drains it with a bare ``read()``, so the descriptor is expected
            to be in non-blocking mode.
        predicate: Callable invoked with each complete line (trailing
            newline included). A truthy result ends the wait.
        timeout: Maximum number of seconds to wait.

    Returns:
        A ``(matched, lines)`` tuple. When ``matched`` is True the accepted
        line is ``lines[-1]``. When it is False, ``lines`` holds every
        complete line received, followed by any unterminated trailing text
        so that failure messages show everything that was read.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    poller = select.poll()
    poller.register(fp, select.POLLIN)
    output = []
    pending = ""  # text received after the last newline
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # poll() takes its timeout in milliseconds, not seconds.
        if not poller.poll(max(remaining * 1000, 1)):
            continue
        data = fp.read()
        if not data:
            # An empty read after a poll wake-up means the writer closed the
            # pipe; nothing more can arrive, so stop instead of spinning.
            break
        pending += data
        # A single read may carry several lines: test each complete one and
        # keep the unterminated tail for the next iteration.
        *complete, pending = pending.split("\n")
        for text in complete:
            output.append(text + "\n")
            if predicate(output[-1]):
                # Anything already read past the matching line is discarded
                # so that the match stays the last element of the result.
                return True, output

    if pending:
        output.append(pending)
    return False, output
31
32
def shutdown_little_kernel(p: subprocess.Popen):
    """Stop the LittleKernel process, escalating until it is gone.

    If the process is still running, its state line from
    ``/proc/<pid>/status`` is printed for diagnostics. Shutdown then
    proceeds in steps: a ``poweroff`` command on stdin (0.3 s grace), then
    SIGINT (1 s grace), and finally an unconditional kill and reap.

    Args:
        p: The process to stop. A ``poweroff`` command is only sent when
            ``p.stdin`` is a pipe.
    """
    try:
        ret = p.poll()
        # Compare against None: an exit code of 0 also means "already gone".
        if ret is not None:
            print("LittleKernel already exited with code", ret)
            return

        status_path = "/proc/{}/status".format(p.pid)
        try:
            with open(status_path) as fp:
                state_line = next(
                    (line for line in fp if "State:" in line), None)
        except FileNotFoundError:
            print(status_path, "does not exists")
        else:
            if state_line:
                print("LittleKernel process state after test:",
                      state_line.rstrip())

        # Step 1: ask for a clean power-off through the shell.
        poweroff_sent = False
        if p.stdin is not None:
            try:
                p.stdin.write("poweroff\n")
                p.stdin.flush()
                poweroff_sent = True
            except (OSError, ValueError) as err:
                # Broken or closed pipe: the request is best-effort, so fall
                # through to the signal-based steps.
                print("Could not send poweroff to LittleKernel:", err)
        if poweroff_sent:
            try:
                p.wait(0.3)
                return
            except subprocess.TimeoutExpired:
                pass

        # Step 2: still running, so interrupt it and allow a moment to exit.
        p.send_signal(signal.SIGINT)
        p.wait(1)
    except subprocess.TimeoutExpired:
        pass
    finally:
        # Step 3: always kill and reap; both are harmless once it has exited.
        p.kill()
        p.wait()
58
59
def main():
    """Boot LittleKernel under QEMU and try to run the UEFI hello-world app.

    Waits up to 5 seconds for the "starting app shell" line, sends
    ``uefi_load virtio0`` to the shell, then waits up to 0.5 seconds for
    "Hello World!" and prints the output collected during that wait. The
    QEMU process is always shut down and its pipes closed before returning.

    Raises:
        AssertionError: If "starting app shell" is not seen in time.
    """
    qemu_cmd = [
        'qemu-system-aarch64',
        '-cpu', 'max',
        '-m', '512',
        '-smp', '1',
        '-machine', 'virt,highmem=off',
        '-kernel', 'build-qemu-virt-arm64-test/lk.elf',
        '-net', 'none',
        '-nographic',
        '-drive',
        'if=none,file=lib/uefi/helloworld_aa64.efi,id=blk,format=raw',
        '-device', 'virtio-blk-device,drive=blk',
    ]
    # Test relies on reading subprocess output, so set bufsize=0
    # to ensure that we get real-time output.
    # The context manager closes the stdin/stdout pipes on the way out.
    with subprocess.Popen(qemu_cmd, stdout=PIPE, stdin=PIPE, stderr=STDOUT,
                          text=True, bufsize=0) as p:
        try:
            # wait_for_output drains the pipe with read(), which must not
            # block while QEMU is still running.
            os.set_blocking(p.stdout.fileno(), False)
            condition_met, output = wait_for_output(
                p.stdout, lambda l: "starting app shell" in l, 5)
            if not condition_met:
                # Raised explicitly so the check survives `python -O`.
                raise AssertionError(
                    "Did not see 'starting app shell', stdout: {}".format(
                        "".join(output)))
            p.stdin.write("uefi_load virtio0\n")
            p.stdin.flush()
            condition_met, output = wait_for_output(
                p.stdout, lambda l: "Hello World!" in l, 0.5)
            print("".join(output))
            if not condition_met:
                # NOTE(review): a missing greeting has never failed the run;
                # it is now reported but kept non-fatal. Confirm whether it
                # should raise like the boot check above.
                print("Did not see 'Hello World!' within 0.5 seconds",
                      file=sys.stderr)
        finally:
            shutdown_little_kernel(p)
97
98
# Run the boot test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
101