1#! /usr/bin/env python
2
3"""
4 Copyright 2016 Google Inc. All Rights Reserved.
5 Author: gkalsi@google.com (Gurjant Kalsi)
6
7 Permission is hereby granted, free of charge, to any person obtaining
8 a copy of this software and associated documentation files
9 (the "Software"), to deal in the Software without restriction,
10 including without limitation the rights to use, copy, modify, merge,
11 publish, distribute, sublicense, and/or sell copies of the Software,
12 and to permit persons to whom the Software is furnished to do so,
13 subject to the following conditions:
14
15 The above copyright notice and this permission notice shall be
16 included in all copies or substantial portions of the Software.
17
18 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
21 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22 CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
23 TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
24 SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25"""
26import argparse
27import binascii
28import logging
29import struct
30import time
31
32import usb.core
33import usb.util
34
35
class Command:
    """Numeric identifiers of the commands that can be sent to the device."""
    flash = 0x01
    boot = 0x02
    devinfo = 0x03
40
41
class DataPhaseType:
    """Direction of the data phase that may follow a command, if any."""
    none = 0
    host_to_device = 1
    device_to_host = 2
46
47
class Retcode:
    """Return codes carried in the retcode field of a device response."""

    # --- Normal operation ---
    no_error = 0x00
    xmit_ready = 0x01
    recv_ready = 0x02

    # --- Malformed request ---
    bad_data_len = 0xAAA0
    bad_magic = 0xAAA1
    unknown_command = 0xAAA2

    # --- Device side system errors ---
    sys_image_too_big = 0xFFF1
    err_open_sys_flash = 0xFFF2
    err_erase_sys_flash = 0xFFF3
    err_write_sys_flash = 0xFFF4
    cant_find_buildsig = 0xFFF5
    usb_read_error = 0xFFF6
66
67
class CommandParam:
    """Per-command settings; currently only the kind of data phase it uses."""

    def __init__(self, data_phase_type):
        # One of the DataPhaseType values.
        self.data_phase_type = data_phase_type
71
# USB vendor / product id used to look the device up.
VENDOR_ID = 0x18D1
PRODUCT_ID = 0xA010

# Interface class / subclass pair used to select the interface to talk to.
CLASS_VENDOR_SPECIFIC = 0xFF
SUBCLASS_MTLDR_DEBUG = 0x01

# Module-wide logger; main() attaches the stream handler.
logger = logging.getLogger('mtldr')
logger.setLevel(logging.INFO)
80
81
class FindByDeviceClass(object):
    """
    Callable predicate that accepts a USB device when at least one of its
    configurations contains an interface with the requested class and
    subclass codes.
    """

    def __init__(self, device, subdevice):
        # Interface class and subclass codes to look for.
        self._device_class, self._subdevice_class = device, subdevice

    def __call__(self, device):
        # Iterating a device yields its configurations; stop at the first
        # configuration in which a matching interface descriptor is found.
        wanted = dict(bInterfaceClass=self._device_class,
                      bInterfaceSubClass=self._subdevice_class)
        return any(usb.util.find_descriptor(cfg, **wanted) for cfg in device)
