1import os
2
3import infra.basetest
4
5
class TestNftables(infra.basetest.BRTest):
    """Runtime test of the nftables package on an emulated aarch64 system.

    The same nftables command sequence is run twice: first with the
    "nft" user space tool, then with "/root/nft.py" (our simple nft
    python implementation), to check the language bindings.
    """

    config = \
        """
        BR2_aarch64=y
        BR2_TOOLCHAIN_EXTERNAL=y
        BR2_TARGET_GENERIC_GETTY_PORT="ttyAMA0"
        BR2_LINUX_KERNEL=y
        BR2_LINUX_KERNEL_CUSTOM_VERSION=y
        BR2_LINUX_KERNEL_CUSTOM_VERSION_VALUE="6.1.46"
        BR2_LINUX_KERNEL_USE_CUSTOM_CONFIG=y
        BR2_LINUX_KERNEL_CUSTOM_CONFIG_FILE="board/qemu/aarch64-virt/linux.config"
        BR2_LINUX_KERNEL_NEEDS_HOST_OPENSSL=y
        BR2_PACKAGE_NFTABLES=y
        BR2_PACKAGE_PYTHON3=y
        BR2_ROOTFS_OVERLAY="{}"
        BR2_TARGET_ROOTFS_CPIO=y
        BR2_TARGET_ROOTFS_CPIO_GZIP=y
        # BR2_TARGET_ROOTFS_TAR is not set
        """.format(
                infra.filepath("tests/package/test_nftables/rootfs-overlay"))

    def nftables_test(self, prog="nft"):
        """Run the nftables test sequence on the target using ``prog``.

        The sequence creates an ip table, a chain and a rule dropping
        icmp echo-requests to 127.0.0.2, checks the effect of the rule
        with ping, then deletes the table and checks that ping works
        again.

        Args:
            prog: command used to drive nftables on the target. It is
                invoked with the same arguments as the "nft" tool.
        """
        # Table/Chain names for the test
        nft_table = "br_ip_table"
        nft_chain = "br_ip_chain_in"

        # We flush all nftables rules, to start from a known state.
        self.assertRunOk(f"{prog} flush ruleset")

        # We create an ip table.
        self.assertRunOk(f"{prog} add table ip {nft_table}")

        # We should be able to list this table. The whole output is
        # searched rather than only its first line, so that an empty
        # output is reported as an assertion failure instead of an
        # IndexError, and so that this check mirrors the one done
        # after the table deletion.
        list_cmd = f"{prog} list tables ip"
        output, exit_code = self.emulator.run(list_cmd)
        self.assertEqual(exit_code, 0)
        self.assertIn(nft_table, "\n".join(output))

        # We create an ip input chain in our table. The semicolon is
        # backslash-escaped in the command sent to the target.
        cmd = f"{prog} add chain ip"
        cmd += f" {nft_table} {nft_chain}"
        cmd += " { type filter hook input priority 0 \\; }"
        self.assertRunOk(cmd)

        # We list our chain.
        cmd = f"{prog} list chain ip {nft_table} {nft_chain}"
        self.assertRunOk(cmd)

        # We add a filter rule to drop pings (icmp echo-requests) to
        # the 127.0.0.2 destination.
        cmd = f"{prog} add rule ip {nft_table} {nft_chain}"
        cmd += " ip daddr 127.0.0.2 icmp type echo-request drop"
        self.assertRunOk(cmd)

        # We list our rule.
        self.assertRunOk(f"{prog} list ruleset ip")

        # A ping to 127.0.0.1 is expected to work, because it's not
        # matching our rule. We expect 3 replies (-c), with 0.5s
        # interval (-i), and set a maximum timeout of 2s.
        ping_cmd_prefix = "ping -c 3 -i 0.5 -W 2 "
        self.assertRunOk(ping_cmd_prefix + "127.0.0.1")

        # A ping to 127.0.0.2 is expected to fail, because our rule is
        # supposed to drop it.
        ping_test_cmd = ping_cmd_prefix + "127.0.0.2"
        _, exit_code = self.emulator.run(ping_test_cmd)
        self.assertNotEqual(exit_code, 0)

        # We completely delete the table. This should also delete the
        # chain and the rule.
        self.assertRunOk(f"{prog} delete table ip {nft_table}")

        # We should no longer see the table in the list.
        output, exit_code = self.emulator.run(list_cmd)
        self.assertEqual(exit_code, 0)
        self.assertNotIn(nft_table, "\n".join(output))

        # Since we deleted the rule, the ping test command which was
        # supposed to fail earlier is now supposed to succeed.
        self.assertRunOk(ping_test_cmd)

    # Test entry point: boot the built kernel and cpio root filesystem
    # in the emulator, log in, then run the nftables test sequence with
    # both front-ends. This is documented with comments rather than a
    # docstring, so that the description reported by the test runner
    # is left unchanged.
    def test_run(self):
        img = os.path.join(self.builddir, "images", "rootfs.cpio.gz")
        kern = os.path.join(self.builddir, "images", "Image")
        self.emulator.boot(arch="aarch64",
                           kernel=kern,
                           kernel_cmdline=["console=ttyAMA0"],
                           options=["-M", "virt",
                                    "-cpu", "cortex-a57",
                                    "-m", "256M",
                                    "-initrd", img])
        self.emulator.login()

        # We check the program can execute.
        self.assertRunOk("nft --version")

        # We run the nftables test sequence using the default "nft"
        # user space configuration tool.
        self.nftables_test()

        # We run again the same test sequence using our simple nft
        # python implementation, to check the language bindings.
        self.nftables_test(prog="/root/nft.py")
110