blob: 0740a4d852407e331b27d26ddf47d26c5210207d [file] [edit]
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
"""Traffic test for VXLAN + IPsec crypto-offload."""
import os
from lib.py import ksft_run, ksft_exit, ksft_eq, ksft_ge
from lib.py import ksft_variants, KsftNamedVariant, KsftSkipEx
from lib.py import CmdExitFailure, NetDrvEpEnv, cmd, defer, ethtool, ip
from lib.py import Iperf3Runner
# Inner tunnel addresses - TEST-NET-2 (RFC 5737) / doc prefix (RFC 3849).
# The *_LOCAL addresses are assigned to the local VXLAN device and the
# *_REMOTE ones to the peer's; setup_vxlan_ipsec() appends the /24 or /64
# prefix length when configuring them.
INNER_V4_LOCAL = "198.51.100.1"
INNER_V4_REMOTE = "198.51.100.2"
INNER_V6_LOCAL = "2001:db8:100::1"
INNER_V6_REMOTE = "2001:db8:100::2"
# ESP parameters
# SPI_OUT identifies the local -> remote SA and SPI_IN the remote -> local
# SA; both endpoints install both SAs.
SPI_OUT = "0x1000"
SPI_IN = "0x1001"
# 128-bit key + 32-bit salt = 20 bytes hex, 128-bit ICV
# The same key material is used for every SA created by this test.
ESP_AEAD = "aead 'rfc4106(gcm(aes))' 0x" + "01" * 20 + " 128"
def xfrm(args, host=None):
    """Runs 'ip xfrm <args>' through a shell.

    The command is executed with shell=True so that quoted algorithm names
    containing parentheses (e.g. 'rfc4106(gcm(aes))') reach 'ip' intact.

    Args:
        args: Everything that follows 'ip xfrm' on the command line.
        host: Forwarded unchanged to cmd(); None (the default) is what the
            local-side callers in this file use.
    """
    command = f"ip xfrm {args}"
    cmd(command, shell=True, host=host)
def check_xfrm_offload_support():
    """Skips the test when 'ip xfrm state help' does not mention offload.

    The help command is run with fail=False and both stdout and stderr are
    searched for the word "offload".

    Raises:
        KsftSkipEx: The help text contains no "offload" keyword.
    """
    help_res = cmd("ip xfrm state help", fail=False)
    help_text = help_res.stdout + help_res.stderr
    if "offload" in help_text:
        return
    raise KsftSkipEx("iproute2 too old, missing xfrm offload")
def check_esp_hw_offload(cfg):
    """Skips the test unless cfg.ifname reports esp-hw-offload as active.

    First verifies iproute2 support via check_xfrm_offload_support(), then
    reads the device features with 'ethtool -k' in JSON form.

    Args:
        cfg: Test environment; only cfg.ifname is read here.

    Raises:
        KsftSkipEx: iproute2 lacks xfrm offload, the feature query failed
            or returned an empty list, or "esp-hw-offload" is missing or
            not active.
    """
    check_xfrm_offload_support()

    try:
        features = ethtool(f"-k {cfg.ifname}", json=True)[0]
    except (CmdExitFailure, IndexError) as err:
        raise KsftSkipEx(f"can't query features: {err}") from err

    # A missing feature entry is treated the same as an inactive one.
    esp_feature = features.get("esp-hw-offload", {})
    if esp_feature.get("active"):
        return
    raise KsftSkipEx("Device does not support esp-hw-offload")
def get_tx_drops(cfg):
    """Returns the stats64 TX "dropped" counter of cfg.ifname.

    The value is read from the first entry of the JSON output of
    'ip -s -s link show dev <ifname>'.

    Args:
        cfg: Test environment; only cfg.ifname is read here.
    """
    link_info = ip("-s -s link show dev " + cfg.ifname, json=True)
    tx_stats = link_info[0]["stats64"]["tx"]
    return tx_stats["dropped"]