97
98
class CommandDispatcher:
    """
    Drives the command / response exchange with the device over a pair of
    USB endpoints.

    Wire format, as packed and unpacked below (every field is a
    little-endian signed 32 bit integer):

        header   (host -> device): cmd_magic,  command, data length
        response (device -> host): resp_magic, retcode, data length
    """
    header_struct = struct.Struct("< i i i")
    header_struct_len = header_struct.size  # bytes

    response_struct = struct.Struct("< i i i")
    response_struct_len = response_struct.size  # bytes

    cmd_magic = 0x4d4f4f54
    resp_magic = 0x52455350

    data_phase_timeout = 10000  # ms

    # Maps every known command to the kind of data phase that follows it.
    CommandParams = {
        Command.flash: CommandParam(DataPhaseType.host_to_device),
        Command.boot: CommandParam(DataPhaseType.none),
        Command.devinfo: CommandParam(DataPhaseType.device_to_host),
    }

    def __init__(self, ep_in, ep_out):
        """
        ep_in  -- endpoint object used for reads (must provide read()).
        ep_out -- endpoint object used for writes (must provide write()).
        """
        self._ep_in = ep_in
        self._ep_out = ep_out

    def __read_response(self, timeout=None):
        """
        Reads a standard response from the USB device.

        timeout -- optional read timeout, forwarded to the endpoint's read();
                   a falsy value uses the endpoint's default.

        Returns (retcode, nbytes). Raises RuntimeError if the response does
        not start with resp_magic.
        """
        read_kwargs = {'timeout': timeout} if timeout else {}
        raw = self._ep_in.read(
            CommandDispatcher.response_struct_len, **read_kwargs)

        # Normalise whatever buffer type the endpoint returned into a byte
        # string; array.tostring() no longer exists on recent Python 3.
        resp = bytes(bytearray(raw))

        logger.debug("Read %d bytes: %s",
                     CommandDispatcher.response_struct_len,
                     binascii.hexlify(resp))

        magic, retcode, nbytes = CommandDispatcher.response_struct.unpack(resp)
        if magic != CommandDispatcher.resp_magic:
            raise RuntimeError(
                "Device responded with an unexpected magic value (0x%08x)."
                % (magic & 0xFFFFFFFF))

        logger.debug("Read Response - retcode = %d, nbytes = %d",
                     retcode, nbytes)

        return (retcode, int(nbytes))

    def __dispatch_no_data(self, command):
        """
        Dispatches a command that has no data phase.

        Returns (retcode, empty list).
        """
        logger.debug("Write %d bytes, command = %d", 0, command)
        header = CommandDispatcher.header_struct.pack(
            CommandDispatcher.cmd_magic, command, 0)
        self._ep_out.write(header)

        retcode, datalen = self.__read_response()
        assert datalen == 0  # A command with no data can't have a datalen

        return (retcode, list())

    def __dispatch_device_to_host(self, command):
        """
        Dispatches a command that has a device to host data phase.

        Returns (retcode, data). If the device does not answer the header
        with xmit_ready, that retcode is returned with an empty list;
        otherwise data is whatever the endpoint read returned and retcode
        comes from the response that closes the data phase.
        """
        logger.debug("Write %d bytes, command = %d", 0, command)
        header = CommandDispatcher.header_struct.pack(
            CommandDispatcher.cmd_magic, command, 0)
        self._ep_out.write(header)

        retcode, datalen = self.__read_response()
        if retcode != Retcode.xmit_ready:
            return (retcode, list())

        logger.debug("Read %d bytes, retcode = %d", int(datalen), retcode)
        payload = self._ep_in.read(
            int(datalen), timeout=CommandDispatcher.data_phase_timeout)

        # The device closes the data phase with a final status response.
        retcode, datalen = self.__read_response()

        return (retcode, payload)

    def __dispatch_host_to_device(self, command, data):
        """
        Dispatches a command that has a host to device data phase.

        Returns (retcode, empty list).
        """
        logger.debug("Write %d bytes, command = %d", len(data), command)
        # Tell the device that we're about to send it data. Also mention how
        # much data we're about to send.
        header = CommandDispatcher.header_struct.pack(
            CommandDispatcher.cmd_magic, command, len(data))
        self._ep_out.write(header)

        # The device will signal back to us that it's ready to read data.
        retcode, datalen = self.__read_response(
            CommandDispatcher.data_phase_timeout)
        assert datalen == 0

        if retcode != Retcode.recv_ready:
            # The device experienced an error and is not ready to receive data.
            return (retcode, list())

        # Write the data to the device.
        self._ep_out.write(data, timeout=CommandDispatcher.data_phase_timeout)

        # The device will reply to us to let us know whether or not it received
        # our data correctly.
        retcode, datalen = self.__read_response()
        return (retcode, list())

    def dispatch(self, command, data=None):
        """
        Dispatches a command to the connected USB device.

        A command is composed of a command phase followed by an optional data
        phase. Whether there is a data phase, and in which direction, is
        looked up in CommandParams; data is only sent for commands whose
        data phase is host to device.

        Returns a 2-Tuple as follows: (command result, optional data). If the
        device returned data during the data phase, optional data will contain
        that data, otherwise it will be an empty list.

        Raises ValueError for a command that is not in CommandParams and
        TypeError when a host to device command is dispatched without data.
        """
        # Make sure the command actually exists.
        params = CommandDispatcher.CommandParams.get(command)
        if params is None:
            raise ValueError("Command " + str(command) + " does not exist.")

        phase = params.data_phase_type
        if phase == DataPhaseType.none:
            return self.__dispatch_no_data(command)
        if phase == DataPhaseType.host_to_device:
            if data is None:
                raise TypeError(
                    "Command " + str(command) + " requires data to send.")
            return self.__dispatch_host_to_device(command, data)
        if phase == DataPhaseType.device_to_host:
            return self.__dispatch_device_to_host(command)

        # Only reachable if CommandParams holds an unknown data phase type.
        raise ValueError(
            "Command " + str(command) + " has an unknown data phase type.")
