1#!/usr/bin/env python3 2# 3# This file is part of the MicroPython project, http://micropython.org/ 4# 5# The MIT License (MIT) 6# 7# Copyright (c) 2014-2021 Damien P. George 8# Copyright (c) 2017 Paul Sokolovsky 9# 10# Permission is hereby granted, free of charge, to any person obtaining a copy 11# of this software and associated documentation files (the "Software"), to deal 12# in the Software without restriction, including without limitation the rights 13# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 14# copies of the Software, and to permit persons to whom the Software is 15# furnished to do so, subject to the following conditions: 16# 17# The above copyright notice and this permission notice shall be included in 18# all copies or substantial portions of the Software. 19# 20# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 21# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 22# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 23# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 24# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 25# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 26# THE SOFTWARE. 27 28""" 29pyboard interface 30 31This module provides the Pyboard class, used to communicate with and 32control a MicroPython device over a communication channel. Both real 33boards and emulated devices (e.g. running in QEMU) are supported. 34Various communication channels are supported, including a serial 35connection, telnet-style network connection, external process 36connection. 37 38Example usage: 39 40 import pyboard 41 pyb = pyboard.Pyboard('/dev/ttyACM0') 42 43Or: 44 45 pyb = pyboard.Pyboard('192.168.1.1') 46 47Then: 48 49 pyb.enter_raw_repl() 50 pyb.exec('import pyb') 51 pyb.exec('pyb.LED(1).on()') 52 pyb.exit_raw_repl() 53 54Note: if using Python2 then pyb.exec must be written as pyb.exec_. 55To run a script from the local machine on the board and print out the results: 56 57 import pyboard 58 pyboard.execfile('test.py', device='/dev/ttyACM0') 59 60This script can also be run directly. To execute a local script, use: 61 62 ./pyboard.py test.py 63 64Or: 65 66 python pyboard.py test.py 67 68""" 69 70import sys 71import time 72import os 73import ast 74 75try: 76 stdout = sys.stdout.buffer 77except AttributeError: 78 # Python2 doesn't have buffer attr 79 stdout = sys.stdout 80 81 82def stdout_write_bytes(b): 83 b = b.replace(b"\x04", b"") 84 stdout.write(b) 85 stdout.flush() 86 87 88class PyboardError(Exception): 89 pass 90 91 92class TelnetToSerial: 93 def __init__(self, ip, user, password, read_timeout=None): 94 self.tn = None 95 import telnetlib 96 97 self.tn = telnetlib.Telnet(ip, timeout=15) 98 self.read_timeout = read_timeout 99 if b"Login as:" in self.tn.read_until(b"Login as:", timeout=read_timeout): 100 self.tn.write(bytes(user, "ascii") + b"\r\n") 101 102 if b"Password:" in self.tn.read_until(b"Password:", timeout=read_timeout): 103 # needed because of internal implementation details of the telnet server 104 time.sleep(0.2) 105 self.tn.write(bytes(password, "ascii") + b"\r\n") 106 107 if b"for more information." in self.tn.read_until( 108 b'Type "help()" for more information.', timeout=read_timeout 109 ): 110 # login successful 111 from collections import deque 112 113 self.fifo = deque() 114 return 115 116 raise PyboardError("Failed to establish a telnet connection with the board") 117 118 def __del__(self): 119 self.close() 120 121 def close(self): 122 if self.tn: 123 self.tn.close() 124 125 def read(self, size=1): 126 while len(self.fifo) < size: 127 timeout_count = 0 128 data = self.tn.read_eager() 129 if len(data): 130 self.fifo.extend(data) 131 timeout_count = 0 132 else: 133 time.sleep(0.25) 134 if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: 135 break 136 timeout_count += 1 137 138 data = b"" 139 while len(data) < size and len(self.fifo) > 0: 140 data += bytes([self.fifo.popleft()]) 141 return data 142 143 def write(self, data): 144 self.tn.write(data) 145 return len(data) 146 147 def inWaiting(self): 148 n_waiting = len(self.fifo) 149 if not n_waiting: 150 data = self.tn.read_eager() 151 self.fifo.extend(data) 152 return len(data) 153 else: 154 return n_waiting 155 156 157class ProcessToSerial: 158 "Execute a process and emulate serial connection using its stdin/stdout." 159 160 def __init__(self, cmd): 161 import subprocess 162 163 self.subp = subprocess.Popen( 164 cmd, 165 bufsize=0, 166 shell=True, 167 preexec_fn=os.setsid, 168 stdin=subprocess.PIPE, 169 stdout=subprocess.PIPE, 170 ) 171 172 # Initially was implemented with selectors, but that adds Python3 173 # dependency. However, there can be race conditions communicating 174 # with a particular child process (like QEMU), and selectors may 175 # still work better in that case, so left inplace for now. 176 # 177 # import selectors 178 # self.sel = selectors.DefaultSelector() 179 # self.sel.register(self.subp.stdout, selectors.EVENT_READ) 180 181 import select 182 183 self.poll = select.poll() 184 self.poll.register(self.subp.stdout.fileno()) 185 186 def close(self): 187 import signal 188 189 os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) 190 191 def read(self, size=1): 192 data = b"" 193 while len(data) < size: 194 data += self.subp.stdout.read(size - len(data)) 195 return data 196 197 def write(self, data): 198 self.subp.stdin.write(data) 199 return len(data) 200 201 def inWaiting(self): 202 # res = self.sel.select(0) 203 res = self.poll.poll(0) 204 if res: 205 return 1 206 return 0 207 208 209class ProcessPtyToTerminal: 210 """Execute a process which creates a PTY and prints slave PTY as 211 first line of its output, and emulate serial connection using 212 this PTY.""" 213 214 def __init__(self, cmd): 215 import subprocess 216 import re 217 import serial 218 219 self.subp = subprocess.Popen( 220 cmd.split(), 221 bufsize=0, 222 shell=False, 223 preexec_fn=os.setsid, 224 stdin=subprocess.PIPE, 225 stdout=subprocess.PIPE, 226 stderr=subprocess.PIPE, 227 ) 228 pty_line = self.subp.stderr.readline().decode("utf-8") 229 m = re.search(r"/dev/pts/[0-9]+", pty_line) 230 if not m: 231 print("Error: unable to find PTY device in startup line:", pty_line) 232 self.close() 233 sys.exit(1) 234 pty = m.group() 235 # rtscts, dsrdtr params are to workaround pyserial bug: 236 # http://stackoverflow.com/questions/34831131/pyserial-does-not-play-well-with-virtual-port 237 self.ser = serial.Serial(pty, interCharTimeout=1, rtscts=True, dsrdtr=True) 238 239 def close(self): 240 import signal 241 242 os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) 243 244 def read(self, size=1): 245 return self.ser.read(size) 246 247 def write(self, data): 248 return self.ser.write(data) 249 250 def inWaiting(self): 251 return self.ser.inWaiting() 252 253 254class Pyboard: 255 def __init__( 256 self, device, baudrate=115200, user="micro", password="python", wait=0, exclusive=True 257 ): 258 self.in_raw_repl = False 259 self.use_raw_paste = True 260 if device.startswith("exec:"): 261 self.serial = ProcessToSerial(device[len("exec:") :]) 262 elif device.startswith("execpty:"): 263 self.serial = ProcessPtyToTerminal(device[len("qemupty:") :]) 264 elif device and device[0].isdigit() and device[-1].isdigit() and device.count(".") == 3: 265 # device looks like an IP address 266 self.serial = TelnetToSerial(device, user, password, read_timeout=10) 267 else: 268 import serial 269 270 # Set options, and exclusive if pyserial supports it 271 serial_kwargs = {"baudrate": baudrate, "interCharTimeout": 1} 272 if serial.__version__ >= "3.3": 273 serial_kwargs["exclusive"] = exclusive 274 275 delayed = False 276 for attempt in range(wait + 1): 277 try: 278 self.serial = serial.Serial(device, **serial_kwargs) 279 break 280 except (OSError, IOError): # Py2 and Py3 have different errors 281 if wait == 0: 282 continue 283 if attempt == 0: 284 sys.stdout.write("Waiting {} seconds for pyboard ".format(wait)) 285 delayed = True 286 time.sleep(1) 287 sys.stdout.write(".") 288 sys.stdout.flush() 289 else: 290 if delayed: 291 print("") 292 raise PyboardError("failed to access " + device) 293 if delayed: 294 print("") 295 296 def close(self): 297 self.serial.close() 298 299 def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): 300 # if data_consumer is used then data is not accumulated and the ending must be 1 byte long 301 assert data_consumer is None or len(ending) == 1 302 303 data = self.serial.read(min_num_bytes) 304 if data_consumer: 305 data_consumer(data) 306 timeout_count = 0 307 while True: 308 if data.endswith(ending): 309 break 310 elif self.serial.inWaiting() > 0: 311 new_data = self.serial.read(1) 312 if data_consumer: 313 data_consumer(new_data) 314 data = new_data 315 else: 316 data = data + new_data 317 timeout_count = 0 318 else: 319 timeout_count += 1 320 if timeout is not None and timeout_count >= 100 * timeout: 321 break 322 time.sleep(0.01) 323 return data 324 325 def enter_raw_repl(self, soft_reset=True): 326 self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program 327 328 # flush input (without relying on serial.flushInput()) 329 n = self.serial.inWaiting() 330 while n > 0: 331 self.serial.read(n) 332 n = self.serial.inWaiting() 333 334 self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL 335 336 if soft_reset: 337 data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>") 338 if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"): 339 print(data) 340 raise PyboardError("could not enter raw repl") 341 342 self.serial.write(b"\x04") # ctrl-D: soft reset 343 344 # Waiting for "soft reboot" independently to "raw REPL" (done below) 345 # allows boot.py to print, which will show up after "soft reboot" 346 # and before "raw REPL". 347 data = self.read_until(1, b"soft reboot\r\n") 348 if not data.endswith(b"soft reboot\r\n"): 349 print(data) 350 raise PyboardError("could not enter raw repl") 351 352 data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n") 353 if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"): 354 print(data) 355 raise PyboardError("could not enter raw repl") 356 357 self.in_raw_repl = True 358 359 def exit_raw_repl(self): 360 self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL 361 self.in_raw_repl = False 362 363 def follow(self, timeout, data_consumer=None): 364 # wait for normal output 365 data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer) 366 if not data.endswith(b"\x04"): 367 raise PyboardError("timeout waiting for first EOF reception") 368 data = data[:-1] 369 370 # wait for error output 371 data_err = self.read_until(1, b"\x04", timeout=timeout) 372 if not data_err.endswith(b"\x04"): 373 raise PyboardError("timeout waiting for second EOF reception") 374 data_err = data_err[:-1] 375 376 # return normal and error output 377 return data, data_err 378 379 def raw_paste_write(self, command_bytes): 380 # Read initial header, with window size. 381 data = self.serial.read(2) 382 window_size = data[0] | data[1] << 8 383 window_remain = window_size 384 385 # Write out the command_bytes data. 386 i = 0 387 while i < len(command_bytes): 388 while window_remain == 0 or self.serial.inWaiting(): 389 data = self.serial.read(1) 390 if data == b"\x01": 391 # Device indicated that a new window of data can be sent. 392 window_remain += window_size 393 elif data == b"\x04": 394 # Device indicated abrupt end. Acknowledge it and finish. 395 self.serial.write(b"\x04") 396 return 397 else: 398 # Unexpected data from device. 399 raise PyboardError("unexpected read during raw paste: {}".format(data)) 400 # Send out as much data as possible that fits within the allowed window. 401 b = command_bytes[i : min(i + window_remain, len(command_bytes))] 402 self.serial.write(b) 403 window_remain -= len(b) 404 i += len(b) 405 406 # Indicate end of data. 407 self.serial.write(b"\x04") 408 409 # Wait for device to acknowledge end of data. 410 data = self.read_until(1, b"\x04") 411 if not data.endswith(b"\x04"): 412 raise PyboardError("could not complete raw paste: {}".format(data)) 413 414 def exec_raw_no_follow(self, command): 415 if isinstance(command, bytes): 416 command_bytes = command 417 else: 418 command_bytes = bytes(command, encoding="utf8") 419 420 # check we have a prompt 421 data = self.read_until(1, b">") 422 if not data.endswith(b">"): 423 raise PyboardError("could not enter raw repl") 424 425 if self.use_raw_paste: 426 # Try to enter raw-paste mode. 427 self.serial.write(b"\x05A\x01") 428 data = self.serial.read(2) 429 if data == b"R\x00": 430 # Device understood raw-paste command but doesn't support it. 431 pass 432 elif data == b"R\x01": 433 # Device supports raw-paste mode, write out the command using this mode. 434 return self.raw_paste_write(command_bytes) 435 else: 436 # Device doesn't support raw-paste, fall back to normal raw REPL. 437 data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>") 438 if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"): 439 print(data) 440 raise PyboardError("could not enter raw repl") 441 # Don't try to use raw-paste mode again for this connection. 442 self.use_raw_paste = False 443 444 # Write command using standard raw REPL, 256 bytes every 10ms. 445 for i in range(0, len(command_bytes), 256): 446 self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))]) 447 time.sleep(0.01) 448 self.serial.write(b"\x04") 449 450 # check if we could exec command 451 data = self.serial.read(2) 452 if data != b"OK": 453 raise PyboardError("could not exec command (response: %r)" % data) 454 455 def exec_raw(self, command, timeout=10, data_consumer=None): 456 self.exec_raw_no_follow(command) 457 return self.follow(timeout, data_consumer) 458 459 def eval(self, expression): 460 ret = self.exec_("print({})".format(expression)) 461 ret = ret.strip() 462 return ret 463 464 def exec_(self, command, data_consumer=None): 465 ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) 466 if ret_err: 467 raise PyboardError("exception", ret, ret_err) 468 return ret 469 470 def execfile(self, filename): 471 with open(filename, "rb") as f: 472 pyfile = f.read() 473 return self.exec_(pyfile) 474 475 def get_time(self): 476 t = str(self.eval("pyb.RTC().datetime()"), encoding="utf8")[1:-1].split(", ") 477 return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) 478 479 def fs_ls(self, src): 480 cmd = ( 481 "import uos\nfor f in uos.ilistdir(%s):\n" 482 " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))" 483 % (("'%s'" % src) if src else "") 484 ) 485 self.exec_(cmd, data_consumer=stdout_write_bytes) 486 487 def fs_cat(self, src, chunk_size=256): 488 cmd = ( 489 "with open('%s') as f:\n while 1:\n" 490 " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) 491 ) 492 self.exec_(cmd, data_consumer=stdout_write_bytes) 493 494 def fs_get(self, src, dest, chunk_size=256): 495 self.exec_("f=open('%s','rb')\nr=f.read" % src) 496 with open(dest, "wb") as f: 497 while True: 498 data = bytearray() 499 self.exec_("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d)) 500 assert data.endswith(b"\r\n\x04") 501 try: 502 data = ast.literal_eval(str(data[:-3], "ascii")) 503 if not isinstance(data, bytes): 504 raise ValueError("Not bytes") 505 except (UnicodeError, ValueError) as e: 506 raise PyboardError("fs_get: Could not interpret received data: %s" % str(e)) 507 if not data: 508 break 509 f.write(data) 510 self.exec_("f.close()") 511 512 def fs_put(self, src, dest, chunk_size=256): 513 self.exec_("f=open('%s','wb')\nw=f.write" % dest) 514 with open(src, "rb") as f: 515 while True: 516 data = f.read(chunk_size) 517 if not data: 518 break 519 if sys.version_info < (3,): 520 self.exec_("w(b" + repr(data) + ")") 521 else: 522 self.exec_("w(" + repr(data) + ")") 523 self.exec_("f.close()") 524 525 def fs_mkdir(self, dir): 526 self.exec_("import uos\nuos.mkdir('%s')" % dir) 527 528 def fs_rmdir(self, dir): 529 self.exec_("import uos\nuos.rmdir('%s')" % dir) 530 531 def fs_rm(self, src): 532 self.exec_("import uos\nuos.remove('%s')" % src) 533 534 535# in Python2 exec is a keyword so one must use "exec_" 536# but for Python3 we want to provide the nicer version "exec" 537setattr(Pyboard, "exec", Pyboard.exec_) 538 539 540def execfile(filename, device="/dev/ttyACM0", baudrate=115200, user="micro", password="python"): 541 pyb = Pyboard(device, baudrate, user, password) 542 pyb.enter_raw_repl() 543 output = pyb.execfile(filename) 544 stdout_write_bytes(output) 545 pyb.exit_raw_repl() 546 pyb.close() 547 548 549def filesystem_command(pyb, args): 550 def fname_remote(src): 551 if src.startswith(":"): 552 src = src[1:] 553 return src 554 555 def fname_cp_dest(src, dest): 556 src = src.rsplit("/", 1)[-1] 557 if dest is None or dest == "": 558 dest = src 559 elif dest == ".": 560 dest = "./" + src 561 elif dest.endswith("/"): 562 dest += src 563 return dest 564 565 cmd = args[0] 566 args = args[1:] 567 try: 568 if cmd == "cp": 569 srcs = args[:-1] 570 dest = args[-1] 571 if srcs[0].startswith("./") or dest.startswith(":"): 572 op = pyb.fs_put 573 fmt = "cp %s :%s" 574 dest = fname_remote(dest) 575 else: 576 op = pyb.fs_get 577 fmt = "cp :%s %s" 578 for src in srcs: 579 src = fname_remote(src) 580 dest2 = fname_cp_dest(src, dest) 581 print(fmt % (src, dest2)) 582 op(src, dest2) 583 else: 584 op = { 585 "ls": pyb.fs_ls, 586 "cat": pyb.fs_cat, 587 "mkdir": pyb.fs_mkdir, 588 "rmdir": pyb.fs_rmdir, 589 "rm": pyb.fs_rm, 590 }[cmd] 591 if cmd == "ls" and not args: 592 args = [""] 593 for src in args: 594 src = fname_remote(src) 595 print("%s :%s" % (cmd, src)) 596 op(src) 597 except PyboardError as er: 598 print(str(er.args[2], "ascii")) 599 pyb.exit_raw_repl() 600 pyb.close() 601 sys.exit(1) 602 603 604_injected_import_hook_code = """\ 605import uos, uio 606class _FS: 607 class File(uio.IOBase): 608 def __init__(self): 609 self.off = 0 610 def ioctl(self, request, arg): 611 return 0 612 def readinto(self, buf): 613 buf[:] = memoryview(_injected_buf)[self.off:self.off + len(buf)] 614 self.off += len(buf) 615 return len(buf) 616 mount = umount = chdir = lambda *args: None 617 def stat(self, path): 618 if path == '_injected.mpy': 619 return tuple(0 for _ in range(10)) 620 else: 621 raise OSError(-2) # ENOENT 622 def open(self, path, mode): 623 return self.File() 624uos.mount(_FS(), '/_') 625uos.chdir('/_') 626from _injected import * 627uos.umount('/_') 628del _injected_buf, _FS 629""" 630 631 632def main(): 633 import argparse 634 635 cmd_parser = argparse.ArgumentParser(description="Run scripts on the pyboard.") 636 cmd_parser.add_argument( 637 "-d", 638 "--device", 639 default=os.environ.get("PYBOARD_DEVICE", "/dev/ttyACM0"), 640 help="the serial device or the IP address of the pyboard", 641 ) 642 cmd_parser.add_argument( 643 "-b", 644 "--baudrate", 645 default=os.environ.get("PYBOARD_BAUDRATE", "115200"), 646 help="the baud rate of the serial device", 647 ) 648 cmd_parser.add_argument("-u", "--user", default="micro", help="the telnet login username") 649 cmd_parser.add_argument("-p", "--password", default="python", help="the telnet login password") 650 cmd_parser.add_argument("-c", "--command", help="program passed in as string") 651 cmd_parser.add_argument( 652 "-w", 653 "--wait", 654 default=0, 655 type=int, 656 help="seconds to wait for USB connected board to become available", 657 ) 658 group = cmd_parser.add_mutually_exclusive_group() 659 group.add_argument( 660 "--soft-reset", 661 default=True, 662 action="store_true", 663 help="Whether to perform a soft reset when connecting to the board [default]", 664 ) 665 group.add_argument( 666 "--no-soft-reset", 667 action="store_false", 668 dest="soft_reset", 669 ) 670 group = cmd_parser.add_mutually_exclusive_group() 671 group.add_argument( 672 "--follow", 673 action="store_true", 674 default=None, 675 help="follow the output after running the scripts [default if no scripts given]", 676 ) 677 group.add_argument( 678 "--no-follow", 679 action="store_false", 680 dest="follow", 681 ) 682 group = cmd_parser.add_mutually_exclusive_group() 683 group.add_argument( 684 "--exclusive", 685 action="store_true", 686 default=True, 687 help="Open the serial device for exclusive access [default]", 688 ) 689 group.add_argument( 690 "--no-exclusive", 691 action="store_false", 692 dest="exclusive", 693 ) 694 cmd_parser.add_argument( 695 "-f", 696 "--filesystem", 697 action="store_true", 698 help="perform a filesystem action: " 699 "cp local :device | cp :device local | cat path | ls [path] | rm path | mkdir path | rmdir path", 700 ) 701 cmd_parser.add_argument("files", nargs="*", help="input files") 702 args = cmd_parser.parse_args() 703 704 # open the connection to the pyboard 705 try: 706 pyb = Pyboard( 707 args.device, args.baudrate, args.user, args.password, args.wait, args.exclusive 708 ) 709 except PyboardError as er: 710 print(er) 711 sys.exit(1) 712 713 # run any command or file(s) 714 if args.command is not None or args.filesystem or len(args.files): 715 # we must enter raw-REPL mode to execute commands 716 # this will do a soft-reset of the board 717 try: 718 pyb.enter_raw_repl(args.soft_reset) 719 except PyboardError as er: 720 print(er) 721 pyb.close() 722 sys.exit(1) 723 724 def execbuffer(buf): 725 try: 726 if args.follow is None or args.follow: 727 ret, ret_err = pyb.exec_raw( 728 buf, timeout=None, data_consumer=stdout_write_bytes 729 ) 730 else: 731 pyb.exec_raw_no_follow(buf) 732 ret_err = None 733 except PyboardError as er: 734 print(er) 735 pyb.close() 736 sys.exit(1) 737 except KeyboardInterrupt: 738 sys.exit(1) 739 if ret_err: 740 pyb.exit_raw_repl() 741 pyb.close() 742 stdout_write_bytes(ret_err) 743 sys.exit(1) 744 745 # do filesystem commands, if given 746 if args.filesystem: 747 filesystem_command(pyb, args.files) 748 del args.files[:] 749 750 # run the command, if given 751 if args.command is not None: 752 execbuffer(args.command.encode("utf-8")) 753 754 # run any files 755 for filename in args.files: 756 with open(filename, "rb") as f: 757 pyfile = f.read() 758 if filename.endswith(".mpy") and pyfile[0] == ord("M"): 759 pyb.exec_("_injected_buf=" + repr(pyfile)) 760 pyfile = _injected_import_hook_code 761 execbuffer(pyfile) 762 763 # exiting raw-REPL just drops to friendly-REPL mode 764 pyb.exit_raw_repl() 765 766 # if asked explicitly, or no files given, then follow the output 767 if args.follow or (args.command is None and not args.filesystem and len(args.files) == 0): 768 try: 769 ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) 770 except PyboardError as er: 771 print(er) 772 sys.exit(1) 773 except KeyboardInterrupt: 774 sys.exit(1) 775 if ret_err: 776 pyb.close() 777 stdout_write_bytes(ret_err) 778 sys.exit(1) 779 780 # close the connection to the pyboard 781 pyb.close() 782 783 784if __name__ == "__main__": 785 main() 786