| # SPDX-License-Identifier: GPL-2.0 |
| |
| import re |
| import time |
| import json |
| |
| from lib.py import ksft_pr, cmd, ip, rand_port, wait_port_listen |
| |
| |
| class Iperf3Runner: |
| """ |
| Sets up and runs iperf3 traffic. |
| """ |
| def __init__(self, env, port=None, server_ip=None, client_ip=None): |
| env.require_cmd("iperf3", local=True, remote=True) |
| self.env = env |
| self.port = rand_port() if port is None else port |
| self.server_ip = server_ip |
| self.client_ip = client_ip |
| |
| def _build_server(self): |
| cmdline = f"iperf3 -s -1 -p {self.port}" |
| if self.server_ip: |
| cmdline += f" -B {self.server_ip}" |
| return cmdline |
| |
| def _build_client(self, streams, duration, reverse): |
| host = self.env.addr if self.server_ip is None else self.server_ip |
| cmdline = f"iperf3 -c {host} -p {self.port} -P {streams} -t {duration} -J" |
| if self.client_ip: |
| cmdline += f" -B {self.client_ip}" |
| if reverse: |
| cmdline += " --reverse" |
| return cmdline |
| |
| def start_server(self): |
| """ |
| Starts an iperf3 server with optional bind IP. |
| """ |
| cmdline = self._build_server() |
| proc = cmd(cmdline, background=True) |
| wait_port_listen(self.port) |
| time.sleep(0.1) |
| return proc |
| |
| def start_client(self, background=False, streams=1, duration=10, reverse=False): |
| """ |
| Starts the iperf3 client with the configured options. |
| """ |
| cmdline = self._build_client(streams, duration, reverse) |
| return cmd(cmdline, background=background, host=self.env.remote) |
| |
| def measure_bandwidth(self, reverse=False): |
| """ |
| Runs an iperf3 measurement and returns the average bandwidth (Gbps). |
| Discards the first and last few reporting intervals and uses only the |
| middle part of the run where throughput is typically stable. |
| """ |
| self.start_server() |
| result = self.start_client(duration=10, reverse=reverse) |
| |
| if result.ret != 0: |
| raise RuntimeError("iperf3 failed to run successfully") |
| try: |
| out = json.loads(result.stdout) |
| except json.JSONDecodeError as exc: |
| raise ValueError("Failed to parse iperf3 JSON output") from exc |
| |
| intervals = out.get("intervals", []) |
| samples = [i["sum"]["bits_per_second"] / 1e9 for i in intervals] |
| if len(samples) < 10: |
| raise ValueError(f"iperf3 returned too few intervals: {len(samples)}") |
| # Discard potentially unstable first and last 3 seconds. |
| stable = samples[3:-3] |
| |
| avg = sum(stable) / len(stable) |
| |
| return avg |
| |
| |
| class GenerateTraffic: |
| def __init__(self, env, port=None): |
| self.env = env |
| self.runner = Iperf3Runner(env, port) |
| |
| self._iperf_server = self.runner.start_server() |
| self._iperf_client = self.runner.start_client(background=True, streams=16, duration=86400) |
| |
| # Wait for traffic to ramp up |
| if not self._wait_pkts(pps=1000): |
| self.stop(verbose=True) |
| raise Exception("iperf3 traffic did not ramp up") |
| |
| def _wait_pkts(self, pkt_cnt=None, pps=None): |
| """ |
| Wait until we've seen pkt_cnt or until traffic ramps up to pps. |
| Only one of pkt_cnt or pss can be specified. |
| """ |
| pkt_start = ip("-s link show dev " + self.env.ifname, json=True)[0]["stats64"]["rx"]["packets"] |
| for _ in range(50): |
| time.sleep(0.1) |
| pkt_now = ip("-s link show dev " + self.env.ifname, json=True)[0]["stats64"]["rx"]["packets"] |
| if pps: |
| if pkt_now - pkt_start > pps / 10: |
| return True |
| pkt_start = pkt_now |
| elif pkt_cnt: |
| if pkt_now - pkt_start > pkt_cnt: |
| return True |
| return False |
| |
| def wait_pkts_and_stop(self, pkt_cnt): |
| failed = not self._wait_pkts(pkt_cnt=pkt_cnt) |
| self.stop(verbose=failed) |
| |
| def stop(self, verbose=None): |
| self._iperf_client.process(terminate=True) |
| if verbose: |
| ksft_pr(">> Client:") |
| ksft_pr(self._iperf_client.stdout) |
| ksft_pr(self._iperf_client.stderr) |
| self._iperf_server.process(terminate=True) |
| if verbose: |
| ksft_pr(">> Server:") |
| ksft_pr(self._iperf_server.stdout) |
| ksft_pr(self._iperf_server.stderr) |
| self._wait_client_stopped() |
| |
| def _wait_client_stopped(self, sleep=0.005, timeout=5): |
| end = time.monotonic() + timeout |
| |
| live_port_pattern = re.compile(fr":{self.runner.port:04X} 0[^6] ") |
| |
| while time.monotonic() < end: |
| data = cmd("cat /proc/net/tcp*", host=self.env.remote).stdout |
| if not live_port_pattern.search(data): |
| return |
| time.sleep(sleep) |
| raise Exception(f"Waiting for client to stop timed out after {timeout}s") |