228
229
def cmd_devinfo(dispatcher, args):
    """
    Handler for the 'devinfo' sub-command: requests the device info and
    prints it, or prints the error code if the device reported one.

    dispatcher -- object whose dispatch(command) returns (retcode, data).
    args       -- parsed command line arguments (not used by this command).
    """
    retcode, data = dispatcher.dispatch(Command.devinfo)

    if retcode != Retcode.no_error:
        print("Error %d while reading devinfo" % retcode)
        return

    # Turn the returned buffer into a byte string without relying on
    # array.tostring(), which newer Python 3 releases no longer provide.
    payload = bytes(bytearray(data))
    if not isinstance(payload, str):
        # Python 3: bytes must be decoded to print as text.
        # NOTE(review): assumes the device sends UTF-8/ASCII text -- confirm.
        payload = payload.decode("utf-8", "replace")
    print(payload)
237
238
def cmd_flash(dispatcher, args):
    """
    Handler for the 'flash' sub-command: reads the file named by args.bin
    and sends its contents to the device with the flash command. Prints the
    error code if the device did not report success.
    """
    with open(args.bin, 'rb') as image:
        payload = image.read()

    status, _ = dispatcher.dispatch(Command.flash, payload)
    if status == Retcode.no_error:
        return
    print("Error %d while flashing device" % status)
246
247
248
def cmd_boot(dispatcher, args):
    """
    Handler for the 'boot' sub-command: sends the boot command and prints
    the error code if the device did not report success.
    """
    status, _ = dispatcher.dispatch(Command.boot)
    if status == Retcode.no_error:
        return
    print("Error %d while booting device" % status)
253
254
def main():
    """
    Command line entry point.

    Parses the sub-command (devinfo, flash or boot), polls every 0.5 s until
    a device matching the vendor id, product id and interface class/subclass
    is found, looks up one IN and one OUT endpoint on the matching
    interface and hands a CommandDispatcher to the sub-command handler.

    Raises RuntimeError if the active configuration has no matching
    interface or the interface lacks an IN or OUT endpoint.
    """
    # Setup the Logger
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(handler)

    # Setup the argument parser
    parser = argparse.ArgumentParser(
        description='Send commands to an mtldr device over USB.')
    subparsers = parser.add_subparsers(dest='command',
                                       help='sub-command help')
    # Python 3 makes sub-commands optional by default; without one there is
    # no args.func to call, so insist on it explicitly.
    subparsers.required = True

    devinfo_parser = subparsers.add_parser(
        'devinfo', help='Print the device info reported by the device')
    devinfo_parser.set_defaults(func=cmd_devinfo)

    flash_parser = subparsers.add_parser(
        'flash', help='Flash a binary to the device')
    flash_parser.add_argument('bin', help="Path to the LK Binary to flash")
    flash_parser.set_defaults(func=cmd_flash)

    boot_parser = subparsers.add_parser('boot', help='Boot the device')
    boot_parser.set_defaults(func=cmd_boot)

    args = parser.parse_args()

    logger.info("Waiting for device (vid=0x%04x, pid=0x%04x, "
                "class=0x%02x, subclass=0x%02x)",
                VENDOR_ID, PRODUCT_ID, CLASS_VENDOR_SPECIFIC,
                SUBCLASS_MTLDR_DEBUG)

    # Poll until the device shows up; usb.core.find() returns None otherwise.
    matcher = FindByDeviceClass(CLASS_VENDOR_SPECIFIC, SUBCLASS_MTLDR_DEBUG)
    dev = None
    while dev is None:
        dev = usb.core.find(
            idVendor=VENDOR_ID,
            idProduct=PRODUCT_ID,
            custom_match=matcher)
        if dev is None:
            time.sleep(0.5)

    logger.info("Found USB Device!")
    dev.set_configuration()

    cfg = dev.get_active_configuration()
    intf = usb.util.find_descriptor(
        cfg, bInterfaceClass=CLASS_VENDOR_SPECIFIC,
        bInterfaceSubClass=SUBCLASS_MTLDR_DEBUG)
    if intf is None:
        # The matcher accepts a device if *any* configuration has the
        # interface, so the active one may still lack it.
        raise RuntimeError(
            "Active configuration has no interface with class=0x%02x, "
            "subclass=0x%02x" % (CLASS_VENDOR_SPECIFIC, SUBCLASS_MTLDR_DEBUG))

    def find_endpoint(direction):
        # Look up an endpoint of the interface with the given direction.
        return usb.util.find_descriptor(
            intf,
            custom_match=lambda e:
            usb.util.endpoint_direction(e.bEndpointAddress) == direction)

    ep_out = find_endpoint(usb.util.ENDPOINT_OUT)
    ep_in = find_endpoint(usb.util.ENDPOINT_IN)
    if ep_out is None or ep_in is None:
        raise RuntimeError(
            "Interface is missing an IN and/or OUT endpoint.")

    dispatcher = CommandDispatcher(ep_in, ep_out)
    args.func(dispatcher, args)
315
316
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
319