1import os 2import sys 3import signal 4import subprocess 5from subprocess import PIPE, STDOUT 6import io 7import time 8import select 9 10 11def wait_for_output(fp: io.TextIOBase, predicate, timeout: float): 12 start_time = time.time() 13 end_time = start_time + timeout 14 poll_obj = select.poll() 15 poll_obj.register(fp, select.POLLIN) 16 output = [] 17 cur_line = "" 18 while time.time() < end_time: 19 poll_result = poll_obj.poll(max(end_time-time.time(), 0.001)) 20 if poll_result: 21 data = fp.read() 22 if "\n" in data: 23 output.append(cur_line + data[:data.index("\n") + 1]) 24 cur_line = data[data.index("\n") + 1:] 25 if predicate(output[-1]): 26 return True, output 27 else: 28 cur_line += data 29 30 return False, output 31 32 33def shutdown_little_kernel(p: subprocess.Popen): 34 try: 35 ret = p.poll() 36 if ret: 37 print("LittleKernel already exited with code", ret) 38 return 39 status_path = "/proc/{}/status".format(p.pid) 40 if os.path.exists(status_path): 41 with open(status_path) as fp: 42 lines = fp.readlines() 43 state_line = [l for l in lines if "State:" in l] 44 if state_line: 45 print("LittleKernel process state after test:",state_line[0].rstrip()) 46 else: 47 print(status_path, "does not exists") 48 p.stdin.write("poweroff\n") 49 p.stdin.flush() 50 p.wait(0.3) 51 p.send_signal(signal.SIGINT) 52 p.wait(1) 53 except subprocess.TimeoutExpired: 54 pass 55 finally: 56 p.kill() 57 p.wait() 58 59 60def main(): 61 # Test relies on reading subprocess output, so set bufsize=0 62 # to ensure that we get real-time output. 63 p = subprocess.Popen(['qemu-system-aarch64', 64 '-cpu', 65 'max', 66 '-m', 67 '512', 68 '-smp', 69 '1', 70 '-machine', 71 'virt,highmem=off', 72 '-kernel', 73 'build-qemu-virt-arm64-test/lk.elf', 74 '-net', 75 'none', 76 '-nographic', 77 '-drive', 78 'if=none,file=lib/uefi/helloworld_aa64.efi,id=blk,format=raw', 79 '-device', 80 'virtio-blk-device,drive=blk'], stdout=PIPE, stdin=PIPE, stderr=STDOUT, text=True, bufsize=0) 81 try: 82 os.set_blocking(p.stdout.fileno(), False) 83 condition_met, output = wait_for_output( 84 p.stdout, lambda l: "starting app shell" in l, 5) 85 assert condition_met, "Did not see 'starting app shell', stdout: {}".format( 86 "".join(output)) 87 p.stdin.write("uefi_load virtio0\n") 88 p.stdin.flush() 89 condition_met, output = wait_for_output( 90 p.stdout, lambda l: "Hello World!" in l, 0.5) 91 print("".join(output)) 92 if condition_met: 93 return 94 95 finally: 96 shutdown_little_kernel(p) 97 98 99if __name__ == "__main__": 100 main() 101