| import os |
| |
| import infra.basetest |
| |
| |
| class TestNftables(infra.basetest.BRTest): |
| config = \ |
| """ |
| BR2_aarch64=y |
| BR2_TOOLCHAIN_EXTERNAL=y |
| BR2_TARGET_GENERIC_GETTY_PORT="ttyAMA0" |
| BR2_LINUX_KERNEL=y |
| BR2_LINUX_KERNEL_CUSTOM_VERSION=y |
| BR2_LINUX_KERNEL_CUSTOM_VERSION_VALUE="6.1.46" |
| BR2_LINUX_KERNEL_USE_CUSTOM_CONFIG=y |
| BR2_LINUX_KERNEL_CUSTOM_CONFIG_FILE="board/qemu/aarch64-virt/linux.config" |
| BR2_LINUX_KERNEL_NEEDS_HOST_OPENSSL=y |
| BR2_PACKAGE_NFTABLES=y |
| BR2_PACKAGE_PYTHON3=y |
| BR2_ROOTFS_OVERLAY="{}" |
| BR2_TARGET_ROOTFS_CPIO=y |
| BR2_TARGET_ROOTFS_CPIO_GZIP=y |
| # BR2_TARGET_ROOTFS_TAR is not set |
| """.format( |
| infra.filepath("tests/package/test_nftables/rootfs-overlay")) |
| |
| def nftables_test(self, prog="nft"): |
| # Table/Chain names for the test |
| nft_table = "br_ip_table" |
| nft_chain = "br_ip_chain_in" |
| |
| # We flush all nftables rules, to start from a known state. |
| self.assertRunOk(f"{prog} flush ruleset") |
| |
| # We create an ip table. |
| self.assertRunOk(f"{prog} add table ip {nft_table}") |
| |
| # We should be able to list this table. |
| list_cmd = f"{prog} list tables ip" |
| output, exit_code = self.emulator.run(list_cmd) |
| self.assertEqual(exit_code, 0) |
| self.assertIn(nft_table, output[0]) |
| |
| # We create an ip input chain in our table. |
| cmd = f"{prog} add chain ip" |
| cmd += f" {nft_table} {nft_chain}" |
| cmd += " { type filter hook input priority 0 \\; }" |
| self.assertRunOk(cmd) |
| |
| # We list our chain. |
| cmd = f"{prog} list chain ip {nft_table} {nft_chain}" |
| self.assertRunOk(cmd) |
| |
| # We add a filter rule to drop pings (icmp echo-requests) to |
| # the 127.0.0.2 destination. |
| cmd = f"{prog} add rule ip {nft_table} {nft_chain}" |
| cmd += " ip daddr 127.0.0.2 icmp type echo-request drop" |
| self.assertRunOk(cmd) |
| |
| # We list our rule. |
| self.assertRunOk(f"{prog} list ruleset ip") |
| |
| # A ping to 127.0.0.1 is expected to work, because it's not |
| # matching our rule. We expect 3 replies (-c), with 0.5s |
| # internal (-i), and set a maximum timeout of 2s. |
| ping_cmd_prefix = "ping -c 3 -i 0.5 -W 2 " |
| self.assertRunOk(ping_cmd_prefix + "127.0.0.1") |
| |
| # A ping to 127.0.0.2 is expected to fail, because our rule is |
| # supposed to drop it. |
| ping_test_cmd = ping_cmd_prefix + "127.0.0.2" |
| _, exit_code = self.emulator.run(ping_test_cmd) |
| self.assertNotEqual(exit_code, 0) |
| |
| # We completely delete the table. This should also delete the |
| # chain and the rule. |
| self.assertRunOk(f"{prog} delete table ip {nft_table}") |
| |
| # We should no longer see the table in the list. |
| output, exit_code = self.emulator.run(list_cmd) |
| self.assertEqual(exit_code, 0) |
| self.assertNotIn(nft_table, "\n".join(output)) |
| |
| # Since we deleted the rule, the ping test command which was |
| # supposed to fail earlier is now supposed to succeed. |
| self.assertRunOk(ping_test_cmd) |
| |
| def boot_vm(self): |
| img = os.path.join(self.builddir, "images", "rootfs.cpio.gz") |
| kern = os.path.join(self.builddir, "images", "Image") |
| self.emulator.boot(arch="aarch64", |
| kernel=kern, |
| kernel_cmdline=["console=ttyAMA0"], |
| options=["-M", "virt", |
| "-cpu", "cortex-a57", |
| "-m", "256M", |
| "-initrd", img]) |
| self.emulator.login() |
| |
| def test_run(self): |
| self.boot_vm() |
| |
| # We check the program can execute. |
| self.assertRunOk("nft --version") |
| |
| # We run the nftables test sequence using the default "nft" |
| # user space configuration tool. |
| self.nftables_test() |
| |
| # We run again the same test sequence using our simple nft |
| # python implementation, to check the language bindings. |
| self.nftables_test(prog="/root/nft.py") |
| |
| |
| class TestNftablesInit(TestNftables): |
| config = TestNftables.config + \ |
| """ |
| BR2_INIT_BUSYBOX=y |
| """ |
| |
| def test_run(self): |
| self.boot_vm() |
| |
| # start with known state (rules from /etc/nftables.conf) |
| self.assertRunOk("/etc/init.d/S35nftables reload") |
| |
| # Same concept as in TestNftables.nftables_test: The rules |
| # should allow ping to 127.0.0.1, but not 127.0.0.2. |
| ping_cmd_prefix = "ping -c 3 -i 0.5 -W 2 " |
| self.assertRunOk(ping_cmd_prefix + "127.0.0.1") |
| _, exit_code = self.emulator.run(ping_cmd_prefix + "127.0.0.2") |
| self.assertNotEqual(exit_code, 0) |
| |
| # Stop should flush the rules, ping to both addresses should |
| # work now. |
| self.assertRunOk("/etc/init.d/S35nftables stop") |
| self.assertRunOk(ping_cmd_prefix + "127.0.0.1") |
| self.assertRunOk(ping_cmd_prefix + "127.0.0.2") |
| |
| # Start is essentially the same as reload, check that |
| # 127.0.0.2 gets blocked again. |
| self.assertRunOk("/etc/init.d/S35nftables start") |
| _, exit_code = self.emulator.run(ping_cmd_prefix + "127.0.0.2") |
| self.assertNotEqual(exit_code, 0) |