1# SPDX-License-Identifier: GPL-2.0
2# (C) Copyright 2023, Advanced Micro Devices, Inc.
3
4import pytest
5import random
6import string
7import test_net
8
9"""
10Note: This test relies on boardenv_* containing configuration values to define
11RPU applications information for AMD's ZynqMP SoC which contains, application
12names, processors, address where it is built, expected output and the tftp load
13addresses. This test will be automatically skipped without this.
14
15It also relies on dhcp or setup_static net test to support tftp to load
16application on DDR. All the environment parameters are stored sequentially.
17The length of all parameters values should be same. For example, if 2 app_names
18are defined in a list as a value of parameter 'app_name' then the other
19parameters value also should have a list with 2 items.
20It will run RPU cases for all the applications defined in boardenv_*
21configuration file.
22
23Example:
24env__zynqmp_rpu_apps = {
25    'app_name': ['hello_world_r5_0_ddr.elf', 'hello_world_r5_1_ddr.elf'],
26    'proc': ['rpu0', 'rpu1'],
27    'cpu_num': [4, 5],
28    'addr': [0xA00000, 0xB00000],
29    'output': ['Successfully ran Hello World application on DDR from RPU0',
30               'Successfully ran Hello World application on DDR from RPU1'],
31    'tftp_addr': [0x100000, 0x200000],
32}
33"""
34
35# Get rpu apps params from env
36def get_rpu_apps_env(ubman):
37    rpu_apps = ubman.config.env.get('env__zynqmp_rpu_apps', False)
38    if not rpu_apps:
39        pytest.skip('ZynqMP RPU application info not defined!')
40
41    apps = rpu_apps.get('app_name', None)
42    if not apps:
43        pytest.skip('No RPU application found!')
44
45    procs = rpu_apps.get('proc', None)
46    if not procs:
47        pytest.skip('No RPU application processor provided!')
48
49    cpu_nums = rpu_apps.get('cpu_num', None)
50    if not cpu_nums:
51        pytest.skip('No CPU number for respective processor provided!')
52
53    addrs = rpu_apps.get('addr', None)
54    if not addrs:
55        pytest.skip('No RPU application build address found!')
56
57    outputs = rpu_apps.get('output', None)
58    if not outputs:
59        pytest.skip('Expected output not found!')
60
61    tftp_addrs = rpu_apps.get('tftp_addr', None)
62    if not tftp_addrs:
63        pytest.skip('TFTP address to load application not found!')
64
65    return apps, procs, cpu_nums, addrs, outputs, tftp_addrs
66
67# Check return code
68def ret_code(ubman):
69    return ubman.run_command('echo $?')
70
71# Initialize tcm
72def tcminit(ubman, rpu_mode):
73    output = ubman.run_command(f'zynqmp tcminit {rpu_mode}')
74    assert 'Initializing TCM overwrites TCM content' in output
75    return ret_code(ubman)
76
77# Load application in DDR
78def load_app_ddr(ubman, tftp_addr, app):
79    output = ubman.run_command('tftpboot %x %s' % (tftp_addr, app))
80    assert 'TIMEOUT' not in output
81    assert 'Bytes transferred = ' in output
82
83    # Load elf
84    ubman.run_command('bootelf -p %x' % tftp_addr)
85    assert ret_code(ubman).endswith('0')
86
87# Disable cpus
88def disable_cpus(ubman, cpu_nums):
89    for num in cpu_nums:
90        ubman.run_command(f'cpu {num} disable')
91
92# Get random RPU mode between string and integer
93def get_rpu_mode(rpu_mode):
94    if rpu_mode == 0 or rpu_mode == 'lockstep':
95        return random.choice(['lockstep', 0])
96    elif rpu_mode == 1 or rpu_mode == 'split':
97        return random.choice(['split', 1])
98
99# Load apps on RPU cores
100def rpu_apps_load(ubman, rpu_mode):
101    apps, procs, cpu_nums, addrs, outputs, tftp_addrs = get_rpu_apps_env(
102        ubman)
103    test_net.test_net_dhcp(ubman)
104    if not test_net.net_set_up:
105        test_net.test_net_setup_static(ubman)
106
107    try:
108        assert tcminit(ubman, get_rpu_mode(rpu_mode)).endswith('0')
109
110        for i in range(len(apps)):
111            if rpu_mode == 'lockstep' and procs[i] != 'rpu0':
112                continue
113
114            load_app_ddr(ubman, tftp_addrs[i], apps[i])
115            rel_addr = hex(int(addrs[i] + 0x3C))
116
117            # Release cpu at app load address
118            cpu_num = cpu_nums[i]
119            cmd = f'cpu {cpu_num} release {rel_addr} {rpu_mode}'
120            output = ubman.run_command(cmd)
121            exp_op = f'Using TCM jump trampoline for address {rel_addr}'
122            assert exp_op in output
123            assert f'R5 {rpu_mode} mode' in output
124            ubman.wait_for(outputs[i])
125            assert ret_code(ubman).endswith('0')
126    finally:
127        disable_cpus(ubman, cpu_nums)
128
129@pytest.mark.buildconfigspec('cmd_zynqmp')
130def test_zynqmp_rpu_app_load_split(ubman):
131    rpu_apps_load(ubman, 'split')
132
133@pytest.mark.buildconfigspec('cmd_zynqmp')
134def test_zynqmp_rpu_app_load_lockstep(ubman):
135    rpu_apps_load(ubman, 'lockstep')
136
137@pytest.mark.buildconfigspec('cmd_zynqmp')
138def test_zynqmp_rpu_app_load_negative(ubman):
139    apps, procs, cpu_nums, addrs, outputs, tftp_addrs = get_rpu_apps_env(
140        ubman)
141
142    # Invalid commands
143    rand_str = ''.join(random.choices(string.ascii_lowercase, k=4))
144    rand_num = random.randint(2, 100)
145    inv_modes = ['mode', rand_str, rand_num, 'splittt', 'locksteppp', '00', 11]
146
147    for mode in inv_modes:
148        ubman.run_command(f'zynqmp tcminit {mode}')
149        assert ret_code(ubman).endswith('1')
150
151    test_net.test_net_dhcp(ubman)
152    if not test_net.net_set_up:
153        test_net.test_net_setup_static(ubman)
154
155    try:
156        rpu_mode = 'split'
157        assert tcminit(ubman, get_rpu_mode(rpu_mode)).endswith('0')
158
159        inv_modes += [0, 1]
160        for i in range(len(apps)):
161            load_app_ddr(ubman, tftp_addrs[i], apps[i])
162
163            # Run in split mode at different load address
164            rel_addr = hex(int(addrs[i]) + random.randint(200, 1000))
165            cpu_num = cpu_nums[i]
166            cmd = f'cpu {cpu_num} release {rel_addr} {rpu_mode}'
167            output = ubman.run_command(cmd)
168            exp_op = f'Using TCM jump trampoline for address {rel_addr}'
169            assert exp_op in output
170            assert f'R5 {rpu_mode} mode' in output
171            assert not outputs[i] in output
172
173            # Invalid rpu mode
174            for mode in inv_modes:
175                cmd = f'cpu {cpu_num} release {rel_addr} {mode}'
176                output = ubman.run_command(cmd)
177                assert exp_op in output
178                assert f'Unsupported mode' in output
179                assert not ret_code(ubman).endswith('0')
180
181        # Switch to lockstep mode, without disabling CPUs
182        rpu_mode = 'lockstep'
183        output = ubman.run_command(
184            f'zynqmp tcminit {get_rpu_mode(rpu_mode)}'
185        )
186        assert 'ERROR: ' in output
187
188        # Disable cpus
189        disable_cpus(ubman, cpu_nums)
190
191        # Switch to lockstep mode, after disabling CPUs
192        output = ubman.run_command(
193            f'zynqmp tcminit {get_rpu_mode(rpu_mode)}'
194        )
195        assert 'Initializing TCM overwrites TCM content' in output
196        assert ret_code(ubman).endswith('0')
197
198        # Run lockstep mode for RPU1/RPU0
199        for i in range(len(apps)):
200            load_app_ddr(ubman, tftp_addrs[i], apps[i])
201            rel_addr = hex(int(addrs[i] + 0x3C))
202            cpu_num = cpu_nums[i]
203            cmd = f'cpu {cpu_num} release {rel_addr} {rpu_mode}'
204            output = ubman.run_command(cmd)
205            exp_op = f'Using TCM jump trampoline for address {rel_addr}'
206            assert exp_op in output
207
208            if procs[i] == 'rpu1':
209                assert 'Lockstep mode should run on ZYNQMP_CORE_RPU0' in output
210                assert not ret_code(ubman).endswith('0')
211            elif procs[i] == 'rpu0':
212                assert f'R5 {rpu_mode} mode' in output
213                ubman.wait_for(outputs[i])
214                assert ret_code(ubman).endswith('0')
215            else:
216                assert False, 'ERROR: Invalid processor!'
217    finally:
218        disable_cpus(ubman, cpu_nums)
219        # This forces the console object to be shutdown, so any subsequent test
220        # will reset the board back into U-Boot.
221        ubman.drain_console()
222        ubman.cleanup_spawn()
223