xref: /linux/tools/testing/selftests/drivers/net/so_txtime.py (revision 6a4c4656b0d2d4056a1f0c35442db4e8a5cf8021)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""Regression tests for the SO_TXTIME interface.
5
6Test delivery time in FQ and ETF qdiscs.
7"""
8
9import time
10
11from lib.py import ksft_exit, ksft_run, ksft_variants
12from lib.py import KsftNamedVariant, KsftSkipEx
13from lib.py import NetDrvEpEnv, bkg, cmd, defer, tc
14
15
def test_so_txtime(cfg, clockid, ipver, args_tx, args_rx, expect_success):
    """Run the so_txtime helper as a receiver/sender pair.

    Both sides are started from the same base command line (IP version
    flag, clock id, start time and source/destination addresses). The
    receiver additionally gets "-r" and runs in the background on
    cfg.remote; the sender runs in the foreground via cmd().

    expect_success is forwarded to bkg() as fail=expect_success and
    expect_fail=(not expect_success).
    NOTE(review): presumably this makes a failing receiver fatal when
    success is expected, and required otherwise -- confirm against bkg().
    """
    helper = cfg.test_dir / "so_txtime"

    # Common start time handed to both sides through -t: 200 ms (expressed
    # in nanoseconds) after the current wall-clock time.
    start_offset_ns = 200_000_000
    start_ns = time.time_ns() + start_offset_ns

    addr_opts = f"-S {cfg.addr_v[ipver]} -D {cfg.remote_addr_v[ipver]}"
    common = f"{helper} -{ipver} -c {clockid} -t {start_ns} {addr_opts}"
    receiver_cmd = f"{common} {args_rx} -r"
    sender_cmd = f"{common} {args_tx}"

    receiver_opts = {
        "host": cfg.remote,
        "fail": expect_success,
        "expect_fail": not expect_success,
        "exit_wait": True,
    }
    with bkg(receiver_cmd, **receiver_opts):
        cmd(sender_cmd)
30
31
def _qdisc_setup(ifname, qdisc, optargs=""):
    """Replace the root qdisc of ifname. Restore the original after the test.

    The restore step is registered with defer() before the new qdisc is
    installed, so it is scheduled even if the replacement itself fails.

    If the original is mq, children will be of type default_qdisc.

    ifname:  name of the network device to operate on.
    qdisc:   kind of the qdisc to install as root.
    optargs: extra arguments appended verbatim to the tc replace command.
    """
    shown = tc(f"qdisc show dev {ifname} root", json=True)
    orig = shown[0].get("kind") if shown else None

    if orig:
        defer(tc, f"qdisc replace dev {ifname} root {orig}")
    else:
        # No original kind is known (empty listing or entry without a
        # "kind" key). Formatting None into the command would produce
        # "root None" and make the cleanup fail, so just remove whatever
        # root qdisc the test installed instead.
        defer(tc, f"qdisc del dev {ifname} root")

    tc(f"qdisc replace dev {ifname} root {qdisc} {optargs}")
40
41
def _test_variants_mono():
    """Yield the named variants for the monotonic clock tests.

    Each variant is named "v<ipver>_<case>" and carries the parameters
    (ipver, args_tx, args_rx). All cases are produced for ipver "4"
    first, then for ipver "6".
    """
    # (case name, sender arguments, receiver arguments)
    cases = (
        ("no_delay", "a,-1", "a,-1"),
        ("zero_delay", "a,0", "a,0"),
        ("one_pkt", "a,10", "a,10"),
        ("in_order", "a,10,b,20", "a,10,b,20"),
        ("reverse_order", "a,20,b,10", "b,20,a,20"),
    )
    for ipver in ("4", "6"):
        for label, tx_spec, rx_spec in cases:
            yield KsftNamedVariant(
                f"v{ipver}_{label}", ipver, tx_spec, rx_spec
            )
53
54
@ksft_variants(_test_variants_mono())
def test_so_txtime_mono(cfg, ipver, args_tx, args_rx):
    """Run one monotonic clock variant on top of an fq root qdisc.

    The root qdisc of cfg.ifname is replaced with fq, then the
    sender/receiver pair is run with clock id "mono". Every variant is
    run with expect_success=True.
    """
    _qdisc_setup(cfg.ifname, qdisc="fq")
    test_so_txtime(
        cfg,
        clockid="mono",
        ipver=ipver,
        args_tx=args_tx,
        args_rx=args_rx,
        expect_success=True,
    )
60
61
def _test_variants_etf():
    """Yield the named variants for the etf tests.

    Each variant is named "v<ipver>_<case>" and carries the parameters
    (ipver, args_tx, args_rx, flag). All cases are produced for ipver "4"
    first, then for ipver "6".

    The flag is received by test_so_txtime_etf and handed on to
    test_so_txtime as its expect_success argument.
    """
    # (case name, sender arguments, receiver arguments, expect success)
    cases = (
        ("no_delay", "a,-1", "a,-1", False),
        ("zero_delay", "a,0", "a,0", False),
        ("one_pkt", "a,10", "a,10", True),
        ("in_order", "a,10,b,20", "a,10,b,20", True),
        ("reverse_order", "a,20,b,10", "b,10,a,20", True),
    )
    for ipver in ("4", "6"):
        for label, tx_spec, rx_spec, should_pass in cases:
            yield KsftNamedVariant(
                f"v{ipver}_{label}", ipver, tx_spec, rx_spec, should_pass
            )
75
76
@ksft_variants(_test_variants_etf())
def test_so_txtime_etf(cfg, ipver, args_tx, args_rx, expect_fail):
    """Run one etf variant on top of an etf root qdisc.

    The root qdisc of cfg.ifname is replaced with etf; if that raises,
    the test is skipped. The sender/receiver pair is then run with clock
    id "tai".

    NOTE(review): despite its name, expect_fail is passed unchanged as the
    expect_success argument of test_so_txtime(), so True means the run is
    expected to succeed. The parameter name is kept as is so the function
    signature does not change.
    """
    etf_opts = "clockid CLOCK_TAI delta 400000"
    try:
        _qdisc_setup(cfg.ifname, "etf", etf_opts)
    except Exception as e:
        raise KsftSkipEx("tc does not support qdisc etf. skipping") from e

    # Alias under the name that matches how the value is actually used.
    expect_success = expect_fail
    test_so_txtime(cfg, "tai", ipver, args_tx, args_rx, expect_success)
86
87
def main() -> None:
    """Run all SO_TXTIME test cases, then finish through ksft_exit().

    The NetDrvEpEnv context object is passed to every test as its first
    argument; ksft_exit() is called after the environment has been left.
    """
    test_cases = [test_so_txtime_mono, test_so_txtime_etf]
    with NetDrvEpEnv(__file__) as cfg:
        ksft_run(test_cases, args=(cfg,))
    ksft_exit()


if __name__ == "__main__":
    main()
97