#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2014, Linaro Limited
# Copyright (c) 2021, Huawei Technologies Co., Ltd
#

import argparse
import os
import select
import socket
import sys
import termios

# True while telnet option negotiation received on the connection must be
# answered and filtered out of the data stream.
handle_telnet = False
# Bytes of a telnet command collected so far; a command may be split across
# several reads, so this state persists between calls.
cmd_bytes = bytearray()

# Telnet protocol bytes (RFC 854) and the Suppress Go Ahead option (RFC 858)
TELNET_IAC = 0xff
TELNET_DO = 0xfd
TELNET_WILL = 0xfb
TELNET_SUPRESS_GO_AHEAD = 0x1


def get_args():
    """Parse the command line.

    Returns the argparse namespace with ``port`` (a one-element list of int,
    because of ``nargs=1``) and ``telnet`` (bool).
    """
    parser = argparse.ArgumentParser(description='Starts a TCP server to be '
                                     'used as a terminal (for QEMU or FVP). '
                                     'When the server receives a connection '
                                     'it puts the terminal in raw mode so '
                                     'that control characters (Ctrl-C etc.) '
                                     'are interpreted remotely. Only when the '
                                     'peer has closed the connection the '
                                     'terminal settings are restored.')
    parser.add_argument('port', nargs=1, type=int,
                        help='local TCP port to listen on')
    parser.add_argument('-t', '--telnet', action='store_true',
                        help='handle telnet commands (FVP)')
    return parser.parse_args()


def set_stty_noncanonical():
    """Switch stdin's terminal to unbuffered, no-echo, no-signal input.

    The caller is responsible for saving and restoring the previous
    terminal attributes.
    """
    stdin_fd = sys.stdin.fileno()
    t = termios.tcgetattr(stdin_fd)
    # iflag: do not translate CR to NL on input
    t[0] = t[0] & ~termios.ICRNL
    # lflag: no line editing, no local echo, no signal generation (so
    # Ctrl-C and friends are passed through as plain bytes)
    t[3] = t[3] & ~(termios.ICANON | termios.ECHO | termios.ISIG)
    t[6][termios.VMIN] = 1   # Character-at-a-time input
    t[6][termios.VTIME] = 0  # with blocking
    termios.tcsetattr(stdin_fd, termios.TCSAFLUSH, t)


def handle_telnet_codes(fd, buf):
    """Answer telnet negotiation found in *buf* and strip it from *buf*.

    fd:  descriptor the replies are written to, or -1 to discard any
         partially collected command (state reset).
    buf: bytearray of freshly read data; modified in place so that only the
         non-command bytes remain, in their original order.

    Does nothing unless the module-level ``handle_telnet`` flag is set.
    Every 3-byte ``IAC DO <opt>`` is answered with ``IAC WILL <opt>`` and
    every ``IAC WILL <opt>`` with ``IAC DO <opt>``; any other 3-byte command
    is dropped silently. Once WILL SUPPRESS-GO-AHEAD has been answered the
    flag is cleared, so later calls pass data through untouched.
    """
    global handle_telnet

    if not handle_telnet:
        return

    if fd == -1:
        cmd_bytes.clear()
        return

    # Collect the bytes to keep instead of deleting from the front of buf:
    # a command may appear after ordinary data in the same read, and
    # deleting buf[0] would then drop payload and keep the command bytes.
    passthrough = bytearray()
    for c in buf:
        if not cmd_bytes and c != TELNET_IAC:
            passthrough.append(c)
            continue

        cmd_bytes.append(c)
        if len(cmd_bytes) < 3:
            continue

        if cmd_bytes[1] == TELNET_DO:
            cmd_bytes[1] = TELNET_WILL
        elif cmd_bytes[1] == TELNET_WILL:
            if cmd_bytes[2] == TELNET_SUPRESS_GO_AHEAD:
                # We're done after responding to this
                handle_telnet = False
            cmd_bytes[1] = TELNET_DO
        else:
            # Unknown command, ignore it
            cmd_bytes.clear()
        if cmd_bytes:
            os.write(fd, cmd_bytes)
            cmd_bytes.clear()

    buf[:] = passthrough


def serve_conn(conn):
    """Shuttle bytes between stdin/stdout and *conn* until either side ends.

    Data read from the connection goes to stdout, data read from stdin goes
    to the connection. Returns on EOF from either descriptor, on a
    connection reset while reading, or on a write error.
    """
    fd = conn.fileno()
    poll = select.poll()
    poll.register(sys.stdin.fileno(), select.POLLIN)
    poll.register(fd, select.POLLIN)
    while True:
        for readyfd, _ in poll.poll():
            try:
                data = os.read(readyfd, 512)
                if not data:
                    print('soc_term: read fd EOF')
                    return
                buf = bytearray(data)
                # NOTE(review): filtering is applied to both directions,
                # as before; presumably only the peer sends telnet
                # commands - confirm before restricting it to `fd`.
                handle_telnet_codes(readyfd, buf)
                if readyfd == fd:
                    to = sys.stdout.fileno()
                else:
                    to = fd
            except ConnectionResetError:
                print('soc_term: connection reset')
                return
            try:
                # Python >= 3.5 handles EINTR internally so no loop required
                os.write(to, buf)
            except OSError:
                # os.write() reports failures (broken pipe, reset, ...) as
                # OSError or one of its subclasses
                print('soc_term: write error')
                return


def main():
    """Listen on 127.0.0.1:<port> and serve one connection at a time.

    The terminal is put in raw mode for the duration of each connection and
    restored afterwards, including when the loop is left through an
    exception. A KeyboardInterrupt ends the program.
    """
    global handle_telnet
    args = get_args()
    port = args.port[0]
    with socket.socket() as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('127.0.0.1', port))
        sock.listen(5)
        print(f'listening on port {port}')
        if args.telnet:
            print('Handling telnet commands')
        old_term = termios.tcgetattr(sys.stdin.fileno())
        while True:
            try:
                conn, _ = sock.accept()
                # `with` closes the connection even if serving it raises
                with conn:
                    print(f'soc_term: accepted fd {conn.fileno()}')
                    handle_telnet = args.telnet
                    # Reset internal state
                    handle_telnet_codes(-1, bytearray())
                    set_stty_noncanonical()
                    serve_conn(conn)
            except KeyboardInterrupt:
                return
            finally:
                termios.tcsetattr(sys.stdin.fileno(), termios.TCSAFLUSH,
                                  old_term)


if __name__ == "__main__":
    main()