1#!/usr/bin/env python 2# This file is part of the OpenMV project. 3# Copyright (c) 2013/2014 Ibrahim Abdelkader <i.abdalkader@gmail.com> 4# This work is licensed under the MIT license, see the file LICENSE for 5# details. 6 7"""This module implements enough functionality to program the STM32F4xx over 8DFU, without requiring dfu-util. 9 10See app note AN3156 for a description of the DFU protocol. 11See document UM0391 for a dscription of the DFuse file. 12""" 13 14from __future__ import print_function 15 16import argparse 17import collections 18import inspect 19import re 20import struct 21import sys 22import usb.core 23import usb.util 24import zlib 25 26# USB request __TIMEOUT 27__TIMEOUT = 4000 28 29# DFU commands 30__DFU_DETACH = 0 31__DFU_DNLOAD = 1 32__DFU_UPLOAD = 2 33__DFU_GETSTATUS = 3 34__DFU_CLRSTATUS = 4 35__DFU_GETSTATE = 5 36__DFU_ABORT = 6 37 38# DFU status 39__DFU_STATE_APP_IDLE = 0x00 40__DFU_STATE_APP_DETACH = 0x01 41__DFU_STATE_DFU_IDLE = 0x02 42__DFU_STATE_DFU_DOWNLOAD_SYNC = 0x03 43__DFU_STATE_DFU_DOWNLOAD_BUSY = 0x04 44__DFU_STATE_DFU_DOWNLOAD_IDLE = 0x05 45__DFU_STATE_DFU_MANIFEST_SYNC = 0x06 46__DFU_STATE_DFU_MANIFEST = 0x07 47__DFU_STATE_DFU_MANIFEST_WAIT_RESET = 0x08 48__DFU_STATE_DFU_UPLOAD_IDLE = 0x09 49__DFU_STATE_DFU_ERROR = 0x0A 50 51_DFU_DESCRIPTOR_TYPE = 0x21 52 53__DFU_STATUS_STR = { 54 __DFU_STATE_APP_IDLE: "STATE_APP_IDLE", 55 __DFU_STATE_APP_DETACH: "STATE_APP_DETACH", 56 __DFU_STATE_DFU_IDLE: "STATE_DFU_IDLE", 57 __DFU_STATE_DFU_DOWNLOAD_SYNC: "STATE_DFU_DOWNLOAD_SYNC", 58 __DFU_STATE_DFU_DOWNLOAD_BUSY: "STATE_DFU_DOWNLOAD_BUSY", 59 __DFU_STATE_DFU_DOWNLOAD_IDLE: "STATE_DFU_DOWNLOAD_IDLE", 60 __DFU_STATE_DFU_MANIFEST_SYNC: "STATE_DFU_MANIFEST_SYNC", 61 __DFU_STATE_DFU_MANIFEST: "STATE_DFU_MANIFEST", 62 __DFU_STATE_DFU_MANIFEST_WAIT_RESET: "STATE_DFU_MANIFEST_WAIT_RESET", 63 __DFU_STATE_DFU_UPLOAD_IDLE: "STATE_DFU_UPLOAD_IDLE", 64 __DFU_STATE_DFU_ERROR: "STATE_DFU_ERROR", 65} 66 67# USB device handle 68__dev = None 69 70# Configuration descriptor of the device 71__cfg_descr = None 72 73__verbose = None 74 75# USB DFU interface 76__DFU_INTERFACE = 0 77 78# Python 3 deprecated getargspec in favour of getfullargspec, but 79# Python 2 doesn't have the latter, so detect which one to use 80getargspec = getattr(inspect, "getfullargspec", inspect.getargspec) 81 82if "length" in getargspec(usb.util.get_string).args: 83 # PyUSB 1.0.0.b1 has the length argument 84 def get_string(dev, index): 85 return usb.util.get_string(dev, 255, index) 86 87 88else: 89 # PyUSB 1.0.0.b2 dropped the length argument 90 def get_string(dev, index): 91 return usb.util.get_string(dev, index) 92 93 94def find_dfu_cfg_descr(descr): 95 if len(descr) == 9 and descr[0] == 9 and descr[1] == _DFU_DESCRIPTOR_TYPE: 96 nt = collections.namedtuple( 97 "CfgDescr", 98 [ 99 "bLength", 100 "bDescriptorType", 101 "bmAttributes", 102 "wDetachTimeOut", 103 "wTransferSize", 104 "bcdDFUVersion", 105 ], 106 ) 107 return nt(*struct.unpack("<BBBHHH", bytearray(descr))) 108 return None 109 110 111def init(**kwargs): 112 """Initializes the found DFU device so that we can program it.""" 113 global __dev, __cfg_descr 114 devices = get_dfu_devices(**kwargs) 115 if not devices: 116 raise ValueError("No DFU device found") 117 if len(devices) > 1: 118 raise ValueError("Multiple DFU devices found") 119 __dev = devices[0] 120 __dev.set_configuration() 121 122 # Claim DFU interface 123 usb.util.claim_interface(__dev, __DFU_INTERFACE) 124 125 # Find the DFU configuration descriptor, either in the device or interfaces 126 __cfg_descr = None 127 for cfg in __dev.configurations(): 128 __cfg_descr = find_dfu_cfg_descr(cfg.extra_descriptors) 129 if __cfg_descr: 130 break 131 for itf in cfg.interfaces(): 132 __cfg_descr = find_dfu_cfg_descr(itf.extra_descriptors) 133 if __cfg_descr: 134 break 135 136 # Get device into idle state 137 for attempt in range(4): 138 status = get_status() 139 if status == __DFU_STATE_DFU_IDLE: 140 break 141 elif status == __DFU_STATE_DFU_DOWNLOAD_IDLE or status == __DFU_STATE_DFU_UPLOAD_IDLE: 142 abort_request() 143 else: 144 clr_status() 145 146 147def abort_request(): 148 """Sends an abort request.""" 149 __dev.ctrl_transfer(0x21, __DFU_ABORT, 0, __DFU_INTERFACE, None, __TIMEOUT) 150 151 152def clr_status(): 153 """Clears any error status (perhaps left over from a previous session).""" 154 __dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE, None, __TIMEOUT) 155 156 157def get_status(): 158 """Get the status of the last operation.""" 159 stat = __dev.ctrl_transfer(0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE, 6, 20000) 160 161 # firmware can provide an optional string for any error 162 if stat[5]: 163 message = get_string(__dev, stat[5]) 164 if message: 165 print(message) 166 167 return stat[4] 168 169 170def check_status(stage, expected): 171 status = get_status() 172 if status != expected: 173 raise SystemExit("DFU: %s failed (%s)" % (stage, __DFU_STATUS_STR.get(status, status))) 174 175 176def mass_erase(): 177 """Performs a MASS erase (i.e. erases the entire device).""" 178 # Send DNLOAD with first byte=0x41 179 __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, "\x41", __TIMEOUT) 180 181 # Execute last command 182 check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY) 183 184 # Check command state 185 check_status("erase", __DFU_STATE_DFU_DOWNLOAD_IDLE) 186 187 188def page_erase(addr): 189 """Erases a single page.""" 190 if __verbose: 191 print("Erasing page: 0x%x..." % (addr)) 192 193 # Send DNLOAD with first byte=0x41 and page address 194 buf = struct.pack("<BI", 0x41, addr) 195 __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT) 196 197 # Execute last command 198 check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY) 199 200 # Check command state 201 check_status("erase", __DFU_STATE_DFU_DOWNLOAD_IDLE) 202 203 204def set_address(addr): 205 """Sets the address for the next operation.""" 206 # Send DNLOAD with first byte=0x21 and page address 207 buf = struct.pack("<BI", 0x21, addr) 208 __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT) 209 210 # Execute last command 211 check_status("set address", __DFU_STATE_DFU_DOWNLOAD_BUSY) 212 213 # Check command state 214 check_status("set address", __DFU_STATE_DFU_DOWNLOAD_IDLE) 215 216 217def write_memory(addr, buf, progress=None, progress_addr=0, progress_size=0): 218 """Writes a buffer into memory. This routine assumes that memory has 219 already been erased. 220 """ 221 222 xfer_count = 0 223 xfer_bytes = 0 224 xfer_total = len(buf) 225 xfer_base = addr 226 227 while xfer_bytes < xfer_total: 228 if __verbose and xfer_count % 512 == 0: 229 print( 230 "Addr 0x%x %dKBs/%dKBs..." 231 % (xfer_base + xfer_bytes, xfer_bytes // 1024, xfer_total // 1024) 232 ) 233 if progress and xfer_count % 2 == 0: 234 progress(progress_addr, xfer_base + xfer_bytes - progress_addr, progress_size) 235 236 # Set mem write address 237 set_address(xfer_base + xfer_bytes) 238 239 # Send DNLOAD with fw data 240 chunk = min(__cfg_descr.wTransferSize, xfer_total - xfer_bytes) 241 __dev.ctrl_transfer( 242 0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf[xfer_bytes : xfer_bytes + chunk], __TIMEOUT 243 ) 244 245 # Execute last command 246 check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_BUSY) 247 248 # Check command state 249 check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_IDLE) 250 251 xfer_count += 1 252 xfer_bytes += chunk 253 254 255def write_page(buf, xfer_offset): 256 """Writes a single page. This routine assumes that memory has already 257 been erased. 258 """ 259 260 xfer_base = 0x08000000 261 262 # Set mem write address 263 set_address(xfer_base + xfer_offset) 264 265 # Send DNLOAD with fw data 266 __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf, __TIMEOUT) 267 268 # Execute last command 269 check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_BUSY) 270 271 # Check command state 272 check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_IDLE) 273 274 if __verbose: 275 print("Write: 0x%x " % (xfer_base + xfer_offset)) 276 277 278def exit_dfu(): 279 """Exit DFU mode, and start running the program.""" 280 # Set jump address 281 set_address(0x08000000) 282 283 # Send DNLOAD with 0 length to exit DFU 284 __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, None, __TIMEOUT) 285 286 try: 287 # Execute last command 288 if get_status() != __DFU_STATE_DFU_MANIFEST: 289 print("Failed to reset device") 290 291 # Release device 292 usb.util.dispose_resources(__dev) 293 except: 294 pass 295 296 297def named(values, names): 298 """Creates a dict with `names` as fields, and `values` as values.""" 299 return dict(zip(names.split(), values)) 300 301 302def consume(fmt, data, names): 303 """Parses the struct defined by `fmt` from `data`, stores the parsed fields 304 into a named tuple using `names`. Returns the named tuple, and the data 305 with the struct stripped off.""" 306 307 size = struct.calcsize(fmt) 308 return named(struct.unpack(fmt, data[:size]), names), data[size:] 309 310 311def cstring(string): 312 """Extracts a null-terminated string from a byte array.""" 313 return string.decode("utf-8").split("\0", 1)[0] 314 315 316def compute_crc(data): 317 """Computes the CRC32 value for the data passed in.""" 318 return 0xFFFFFFFF & -zlib.crc32(data) - 1 319 320 321def read_dfu_file(filename): 322 """Reads a DFU file, and parses the individual elements from the file. 323 Returns an array of elements. Each element is a dictionary with the 324 following keys: 325 num - The element index. 326 address - The address that the element data should be written to. 327 size - The size of the element data. 328 data - The element data. 329 If an error occurs while parsing the file, then None is returned. 330 """ 331 332 print("File: {}".format(filename)) 333 with open(filename, "rb") as fin: 334 data = fin.read() 335 crc = compute_crc(data[:-4]) 336 elements = [] 337 338 # Decode the DFU Prefix 339 # 340 # <5sBIB 341 # < little endian Endianness 342 # 5s char[5] signature "DfuSe" 343 # B uint8_t version 1 344 # I uint32_t size Size of the DFU file (without suffix) 345 # B uint8_t targets Number of targets 346 dfu_prefix, data = consume("<5sBIB", data, "signature version size targets") 347 print( 348 " %(signature)s v%(version)d, image size: %(size)d, " 349 "targets: %(targets)d" % dfu_prefix 350 ) 351 for target_idx in range(dfu_prefix["targets"]): 352 # Decode the Image Prefix 353 # 354 # <6sBI255s2I 355 # < little endian Endianness 356 # 6s char[6] signature "Target" 357 # B uint8_t altsetting 358 # I uint32_t named Bool indicating if a name was used 359 # 255s char[255] name Name of the target 360 # I uint32_t size Size of image (without prefix) 361 # I uint32_t elements Number of elements in the image 362 img_prefix, data = consume( 363 "<6sBI255s2I", data, "signature altsetting named name " "size elements" 364 ) 365 img_prefix["num"] = target_idx 366 if img_prefix["named"]: 367 img_prefix["name"] = cstring(img_prefix["name"]) 368 else: 369 img_prefix["name"] = "" 370 print( 371 " %(signature)s %(num)d, alt setting: %(altsetting)s, " 372 'name: "%(name)s", size: %(size)d, elements: %(elements)d' % img_prefix 373 ) 374 375 target_size = img_prefix["size"] 376 target_data = data[:target_size] 377 data = data[target_size:] 378 for elem_idx in range(img_prefix["elements"]): 379 # Decode target prefix 380 # 381 # <2I 382 # < little endian Endianness 383 # I uint32_t element Address 384 # I uint32_t element Size 385 elem_prefix, target_data = consume("<2I", target_data, "addr size") 386 elem_prefix["num"] = elem_idx 387 print(" %(num)d, address: 0x%(addr)08x, size: %(size)d" % elem_prefix) 388 elem_size = elem_prefix["size"] 389 elem_data = target_data[:elem_size] 390 target_data = target_data[elem_size:] 391 elem_prefix["data"] = elem_data 392 elements.append(elem_prefix) 393 394 if len(target_data): 395 print("target %d PARSE ERROR" % target_idx) 396 397 # Decode DFU Suffix 398 # 399 # <4H3sBI 400 # < little endian Endianness 401 # H uint16_t device Firmware version 402 # H uint16_t product 403 # H uint16_t vendor 404 # H uint16_t dfu 0x11a (DFU file format version) 405 # 3s char[3] ufd "UFD" 406 # B uint8_t len 16 407 # I uint32_t crc32 Checksum 408 dfu_suffix = named( 409 struct.unpack("<4H3sBI", data[:16]), "device product vendor dfu ufd len crc" 410 ) 411 print( 412 " usb: %(vendor)04x:%(product)04x, device: 0x%(device)04x, " 413 "dfu: 0x%(dfu)04x, %(ufd)s, %(len)d, 0x%(crc)08x" % dfu_suffix 414 ) 415 if crc != dfu_suffix["crc"]: 416 print("CRC ERROR: computed crc32 is 0x%08x" % crc) 417 return 418 data = data[16:] 419 if data: 420 print("PARSE ERROR") 421 return 422 423 return elements 424 425 426class FilterDFU(object): 427 """Class for filtering USB devices to identify devices which are in DFU 428 mode. 429 """ 430 431 def __call__(self, device): 432 for cfg in device: 433 for intf in cfg: 434 return intf.bInterfaceClass == 0xFE and intf.bInterfaceSubClass == 1 435 436 437def get_dfu_devices(*args, **kwargs): 438 """Returns a list of USB devices which are currently in DFU mode. 439 Additional filters (like idProduct and idVendor) can be passed in 440 to refine the search. 441 """ 442 443 # Convert to list for compatibility with newer PyUSB 444 return list(usb.core.find(*args, find_all=True, custom_match=FilterDFU(), **kwargs)) 445 446 447def get_memory_layout(device): 448 """Returns an array which identifies the memory layout. Each entry 449 of the array will contain a dictionary with the following keys: 450 addr - Address of this memory segment. 451 last_addr - Last address contained within the memory segment. 452 size - Size of the segment, in bytes. 453 num_pages - Number of pages in the segment. 454 page_size - Size of each page, in bytes. 455 """ 456 457 cfg = device[0] 458 intf = cfg[(0, 0)] 459 mem_layout_str = get_string(device, intf.iInterface) 460 mem_layout = mem_layout_str.split("/") 461 result = [] 462 for mem_layout_index in range(1, len(mem_layout), 2): 463 addr = int(mem_layout[mem_layout_index], 0) 464 segments = mem_layout[mem_layout_index + 1].split(",") 465 seg_re = re.compile(r"(\d+)\*(\d+)(.)(.)") 466 for segment in segments: 467 seg_match = seg_re.match(segment) 468 num_pages = int(seg_match.groups()[0], 10) 469 page_size = int(seg_match.groups()[1], 10) 470 multiplier = seg_match.groups()[2] 471 if multiplier == "K": 472 page_size *= 1024 473 if multiplier == "M": 474 page_size *= 1024 * 1024 475 size = num_pages * page_size 476 last_addr = addr + size - 1 477 result.append( 478 named( 479 (addr, last_addr, size, num_pages, page_size), 480 "addr last_addr size num_pages page_size", 481 ) 482 ) 483 addr += size 484 return result 485 486 487def list_dfu_devices(*args, **kwargs): 488 """Prints a lits of devices detected in DFU mode.""" 489 devices = get_dfu_devices(*args, **kwargs) 490 if not devices: 491 raise SystemExit("No DFU capable devices found") 492 for device in devices: 493 print( 494 "Bus {} Device {:03d}: ID {:04x}:{:04x}".format( 495 device.bus, device.address, device.idVendor, device.idProduct 496 ) 497 ) 498 layout = get_memory_layout(device) 499 print("Memory Layout") 500 for entry in layout: 501 print( 502 " 0x{:x} {:2d} pages of {:3d}K bytes".format( 503 entry["addr"], entry["num_pages"], entry["page_size"] // 1024 504 ) 505 ) 506 507 508def write_elements(elements, mass_erase_used, progress=None): 509 """Writes the indicated elements into the target memory, 510 erasing as needed. 511 """ 512 513 mem_layout = get_memory_layout(__dev) 514 for elem in elements: 515 addr = elem["addr"] 516 size = elem["size"] 517 data = elem["data"] 518 elem_size = size 519 elem_addr = addr 520 if progress and elem_size: 521 progress(elem_addr, 0, elem_size) 522 while size > 0: 523 write_size = size 524 if not mass_erase_used: 525 for segment in mem_layout: 526 if addr >= segment["addr"] and addr <= segment["last_addr"]: 527 # We found the page containing the address we want to 528 # write, erase it 529 page_size = segment["page_size"] 530 page_addr = addr & ~(page_size - 1) 531 if addr + write_size > page_addr + page_size: 532 write_size = page_addr + page_size - addr 533 page_erase(page_addr) 534 break 535 write_memory(addr, data[:write_size], progress, elem_addr, elem_size) 536 data = data[write_size:] 537 addr += write_size 538 size -= write_size 539 if progress: 540 progress(elem_addr, addr - elem_addr, elem_size) 541 542 543def cli_progress(addr, offset, size): 544 """Prints a progress report suitable for use on the command line.""" 545 width = 25 546 done = offset * width // size 547 print( 548 "\r0x{:08x} {:7d} [{}{}] {:3d}% ".format( 549 addr, size, "=" * done, " " * (width - done), offset * 100 // size 550 ), 551 end="", 552 ) 553 try: 554 sys.stdout.flush() 555 except OSError: 556 pass # Ignore Windows CLI "WinError 87" on Python 3.6 557 if offset == size: 558 print("") 559 560 561def main(): 562 """Test program for verifying this files functionality.""" 563 global __verbose 564 # Parse CMD args 565 parser = argparse.ArgumentParser(description="DFU Python Util") 566 parser.add_argument( 567 "-l", "--list", help="list available DFU devices", action="store_true", default=False 568 ) 569 parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=None) 570 parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=None) 571 parser.add_argument( 572 "-m", "--mass-erase", help="mass erase device", action="store_true", default=False 573 ) 574 parser.add_argument( 575 "-u", "--upload", help="read file from DFU device", dest="path", default=False 576 ) 577 parser.add_argument("-x", "--exit", help="Exit DFU", action="store_true", default=False) 578 parser.add_argument( 579 "-v", "--verbose", help="increase output verbosity", action="store_true", default=False 580 ) 581 args = parser.parse_args() 582 583 __verbose = args.verbose 584 585 kwargs = {} 586 if args.vid: 587 kwargs["idVendor"] = args.vid 588 589 if args.pid: 590 kwargs["idProduct"] = args.pid 591 592 if args.list: 593 list_dfu_devices(**kwargs) 594 return 595 596 init(**kwargs) 597 598 command_run = False 599 if args.mass_erase: 600 print("Mass erase...") 601 mass_erase() 602 command_run = True 603 604 if args.path: 605 elements = read_dfu_file(args.path) 606 if not elements: 607 print("No data in dfu file") 608 return 609 print("Writing memory...") 610 write_elements(elements, args.mass_erase, progress=cli_progress) 611 612 print("Exiting DFU...") 613 exit_dfu() 614 command_run = True 615 616 if args.exit: 617 print("Exiting DFU...") 618 exit_dfu() 619 command_run = True 620 621 if command_run: 622 print("Finished") 623 else: 624 print("No command specified") 625 626 627if __name__ == "__main__": 628 main() 629