1import os 2 3import infra.basetest 4 5 6class TestNftables(infra.basetest.BRTest): 7 config = \ 8 """ 9 BR2_aarch64=y 10 BR2_TOOLCHAIN_EXTERNAL=y 11 BR2_TARGET_GENERIC_GETTY_PORT="ttyAMA0" 12 BR2_LINUX_KERNEL=y 13 BR2_LINUX_KERNEL_CUSTOM_VERSION=y 14 BR2_LINUX_KERNEL_CUSTOM_VERSION_VALUE="6.1.46" 15 BR2_LINUX_KERNEL_USE_CUSTOM_CONFIG=y 16 BR2_LINUX_KERNEL_CUSTOM_CONFIG_FILE="board/qemu/aarch64-virt/linux.config" 17 BR2_LINUX_KERNEL_NEEDS_HOST_OPENSSL=y 18 BR2_PACKAGE_NFTABLES=y 19 BR2_PACKAGE_PYTHON3=y 20 BR2_ROOTFS_OVERLAY="{}" 21 BR2_TARGET_ROOTFS_CPIO=y 22 BR2_TARGET_ROOTFS_CPIO_GZIP=y 23 # BR2_TARGET_ROOTFS_TAR is not set 24 """.format( 25 infra.filepath("tests/package/test_nftables/rootfs-overlay")) 26 27 def nftables_test(self, prog="nft"): 28 # Table/Chain names for the test 29 nft_table = "br_ip_table" 30 nft_chain = "br_ip_chain_in" 31 32 # We flush all nftables rules, to start from a known state. 33 self.assertRunOk(f"{prog} flush ruleset") 34 35 # We create an ip table. 36 self.assertRunOk(f"{prog} add table ip {nft_table}") 37 38 # We should be able to list this table. 39 list_cmd = f"{prog} list tables ip" 40 output, exit_code = self.emulator.run(list_cmd) 41 self.assertEqual(exit_code, 0) 42 self.assertIn(nft_table, output[0]) 43 44 # We create an ip input chain in our table. 45 cmd = f"{prog} add chain ip" 46 cmd += f" {nft_table} {nft_chain}" 47 cmd += " { type filter hook input priority 0 \\; }" 48 self.assertRunOk(cmd) 49 50 # We list our chain. 51 cmd = f"{prog} list chain ip {nft_table} {nft_chain}" 52 self.assertRunOk(cmd) 53 54 # We add a filter rule to drop pings (icmp echo-requests) to 55 # the 127.0.0.2 destination. 56 cmd = f"{prog} add rule ip {nft_table} {nft_chain}" 57 cmd += " ip daddr 127.0.0.2 icmp type echo-request drop" 58 self.assertRunOk(cmd) 59 60 # We list our rule. 61 self.assertRunOk(f"{prog} list ruleset ip") 62 63 # A ping to 127.0.0.1 is expected to work, because it's not 64 # matching our rule. We expect 3 replies (-c), with 0.5s 65 # internal (-i), and set a maximum timeout of 2s. 66 ping_cmd_prefix = "ping -c 3 -i 0.5 -W 2 " 67 self.assertRunOk(ping_cmd_prefix + "127.0.0.1") 68 69 # A ping to 127.0.0.2 is expected to fail, because our rule is 70 # supposed to drop it. 71 ping_test_cmd = ping_cmd_prefix + "127.0.0.2" 72 _, exit_code = self.emulator.run(ping_test_cmd) 73 self.assertNotEqual(exit_code, 0) 74 75 # We completely delete the table. This should also delete the 76 # chain and the rule. 77 self.assertRunOk(f"{prog} delete table ip {nft_table}") 78 79 # We should no longer see the table in the list. 80 output, exit_code = self.emulator.run(list_cmd) 81 self.assertEqual(exit_code, 0) 82 self.assertNotIn(nft_table, "\n".join(output)) 83 84 # Since we deleted the rule, the ping test command which was 85 # supposed to fail earlier is now supposed to succeed. 86 self.assertRunOk(ping_test_cmd) 87 88 def test_run(self): 89 img = os.path.join(self.builddir, "images", "rootfs.cpio.gz") 90 kern = os.path.join(self.builddir, "images", "Image") 91 self.emulator.boot(arch="aarch64", 92 kernel=kern, 93 kernel_cmdline=["console=ttyAMA0"], 94 options=["-M", "virt", 95 "-cpu", "cortex-a57", 96 "-m", "256M", 97 "-initrd", img]) 98 self.emulator.login() 99 100 # We check the program can execute. 101 self.assertRunOk("nft --version") 102 103 # We run the nftables test sequence using the default "nft" 104 # user space configuration tool. 105 self.nftables_test() 106 107 # We run again the same test sequence using our simple nft 108 # python implementation, to check the language bindings. 109 self.nftables_test(prog="/root/nft.py") 110