def setup_vxlan_ipsec(cfg, outer_ipver, inner_ipver):
    """Sets up a VXLAN tunnel protected by transport-mode ESP on both ends.

    Steps, in order:
      1. Create, address and bring up the VXLAN device locally, then on the
         remote host (VNI 100, UDP port 4789).
      2. Install four xfrm states: the two local ones request
         'offload crypto' on cfg.ifname, the two remote ones do not.
      3. Install four xfrm policies matching UDP dport 4789 between the
         outer addresses, for 'out' and 'in' on each side.

    Every created object has its matching delete command registered with
    defer() right after creation.

    Args:
        cfg: Test environment; addr_v, remote_addr_v, ifname, remote_ifname
            and remote are read.
        outer_ipver: Key into cfg.addr_v / cfg.remote_addr_v; "6" selects
            the IPv6 zero-checksum VXLAN options, anything else 'noudpcsum'.
        inner_ipver: "4" selects the IPv4 inner addresses (/24); any other
            value selects the IPv6 ones (/64, added with 'nodad').
    """
    tun = f"vx{os.getpid()}"
    near = cfg.addr_v[outer_ipver]
    far = cfg.remote_addr_v[outer_ipver]

    if inner_ipver == "4":
        near_inner = f"{INNER_V4_LOCAL}/24"
        far_inner = f"{INNER_V4_REMOTE}/24"
        addr_flags = ""
    else:
        near_inner = f"{INNER_V6_LOCAL}/64"
        far_inner = f"{INNER_V6_REMOTE}/64"
        addr_flags = " nodad"

    csum_opts = ("udp6zerocsumtx udp6zerocsumrx" if outer_ipver == "6"
                 else "noudpcsum")
    link_add = f"link add {tun} type vxlan id 100 dstport 4789 {csum_opts} "
    link_del = f"link del {tun}"
    link_up = f"link set {tun} up"

    # VXLAN device on the local machine.
    ip(link_add + f"local {near} remote {far} dev {cfg.ifname}")
    defer(ip, link_del)
    ip(f"addr add {near_inner} dev {tun}{addr_flags}")
    ip(link_up)

    # Mirror-image VXLAN device on the remote machine.
    ip(link_add + f"local {far} remote {near} dev {cfg.remote_ifname}",
       host=cfg.remote)
    defer(ip, link_del, host=cfg.remote)
    ip(f"addr add {far_inner} dev {tun}{addr_flags}", host=cfg.remote)
    ip(link_up, host=cfg.remote)

    # xfrm states as (src, dst, spi, mode clause, host). host=None is the
    # default of xfrm() and denotes the local machine.
    hw_mode = f"mode transport offload crypto dev {cfg.ifname} dir "
    sw_mode = "mode transport"
    sa_table = (
        (near, far, SPI_OUT, hw_mode + "out", None),    # local outbound
        (far, near, SPI_IN, hw_mode + "in", None),      # local inbound
        (far, near, SPI_IN, sw_mode, cfg.remote),       # remote outbound
        (near, far, SPI_OUT, sw_mode, cfg.remote),      # remote inbound
    )
    for src, dst, spi, mode, host in sa_table:
        sa_id = f"src {src} dst {dst} proto esp spi {spi}"
        xfrm(f"state add {sa_id} {ESP_AEAD} {mode}", host=host)
        defer(xfrm, f"state del {sa_id}", host=host)

    # xfrm policies as (src, dst, direction, host).
    policy_table = (
        (near, far, "out", None),           # local out
        (far, near, "in", None),            # local in
        (far, near, "out", cfg.remote),     # remote out
        (near, far, "in", cfg.remote),      # remote in
    )
    for src, dst, direction, host in policy_table:
        selector = (f"src {src} dst {dst} "
                    f"proto udp dport 4789 dir {direction}")
        xfrm(f"policy add {selector} "
             f"tmpl src {src} dst {dst} proto esp mode transport",
             host=host)
        defer(xfrm, f"policy del {selector}", host=host)
def _vxlan_ipsec_variants():
    """Yields one KsftNamedVariant per (outer, inner) IP version pair.

    Variants are produced lazily in the order 4/4, 4/6, 6/4, 6/6 and are
    named "outer_v<outer>_inner_v<inner>".
    """
    ip_versions = ("4", "6")
    yield from (
        KsftNamedVariant(f"outer_v{outer}_inner_v{inner}", outer, inner)
        for outer in ip_versions
        for inner in ip_versions
    )
@ksft_variants(_vxlan_ipsec_variants())
def test_vxlan_ipsec_crypto_offload(cfg, outer_ipver, inner_ipver):
    """Tests that iperf3 over VXLAN+IPsec crypto-offload causes no TX drops.

    After the tunnel and IPsec configuration are in place, a single ping is
    sent to the remote inner address, then an iperf3 bandwidth measurement
    runs between the inner addresses. The test passes when the device's TX
    dropped counter did not change and at least 0.1 Gbps was measured.

    Args:
        cfg: Test environment supplied by ksft_run().
        outer_ipver: IP version ("4" or "6") of the underlay addresses.
        inner_ipver: IP version ("4" or "6") of the tunnel addresses.
    """
    cfg.require_ipver(outer_ipver)
    check_esp_hw_offload(cfg)
    setup_vxlan_ipsec(cfg, outer_ipver, inner_ipver)

    if inner_ipver == "4":
        tun_local, tun_remote = INNER_V4_LOCAL, INNER_V4_REMOTE
        ping_cmd = "ping"
    else:
        tun_local, tun_remote = INNER_V6_LOCAL, INNER_V6_REMOTE
        ping_cmd = "ping -6"

    # One ping through the tunnel before taking the baseline counter.
    cmd(f"{ping_cmd} -c 1 -W 2 {tun_remote}")

    tx_drops_start = get_tx_drops(cfg)

    # NOTE(review): reverse=True presumably makes the local side the sender
    # so that the traffic exercises local TX - confirm against Iperf3Runner.
    iperf = Iperf3Runner(cfg, server_ip=tun_local, client_ip=tun_remote)
    gbps = iperf.measure_bandwidth(reverse=True)

    cfg.wait_hw_stats_settle()
    new_drops = get_tx_drops(cfg) - tx_drops_start

    ksft_eq(new_drops, 0,
            comment="TX drops during VXLAN+IPsec")
    ksft_ge(gbps, 0.1,
            comment="Minimum 100Mbps over VXLAN+IPsec")
def main():
    """Runs the VXLAN+IPsec crypto-offload selftest and reports the result.

    The test case is executed inside a NetDrvEpEnv created with
    nsim_test=False; ksft_exit() is called after the environment is closed.
    """
    tests = [test_vxlan_ipsec_crypto_offload]
    with NetDrvEpEnv(__file__, nsim_test=False) as cfg:
        ksft_run(tests, args=(cfg,))
    ksft_exit()


if __name__ == "__main__":
    main()