1#!/usr/bin/env python
2# This file is part of the OpenMV project.
3# Copyright (c) 2013/2014 Ibrahim Abdelkader <i.abdalkader@gmail.com>
4# This work is licensed under the MIT license, see the file LICENSE for
5# details.
6
7"""This module implements enough functionality to program the STM32F4xx over
8DFU, without requiring dfu-util.
9
10See app note AN3156 for a description of the DFU protocol.
11See document UM0391 for a dscription of the DFuse file.
12"""
13
14from __future__ import print_function
15
16import argparse
17import collections
18import inspect
19import re
20import struct
21import sys
22import usb.core
23import usb.util
24import zlib
25
26# USB request __TIMEOUT
27__TIMEOUT = 4000
28
29# DFU commands
30__DFU_DETACH = 0
31__DFU_DNLOAD = 1
32__DFU_UPLOAD = 2
33__DFU_GETSTATUS = 3
34__DFU_CLRSTATUS = 4
35__DFU_GETSTATE = 5
36__DFU_ABORT = 6
37
38# DFU status
39__DFU_STATE_APP_IDLE = 0x00
40__DFU_STATE_APP_DETACH = 0x01
41__DFU_STATE_DFU_IDLE = 0x02
42__DFU_STATE_DFU_DOWNLOAD_SYNC = 0x03
43__DFU_STATE_DFU_DOWNLOAD_BUSY = 0x04
44__DFU_STATE_DFU_DOWNLOAD_IDLE = 0x05
45__DFU_STATE_DFU_MANIFEST_SYNC = 0x06
46__DFU_STATE_DFU_MANIFEST = 0x07
47__DFU_STATE_DFU_MANIFEST_WAIT_RESET = 0x08
48__DFU_STATE_DFU_UPLOAD_IDLE = 0x09
49__DFU_STATE_DFU_ERROR = 0x0A
50
51_DFU_DESCRIPTOR_TYPE = 0x21
52
53__DFU_STATUS_STR = {
54    __DFU_STATE_APP_IDLE: "STATE_APP_IDLE",
55    __DFU_STATE_APP_DETACH: "STATE_APP_DETACH",
56    __DFU_STATE_DFU_IDLE: "STATE_DFU_IDLE",
57    __DFU_STATE_DFU_DOWNLOAD_SYNC: "STATE_DFU_DOWNLOAD_SYNC",
58    __DFU_STATE_DFU_DOWNLOAD_BUSY: "STATE_DFU_DOWNLOAD_BUSY",
59    __DFU_STATE_DFU_DOWNLOAD_IDLE: "STATE_DFU_DOWNLOAD_IDLE",
60    __DFU_STATE_DFU_MANIFEST_SYNC: "STATE_DFU_MANIFEST_SYNC",
61    __DFU_STATE_DFU_MANIFEST: "STATE_DFU_MANIFEST",
62    __DFU_STATE_DFU_MANIFEST_WAIT_RESET: "STATE_DFU_MANIFEST_WAIT_RESET",
63    __DFU_STATE_DFU_UPLOAD_IDLE: "STATE_DFU_UPLOAD_IDLE",
64    __DFU_STATE_DFU_ERROR: "STATE_DFU_ERROR",
65}
66
67# USB device handle
68__dev = None
69
70# Configuration descriptor of the device
71__cfg_descr = None
72
73__verbose = None
74
75# USB DFU interface
76__DFU_INTERFACE = 0
77
78# Python 3 deprecated getargspec in favour of getfullargspec, but
79# Python 2 doesn't have the latter, so detect which one to use
80getargspec = getattr(inspect, "getfullargspec", inspect.getargspec)
81
82if "length" in getargspec(usb.util.get_string).args:
83    # PyUSB 1.0.0.b1 has the length argument
84    def get_string(dev, index):
85        return usb.util.get_string(dev, 255, index)
86
87
88else:
89    # PyUSB 1.0.0.b2 dropped the length argument
90    def get_string(dev, index):
91        return usb.util.get_string(dev, index)
92
93
94def find_dfu_cfg_descr(descr):
95    if len(descr) == 9 and descr[0] == 9 and descr[1] == _DFU_DESCRIPTOR_TYPE:
96        nt = collections.namedtuple(
97            "CfgDescr",
98            [
99                "bLength",
100                "bDescriptorType",
101                "bmAttributes",
102                "wDetachTimeOut",
103                "wTransferSize",
104                "bcdDFUVersion",
105            ],
106        )
107        return nt(*struct.unpack("<BBBHHH", bytearray(descr)))
108    return None
109
110
111def init(**kwargs):
112    """Initializes the found DFU device so that we can program it."""
113    global __dev, __cfg_descr
114    devices = get_dfu_devices(**kwargs)
115    if not devices:
116        raise ValueError("No DFU device found")
117    if len(devices) > 1:
118        raise ValueError("Multiple DFU devices found")
119    __dev = devices[0]
120    __dev.set_configuration()
121
122    # Claim DFU interface
123    usb.util.claim_interface(__dev, __DFU_INTERFACE)
124
125    # Find the DFU configuration descriptor, either in the device or interfaces
126    __cfg_descr = None
127    for cfg in __dev.configurations():
128        __cfg_descr = find_dfu_cfg_descr(cfg.extra_descriptors)
129        if __cfg_descr:
130            break
131        for itf in cfg.interfaces():
132            __cfg_descr = find_dfu_cfg_descr(itf.extra_descriptors)
133            if __cfg_descr:
134                break
135
136    # Get device into idle state
137    for attempt in range(4):
138        status = get_status()
139        if status == __DFU_STATE_DFU_IDLE:
140            break
141        elif status == __DFU_STATE_DFU_DOWNLOAD_IDLE or status == __DFU_STATE_DFU_UPLOAD_IDLE:
142            abort_request()
143        else:
144            clr_status()
145
146
147def abort_request():
148    """Sends an abort request."""
149    __dev.ctrl_transfer(0x21, __DFU_ABORT, 0, __DFU_INTERFACE, None, __TIMEOUT)
150
151
152def clr_status():
153    """Clears any error status (perhaps left over from a previous session)."""
154    __dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE, None, __TIMEOUT)
155
156
157def get_status():
158    """Get the status of the last operation."""
159    stat = __dev.ctrl_transfer(0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE, 6, 20000)
160
161    # firmware can provide an optional string for any error
162    if stat[5]:
163        message = get_string(__dev, stat[5])
164        if message:
165            print(message)
166
167    return stat[4]
168
169
170def check_status(stage, expected):
171    status = get_status()
172    if status != expected:
173        raise SystemExit("DFU: %s failed (%s)" % (stage, __DFU_STATUS_STR.get(status, status)))
174
175
176def mass_erase():
177    """Performs a MASS erase (i.e. erases the entire device)."""
178    # Send DNLOAD with first byte=0x41
179    __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, "\x41", __TIMEOUT)
180
181    # Execute last command
182    check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY)
183
184    # Check command state
185    check_status("erase", __DFU_STATE_DFU_DOWNLOAD_IDLE)
186
187
188def page_erase(addr):
189    """Erases a single page."""
190    if __verbose:
191        print("Erasing page: 0x%x..." % (addr))
192
193    # Send DNLOAD with first byte=0x41 and page address
194    buf = struct.pack("<BI", 0x41, addr)
195    __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT)
196
197    # Execute last command
198    check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY)
199
200    # Check command state
201    check_status("erase", __DFU_STATE_DFU_DOWNLOAD_IDLE)
202
203
204def set_address(addr):
205    """Sets the address for the next operation."""
206    # Send DNLOAD with first byte=0x21 and page address
207    buf = struct.pack("<BI", 0x21, addr)
208    __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT)
209
210    # Execute last command
211    check_status("set address", __DFU_STATE_DFU_DOWNLOAD_BUSY)
212
213    # Check command state
214    check_status("set address", __DFU_STATE_DFU_DOWNLOAD_IDLE)
215
216
217def write_memory(addr, buf, progress=None, progress_addr=0, progress_size=0):
218    """Writes a buffer into memory. This routine assumes that memory has
219    already been erased.
220    """
221
222    xfer_count = 0
223    xfer_bytes = 0
224    xfer_total = len(buf)
225    xfer_base = addr
226
227    while xfer_bytes < xfer_total:
228        if __verbose and xfer_count % 512 == 0:
229            print(
230                "Addr 0x%x %dKBs/%dKBs..."
231                % (xfer_base + xfer_bytes, xfer_bytes // 1024, xfer_total // 1024)
232            )
233        if progress and xfer_count % 2 == 0:
234            progress(progress_addr, xfer_base + xfer_bytes - progress_addr, progress_size)
235
236        # Set mem write address
237        set_address(xfer_base + xfer_bytes)
238
239        # Send DNLOAD with fw data
240        chunk = min(__cfg_descr.wTransferSize, xfer_total - xfer_bytes)
241        __dev.ctrl_transfer(
242            0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf[xfer_bytes : xfer_bytes + chunk], __TIMEOUT
243        )
244
245        # Execute last command
246        check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_BUSY)
247
248        # Check command state
249        check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_IDLE)
250
251        xfer_count += 1
252        xfer_bytes += chunk
253
254
255def write_page(buf, xfer_offset):
256    """Writes a single page. This routine assumes that memory has already
257    been erased.
258    """
259
260    xfer_base = 0x08000000
261
262    # Set mem write address
263    set_address(xfer_base + xfer_offset)
264
265    # Send DNLOAD with fw data
266    __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf, __TIMEOUT)
267
268    # Execute last command
269    check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_BUSY)
270
271    # Check command state
272    check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_IDLE)
273
274    if __verbose:
275        print("Write: 0x%x " % (xfer_base + xfer_offset))
276
277
278def exit_dfu():
279    """Exit DFU mode, and start running the program."""
280    # Set jump address
281    set_address(0x08000000)
282
283    # Send DNLOAD with 0 length to exit DFU
284    __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, None, __TIMEOUT)
285
286    try:
287        # Execute last command
288        if get_status() != __DFU_STATE_DFU_MANIFEST:
289            print("Failed to reset device")
290
291        # Release device
292        usb.util.dispose_resources(__dev)
293    except:
294        pass
295
296
297def named(values, names):
298    """Creates a dict with `names` as fields, and `values` as values."""
299    return dict(zip(names.split(), values))
300
301
302def consume(fmt, data, names):
303    """Parses the struct defined by `fmt` from `data`, stores the parsed fields
304    into a named tuple using `names`. Returns the named tuple, and the data
305    with the struct stripped off."""
306
307    size = struct.calcsize(fmt)
308    return named(struct.unpack(fmt, data[:size]), names), data[size:]
309
310
311def cstring(string):
312    """Extracts a null-terminated string from a byte array."""
313    return string.decode("utf-8").split("\0", 1)[0]
314
315
316def compute_crc(data):
317    """Computes the CRC32 value for the data passed in."""
318    return 0xFFFFFFFF & -zlib.crc32(data) - 1
319
320
321def read_dfu_file(filename):
322    """Reads a DFU file, and parses the individual elements from the file.
323    Returns an array of elements. Each element is a dictionary with the
324    following keys:
325        num     - The element index.
326        address - The address that the element data should be written to.
327        size    - The size of the element data.
328        data    - The element data.
329    If an error occurs while parsing the file, then None is returned.
330    """
331
332    print("File: {}".format(filename))
333    with open(filename, "rb") as fin:
334        data = fin.read()
335    crc = compute_crc(data[:-4])
336    elements = []
337
338    # Decode the DFU Prefix
339    #
340    # <5sBIB
341    #   <   little endian           Endianness
342    #   5s  char[5]     signature   "DfuSe"
343    #   B   uint8_t     version     1
344    #   I   uint32_t    size        Size of the DFU file (without suffix)
345    #   B   uint8_t     targets     Number of targets
346    dfu_prefix, data = consume("<5sBIB", data, "signature version size targets")
347    print(
348        "    %(signature)s v%(version)d, image size: %(size)d, "
349        "targets: %(targets)d" % dfu_prefix
350    )
351    for target_idx in range(dfu_prefix["targets"]):
352        # Decode the Image Prefix
353        #
354        # <6sBI255s2I
355        #   <       little endian           Endianness
356        #   6s      char[6]     signature   "Target"
357        #   B       uint8_t     altsetting
358        #   I       uint32_t    named       Bool indicating if a name was used
359        #   255s    char[255]   name        Name of the target
360        #   I       uint32_t    size        Size of image (without prefix)
361        #   I       uint32_t    elements    Number of elements in the image
362        img_prefix, data = consume(
363            "<6sBI255s2I", data, "signature altsetting named name " "size elements"
364        )
365        img_prefix["num"] = target_idx
366        if img_prefix["named"]:
367            img_prefix["name"] = cstring(img_prefix["name"])
368        else:
369            img_prefix["name"] = ""
370        print(
371            "    %(signature)s %(num)d, alt setting: %(altsetting)s, "
372            'name: "%(name)s", size: %(size)d, elements: %(elements)d' % img_prefix
373        )
374
375        target_size = img_prefix["size"]
376        target_data = data[:target_size]
377        data = data[target_size:]
378        for elem_idx in range(img_prefix["elements"]):
379            # Decode target prefix
380            #
381            # <2I
382            #   <   little endian           Endianness
383            #   I   uint32_t    element     Address
384            #   I   uint32_t    element     Size
385            elem_prefix, target_data = consume("<2I", target_data, "addr size")
386            elem_prefix["num"] = elem_idx
387            print("      %(num)d, address: 0x%(addr)08x, size: %(size)d" % elem_prefix)
388            elem_size = elem_prefix["size"]
389            elem_data = target_data[:elem_size]
390            target_data = target_data[elem_size:]
391            elem_prefix["data"] = elem_data
392            elements.append(elem_prefix)
393
394        if len(target_data):
395            print("target %d PARSE ERROR" % target_idx)
396
397    # Decode DFU Suffix
398    #
399    # <4H3sBI
400    #   <   little endian           Endianness
401    #   H   uint16_t    device      Firmware version
402    #   H   uint16_t    product
403    #   H   uint16_t    vendor
404    #   H   uint16_t    dfu         0x11a   (DFU file format version)
405    #   3s  char[3]     ufd         "UFD"
406    #   B   uint8_t     len         16
407    #   I   uint32_t    crc32       Checksum
408    dfu_suffix = named(
409        struct.unpack("<4H3sBI", data[:16]), "device product vendor dfu ufd len crc"
410    )
411    print(
412        "    usb: %(vendor)04x:%(product)04x, device: 0x%(device)04x, "
413        "dfu: 0x%(dfu)04x, %(ufd)s, %(len)d, 0x%(crc)08x" % dfu_suffix
414    )
415    if crc != dfu_suffix["crc"]:
416        print("CRC ERROR: computed crc32 is 0x%08x" % crc)
417        return
418    data = data[16:]
419    if data:
420        print("PARSE ERROR")
421        return
422
423    return elements
424
425
426class FilterDFU(object):
427    """Class for filtering USB devices to identify devices which are in DFU
428    mode.
429    """
430
431    def __call__(self, device):
432        for cfg in device:
433            for intf in cfg:
434                return intf.bInterfaceClass == 0xFE and intf.bInterfaceSubClass == 1
435
436
437def get_dfu_devices(*args, **kwargs):
438    """Returns a list of USB devices which are currently in DFU mode.
439    Additional filters (like idProduct and idVendor) can be passed in
440    to refine the search.
441    """
442
443    # Convert to list for compatibility with newer PyUSB
444    return list(usb.core.find(*args, find_all=True, custom_match=FilterDFU(), **kwargs))
445
446
447def get_memory_layout(device):
448    """Returns an array which identifies the memory layout. Each entry
449    of the array will contain a dictionary with the following keys:
450        addr        - Address of this memory segment.
451        last_addr   - Last address contained within the memory segment.
452        size        - Size of the segment, in bytes.
453        num_pages   - Number of pages in the segment.
454        page_size   - Size of each page, in bytes.
455    """
456
457    cfg = device[0]
458    intf = cfg[(0, 0)]
459    mem_layout_str = get_string(device, intf.iInterface)
460    mem_layout = mem_layout_str.split("/")
461    result = []
462    for mem_layout_index in range(1, len(mem_layout), 2):
463        addr = int(mem_layout[mem_layout_index], 0)
464        segments = mem_layout[mem_layout_index + 1].split(",")
465        seg_re = re.compile(r"(\d+)\*(\d+)(.)(.)")
466        for segment in segments:
467            seg_match = seg_re.match(segment)
468            num_pages = int(seg_match.groups()[0], 10)
469            page_size = int(seg_match.groups()[1], 10)
470            multiplier = seg_match.groups()[2]
471            if multiplier == "K":
472                page_size *= 1024
473            if multiplier == "M":
474                page_size *= 1024 * 1024
475            size = num_pages * page_size
476            last_addr = addr + size - 1
477            result.append(
478                named(
479                    (addr, last_addr, size, num_pages, page_size),
480                    "addr last_addr size num_pages page_size",
481                )
482            )
483            addr += size
484    return result
485
486
487def list_dfu_devices(*args, **kwargs):
488    """Prints a lits of devices detected in DFU mode."""
489    devices = get_dfu_devices(*args, **kwargs)
490    if not devices:
491        raise SystemExit("No DFU capable devices found")
492    for device in devices:
493        print(
494            "Bus {} Device {:03d}: ID {:04x}:{:04x}".format(
495                device.bus, device.address, device.idVendor, device.idProduct
496            )
497        )
498        layout = get_memory_layout(device)
499        print("Memory Layout")
500        for entry in layout:
501            print(
502                "    0x{:x} {:2d} pages of {:3d}K bytes".format(
503                    entry["addr"], entry["num_pages"], entry["page_size"] // 1024
504                )
505            )
506
507
508def write_elements(elements, mass_erase_used, progress=None):
509    """Writes the indicated elements into the target memory,
510    erasing as needed.
511    """
512
513    mem_layout = get_memory_layout(__dev)
514    for elem in elements:
515        addr = elem["addr"]
516        size = elem["size"]
517        data = elem["data"]
518        elem_size = size
519        elem_addr = addr
520        if progress and elem_size:
521            progress(elem_addr, 0, elem_size)
522        while size > 0:
523            write_size = size
524            if not mass_erase_used:
525                for segment in mem_layout:
526                    if addr >= segment["addr"] and addr <= segment["last_addr"]:
527                        # We found the page containing the address we want to
528                        # write, erase it
529                        page_size = segment["page_size"]
530                        page_addr = addr & ~(page_size - 1)
531                        if addr + write_size > page_addr + page_size:
532                            write_size = page_addr + page_size - addr
533                        page_erase(page_addr)
534                        break
535            write_memory(addr, data[:write_size], progress, elem_addr, elem_size)
536            data = data[write_size:]
537            addr += write_size
538            size -= write_size
539            if progress:
540                progress(elem_addr, addr - elem_addr, elem_size)
541
542
543def cli_progress(addr, offset, size):
544    """Prints a progress report suitable for use on the command line."""
545    width = 25
546    done = offset * width // size
547    print(
548        "\r0x{:08x} {:7d} [{}{}] {:3d}% ".format(
549            addr, size, "=" * done, " " * (width - done), offset * 100 // size
550        ),
551        end="",
552    )
553    try:
554        sys.stdout.flush()
555    except OSError:
556        pass  # Ignore Windows CLI "WinError 87" on Python 3.6
557    if offset == size:
558        print("")
559
560
561def main():
562    """Test program for verifying this files functionality."""
563    global __verbose
564    # Parse CMD args
565    parser = argparse.ArgumentParser(description="DFU Python Util")
566    parser.add_argument(
567        "-l", "--list", help="list available DFU devices", action="store_true", default=False
568    )
569    parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=None)
570    parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=None)
571    parser.add_argument(
572        "-m", "--mass-erase", help="mass erase device", action="store_true", default=False
573    )
574    parser.add_argument(
575        "-u", "--upload", help="read file from DFU device", dest="path", default=False
576    )
577    parser.add_argument("-x", "--exit", help="Exit DFU", action="store_true", default=False)
578    parser.add_argument(
579        "-v", "--verbose", help="increase output verbosity", action="store_true", default=False
580    )
581    args = parser.parse_args()
582
583    __verbose = args.verbose
584
585    kwargs = {}
586    if args.vid:
587        kwargs["idVendor"] = args.vid
588
589    if args.pid:
590        kwargs["idProduct"] = args.pid
591
592    if args.list:
593        list_dfu_devices(**kwargs)
594        return
595
596    init(**kwargs)
597
598    command_run = False
599    if args.mass_erase:
600        print("Mass erase...")
601        mass_erase()
602        command_run = True
603
604    if args.path:
605        elements = read_dfu_file(args.path)
606        if not elements:
607            print("No data in dfu file")
608            return
609        print("Writing memory...")
610        write_elements(elements, args.mass_erase, progress=cli_progress)
611
612        print("Exiting DFU...")
613        exit_dfu()
614        command_run = True
615
616    if args.exit:
617        print("Exiting DFU...")
618        exit_dfu()
619        command_run = True
620
621    if command_run:
622        print("Finished")
623    else:
624        print("No command specified")
625
626
627if __name__ == "__main__":
628    main()
629