# NOTE: This script is tested under Python 3.x
2
3__copyright__ = "Copyright (C) 2020~2021 Nuvoton Technology Corp. All rights reserved"
4
5import sys
6import usb.core
7import usb.util
8import json
9import typing
10
# wValue sent with vendor request 0xA0; the accompanying wIndex carries the
# byte length of the transfer that follows (see XUsbCom.write / get_info).
XFER_LEN_CMD = 0x0012
# wValue sent with vendor request 0xB0 at the start of XUsbCom.get_info.
GET_INFO_CMD = 0x0005
13
14
class XUsbCom:
    """Thin wrapper around one USB device object.

    The wrapped object must provide ``ctrl_transfer``, ``write`` and ``read``
    (the calls made below).  Any ``usb.core.USBError`` raised by those calls
    terminates the process through ``sys.exit``.
    """

    # Byte count of the info record exchanged by get_info().
    _INFO_LEN = 76

    def __init__(self, _dev):
        """Store *_dev* and initialise the per-device bookkeeping fields."""
        self.dev = _dev
        # Endpoint addresses handed to dev.write() / dev.read().
        self.write_addr = 0x1
        self.read_addr = 0x81
        # Not referenced by any method of this class.
        self.attach = False
        self.id = 0
        # Last buffer returned by get_info().
        self.info = b''

    def write(self, data) -> None:
        """Announce ``len(data)`` to the device, then send *data*.

        Exits the process on ``usb.core.USBError``.
        """
        try:
            # Vendor command set transfer length
            self.dev.ctrl_transfer(0x40, 0xA0, wValue=XFER_LEN_CMD, wIndex=len(data), data_or_wLength='')
            # Actual data
            self.dev.write(self.write_addr, data, timeout=1000)
        except usb.core.USBError as err:
            sys.exit(err)

    def read(self, size) -> bytes:
        """Read up to *size* bytes from the read endpoint (1 s timeout).

        Returns the buffer produced by ``dev.read``; exits the process on
        ``usb.core.USBError``.
        """
        try:
            buf = self.dev.read(self.read_addr, size, timeout=1000)
        except usb.core.USBError as err:
            sys.exit(err)
        return buf

    def set_media(self, media) -> None:
        """Send *media* as the wValue of vendor request 0xB0.

        Exits the process on ``usb.core.USBError``.
        """
        try:
            # Vendor command set type
            self.dev.ctrl_transfer(0x40, 0xB0, wValue=media, wIndex=0, data_or_wLength='')
        except usb.core.USBError as err:
            sys.exit(err)

    def get_info(self, data) -> bytes:
        """Send *data* as an info request and return the device's reply.

        The reply (``_INFO_LEN`` bytes requested) is also cached in
        ``self.info``.  Exits the process on ``usb.core.USBError``.
        """
        try:
            self.dev.ctrl_transfer(0x40, 0xB0, wValue=GET_INFO_CMD, wIndex=0, data_or_wLength='')
            self.dev.ctrl_transfer(0x40, 0xA0, wValue=XFER_LEN_CMD, wIndex=self._INFO_LEN,
                                   data_or_wLength='')
            # Use the same endpoints as write()/read() instead of literals.
            self.dev.write(self.write_addr, data, timeout=1000)
            self.info = self.dev.read(self.read_addr, self._INFO_LEN, timeout=1000)
            # A further 4 bytes are read and discarded (not used).
            self.dev.read(self.read_addr, 4, timeout=5000)
        except usb.core.USBError as err:
            sys.exit(err)
        return self.info

    def set_id(self, i) -> None:
        """Record *i* as this device's id."""
        self.id = i

    def get_id(self) -> int:
        """Return the id set by set_id() (0 if never set)."""
        return self.id

    @staticmethod
    def set_align(nand, spinand) -> None:
        """Persist the alignment values to ``.config`` in the working directory.

        The file is rewritten only when it is missing, unparsable, lacks
        either key, or stores a value different from the one requested.
        Prints a message and exits the process if the file cannot be written.
        """
        wanted = {'nand_align': nand, 'spinand_align': spinand}
        try:
            with open(".config", "r") as json_file:
                cfg = json.load(json_file)
            # A missing key (KeyError) or a malformed document/value
            # (TypeError / ValueError) must also trigger a rewrite; otherwise
            # get_align() would silently report 0 for that entry.
            overwrite = any(int(cfg[key]) != value for key, value in wanted.items())
        except (IOError, OSError, json.decoder.JSONDecodeError,
                KeyError, TypeError, ValueError):
            overwrite = True

        if overwrite:
            try:
                with open(".config", "w+") as json_file:
                    json.dump(wanted, json_file)
            except (IOError, OSError) as err:
                print("Write .config failed. Please re-attach")
                sys.exit(err)

    @staticmethod
    def get_align() -> typing.Tuple[int, int]:
        """Return ``(nand_align, spinand_align)`` read from ``.config``.

        A key that is absent from the file yields 0.  Prints a message and
        exits the process if the file cannot be opened or its content cannot
        be interpreted.
        """
        try:
            with open(".config", "r") as json_file:
                cfg = json.load(json_file)
            # AttributeError: top-level JSON value is not an object.
            # TypeError / ValueError: stored value is not convertible to int.
            nand_align = int(cfg.get('nand_align', 0))
            spinand_align = int(cfg.get('spinand_align', 0))
        except (IOError, OSError, json.decoder.JSONDecodeError,
                AttributeError, TypeError, ValueError) as err:
            print("Open/parsing .config failed. Please re-attach")
            sys.exit(err)
        return nand_align, spinand_align
109
110
class XUsbComList:
    """Find the supported USB device(s) and wrap each one in an XUsbCom."""

    def __init__(self, attach_all=False):
        """Locate, configure and wrap the matching device(s).

        With ``attach_all`` true every device matching the VID/PID is
        attached; otherwise only the first one found.  When nothing matches,
        the list is left empty.  Exits the process if no USB backend is
        available or a device cannot be configured.
        """
        vid = 0x0416
        pid = 0x5963
        # Assigned before anything can fail so that __del__ and get_dev()
        # always find the attribute, even after an early exit.
        self.devices = []
        try:
            if attach_all:
                found = list(usb.core.find(find_all=True, idVendor=vid, idProduct=pid))
            else:
                # Without find_all, find() returns a single device or None.
                # Wrap it directly rather than iterating the device, which
                # would enumerate its configurations instead.
                first = usb.core.find(idVendor=vid, idProduct=pid)
                found = [] if first is None else [first]
        except usb.core.NoBackendError as err:
            sys.exit(err)

        # Configure every device before any wrapper is created.
        for dev in found:
            try:
                dev.set_configuration()
            except (usb.core.USBError, NotImplementedError) as err:
                sys.exit(err)

        for index, dev in enumerate(found):
            com = XUsbCom(dev)
            com.set_id(index)
            self.devices.append(com)

    def __del__(self):
        """Release the USB resources of every wrapped device.

        Safe to run on a partially constructed instance and more than once.
        """
        devices = getattr(self, 'devices', None)
        self.devices = None
        for dev in devices or ():
            try:
                usb.util.dispose_resources(dev.dev)
            except usb.core.USBError as err:
                sys.exit(err)

    def get_dev(self):
        """Return the list of XUsbCom wrappers (None once finalized)."""
        return self.devices
155