| # SPDX-License-Identifier: GPL-2.0 |
| # Copyright (c) 2020 SUSE LLC. |
| |
| import collections |
| import functools |
| import json |
| import os |
| import socket |
| import subprocess |
| import unittest |
| |
| |
| # Add the source tree of bpftool and /usr/local/sbin to PATH |
| cur_dir = os.path.dirname(os.path.realpath(__file__)) |
| bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..", |
| "tools", "bpf", "bpftool")) |
| os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"] |
| |
| |
| class IfaceNotFoundError(Exception): |
| pass |
| |
| |
| class UnprivilegedUserError(Exception): |
| pass |
| |
| |
| def _bpftool(args, json=True): |
| _args = ["bpftool"] |
| if json: |
| _args.append("-j") |
| _args.extend(args) |
| |
| return subprocess.check_output(_args) |
| |
| |
| def bpftool(args): |
| return _bpftool(args, json=False).decode("utf-8") |
| |
| |
| def bpftool_json(args): |
| res = _bpftool(args) |
| return json.loads(res) |
| |
| |
| def get_default_iface(): |
| for iface in socket.if_nameindex(): |
| if iface[1] != "lo": |
| return iface[1] |
| raise IfaceNotFoundError("Could not find any network interface to probe") |
| |
| |
| def default_iface(f): |
| @functools.wraps(f) |
| def wrapper(*args, **kwargs): |
| iface = get_default_iface() |
| return f(*args, iface, **kwargs) |
| return wrapper |
| |
| DMESG_EMITTING_HELPERS = [ |
| "bpf_probe_write_user", |
| "bpf_trace_printk", |
| "bpf_trace_vprintk", |
| ] |
| |
| class TestBpftool(unittest.TestCase): |
| @classmethod |
| def setUpClass(cls): |
| if os.getuid() != 0: |
| raise UnprivilegedUserError( |
| "This test suite needs root privileges") |
| |
| @default_iface |
| def test_feature_dev_json(self, iface): |
| unexpected_helpers = DMESG_EMITTING_HELPERS |
| expected_keys = [ |
| "syscall_config", |
| "program_types", |
| "map_types", |
| "helpers", |
| "misc", |
| ] |
| |
| res = bpftool_json(["feature", "probe", "dev", iface]) |
| # Check if the result has all expected keys. |
| self.assertCountEqual(res.keys(), expected_keys) |
| # Check if unexpected helpers are not included in helpers probes |
| # result. |
| for helpers in res["helpers"].values(): |
| for unexpected_helper in unexpected_helpers: |
| self.assertNotIn(unexpected_helper, helpers) |
| |
| def test_feature_kernel(self): |
| test_cases = [ |
| bpftool_json(["feature", "probe", "kernel"]), |
| bpftool_json(["feature", "probe"]), |
| bpftool_json(["feature"]), |
| ] |
| unexpected_helpers = DMESG_EMITTING_HELPERS |
| expected_keys = [ |
| "syscall_config", |
| "system_config", |
| "program_types", |
| "map_types", |
| "helpers", |
| "misc", |
| ] |
| |
| for tc in test_cases: |
| # Check if the result has all expected keys. |
| self.assertCountEqual(tc.keys(), expected_keys) |
| # Check if unexpected helpers are not included in helpers probes |
| # result. |
| for helpers in tc["helpers"].values(): |
| for unexpected_helper in unexpected_helpers: |
| self.assertNotIn(unexpected_helper, helpers) |
| |
| def test_feature_kernel_full(self): |
| test_cases = [ |
| bpftool_json(["feature", "probe", "kernel", "full"]), |
| bpftool_json(["feature", "probe", "full"]), |
| ] |
| expected_helpers = DMESG_EMITTING_HELPERS |
| |
| for tc in test_cases: |
| # Check if expected helpers are included at least once in any |
| # helpers list for any program type. Unfortunately we cannot assume |
| # that they will be included in all program types or a specific |
| # subset of programs. It depends on the kernel version and |
| # configuration. |
| found_helpers = False |
| |
| for helpers in tc["helpers"].values(): |
| if all(expected_helper in helpers |
| for expected_helper in expected_helpers): |
| found_helpers = True |
| break |
| |
| self.assertTrue(found_helpers) |
| |
| def test_feature_kernel_full_vs_not_full(self): |
| full_res = bpftool_json(["feature", "probe", "full"]) |
| not_full_res = bpftool_json(["feature", "probe"]) |
| not_full_set = set() |
| full_set = set() |
| |
| for helpers in full_res["helpers"].values(): |
| for helper in helpers: |
| full_set.add(helper) |
| |
| for helpers in not_full_res["helpers"].values(): |
| for helper in helpers: |
| not_full_set.add(helper) |
| |
| self.assertCountEqual(full_set - not_full_set, |
| set(DMESG_EMITTING_HELPERS)) |
| self.assertCountEqual(not_full_set - full_set, set()) |
| |
| def test_feature_macros(self): |
| expected_patterns = [ |
| r"/\*\*\* System call availability \*\*\*/", |
| r"#define HAVE_BPF_SYSCALL", |
| r"/\*\*\* eBPF program types \*\*\*/", |
| r"#define HAVE.*PROG_TYPE", |
| r"/\*\*\* eBPF map types \*\*\*/", |
| r"#define HAVE.*MAP_TYPE", |
| r"/\*\*\* eBPF helper functions \*\*\*/", |
| r"#define HAVE.*HELPER", |
| r"/\*\*\* eBPF misc features \*\*\*/", |
| ] |
| |
| res = bpftool(["feature", "probe", "macros"]) |
| for pattern in expected_patterns: |
| self.assertRegex(res, pattern) |