1# Test for VfsLittle using a RAM device, testing error handling from corrupt block device
2
3try:
4    import uos
5
6    uos.VfsLfs1
7    uos.VfsLfs2
8except (ImportError, AttributeError):
9    print("SKIP")
10    raise SystemExit
11
12
13class RAMBlockDevice:
14    ERASE_BLOCK_SIZE = 1024
15
16    def __init__(self, blocks):
17        self.data = bytearray(blocks * self.ERASE_BLOCK_SIZE)
18        self.ret = 0
19
20    def readblocks(self, block, buf, off):
21        addr = block * self.ERASE_BLOCK_SIZE + off
22        for i in range(len(buf)):
23            buf[i] = self.data[addr + i]
24        return self.ret
25
26    def writeblocks(self, block, buf, off):
27        addr = block * self.ERASE_BLOCK_SIZE + off
28        for i in range(len(buf)):
29            self.data[addr + i] = buf[i]
30        return self.ret
31
32    def ioctl(self, op, arg):
33        if op == 4:  # block count
34            return len(self.data) // self.ERASE_BLOCK_SIZE
35        if op == 5:  # block size
36            return self.ERASE_BLOCK_SIZE
37        if op == 6:  # erase block
38            return 0
39
40
41def corrupt(bdev, block):
42    addr = block * bdev.ERASE_BLOCK_SIZE
43    for i in range(bdev.ERASE_BLOCK_SIZE):
44        bdev.data[addr + i] = i & 0xFF
45
46
47def create_vfs(bdev, vfs_class):
48    bdev.ret = 0
49    vfs_class.mkfs(bdev)
50    vfs = vfs_class(bdev)
51    with vfs.open("f", "w") as f:
52        for i in range(100):
53            f.write("test")
54    return vfs
55
56
57def test(bdev, vfs_class):
58    print("test", vfs_class)
59
60    # statvfs
61    vfs = create_vfs(bdev, vfs_class)
62    corrupt(bdev, 0)
63    corrupt(bdev, 1)
64    try:
65        print(vfs.statvfs(""))
66    except OSError:
67        print("statvfs OSError")
68
69    # error during read
70    vfs = create_vfs(bdev, vfs_class)
71    f = vfs.open("f", "r")
72    bdev.ret = -5  # EIO
73    try:
74        f.read(10)
75    except OSError:
76        print("read OSError")
77
78    # error during write
79    vfs = create_vfs(bdev, vfs_class)
80    f = vfs.open("f", "a")
81    bdev.ret = -5  # EIO
82    try:
83        f.write("test")
84    except OSError:
85        print("write OSError")
86
87    # error during close
88    vfs = create_vfs(bdev, vfs_class)
89    f = vfs.open("f", "w")
90    f.write("test")
91    bdev.ret = -5  # EIO
92    try:
93        f.close()
94    except OSError:
95        print("close OSError")
96
97    # error during flush
98    vfs = create_vfs(bdev, vfs_class)
99    f = vfs.open("f", "w")
100    f.write("test")
101    bdev.ret = -5  # EIO
102    try:
103        f.flush()
104    except OSError:
105        print("flush OSError")
106    bdev.ret = 0
107    f.close()
108
109
110bdev = RAMBlockDevice(30)
111test(bdev, uos.VfsLfs1)
112test(bdev, uos.VfsLfs2)
113