xref: /linux/tools/testing/selftests/drivers/net/hw/devmem.py (revision 09d7ff0694ea133c50ad905fd6e548c13f8af458)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4from os import path
5from lib.py import ksft_run, ksft_exit
6from lib.py import ksft_eq, KsftSkipEx
7from lib.py import NetDrvEpEnv
8from lib.py import bkg, cmd, rand_port, wait_port_listen
9from lib.py import ksft_disruptive
10
11
def require_devmem(cfg):
    """Skip the calling test unless the devmem probe command succeeds.

    The probe (``cfg.bin_local -f cfg.ifname``) is run only when ``cfg``
    has no ``_devmem_probed`` attribute yet; its outcome is stored on
    ``cfg`` as ``_devmem_supported`` and reused by later calls.

    Raises:
        KsftSkipEx: the probe command exited with a non-zero status.
    """
    already_probed = hasattr(cfg, "_devmem_probed")
    if not already_probed:
        # fail=False: a non-zero exit is an answer here, not an error.
        probe = cmd(f"{cfg.bin_local} -f {cfg.ifname}", fail=False, shell=True)
        cfg._devmem_supported = probe.ret == 0
        cfg._devmem_probed = True

    if cfg._devmem_supported:
        return
    raise KsftSkipEx("Test requires devmem support")
20
21
@ksft_disruptive
def check_rx(cfg) -> None:
    """Receive a two-line payload through ncdevmem and check its stdout.

    A background ``cfg.bin_local -l`` listener is started on a random
    port, the remote host pipes the lines "hello" and "world" to that port
    with socat, and the listener's captured stdout is compared against the
    payload.

    Raises:
        KsftSkipEx: propagated from require_devmem() when the devmem probe
            fails.
    """
    cfg.require_ipver("6")
    require_devmem(cfg)

    port = rand_port()
    listen_cmd = f"{cfg.bin_local} -l -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port}"

    # Use exit_wait=True like the TX tests in this file, so the listener is
    # awaited rather than stopped as soon as the remote sender returns;
    # otherwise its output may not have been captured yet.
    # NOTE(review): assumes ncdevmem exits on its own once the sender
    # disconnects, and that bkg() stops the process on exit without
    # exit_wait -- confirm against ncdevmem and the bkg helper.
    with bkg(listen_cmd, exit_wait=True) as ncdevmem:
        wait_port_listen(port)
        cmd(f"echo -e \"hello\\nworld\"| socat -u - TCP6:[{cfg.addr_v['6']}]:{port}", host=cfg.remote, shell=True)

    ksft_eq(ncdevmem.stdout.strip(), "hello\nworld")
35
36
@ksft_disruptive
def check_tx(cfg) -> None:
    """Send a two-line payload with ncdevmem and check what socat received.

    A background socat listener is started on a random port and awaited
    (``exit_wait=True``). The sender, ``cfg.bin_remote``, is run with
    ``host=cfg.remote`` and fed the lines "hello" and "world" on stdin.
    The listener's captured stdout must equal the payload.

    Raises:
        KsftSkipEx: propagated from require_devmem() when the devmem probe
            fails.
    """
    cfg.require_ipver("6")
    require_devmem(cfg)

    port = rand_port()

    with bkg(f"socat -U - TCP6-LISTEN:{port}", exit_wait=True) as receiver:
        wait_port_listen(port)
        sender = f"{cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port}"
        cmd(f"echo -e \"hello\\nworld\"| {sender}", host=cfg.remote, shell=True)

    ksft_eq(receiver.stdout.strip(), "hello\nworld")
50
51
@ksft_disruptive
def check_tx_chunks(cfg) -> None:
    """Same as the plain TX check, but the sender is run with ``-z 3``.

    A background socat listener is started on a random port and awaited
    (``exit_wait=True``). ``cfg.bin_remote`` is run with ``host=cfg.remote``
    and fed the lines "hello" and "world" on stdin; the listener's captured
    stdout must equal the payload.

    Raises:
        KsftSkipEx: propagated from require_devmem() when the devmem probe
            fails.
    """
    cfg.require_ipver("6")
    require_devmem(cfg)

    port = rand_port()

    with bkg(f"socat -U - TCP6-LISTEN:{port}", exit_wait=True) as receiver:
        wait_port_listen(port)
        # NOTE(review): "-z 3" presumably makes ncdevmem send in 3-byte
        # chunks -- confirm against ncdevmem's option parsing.
        sender = f"{cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port} -z 3"
        cmd(f"echo -e \"hello\\nworld\"| {sender}", host=cfg.remote, shell=True)

    ksft_eq(receiver.stdout.strip(), "hello\nworld")
65
66
def main() -> None:
    """Set up the endpoint environment and run the devmem test cases.

    The ncdevmem binary is looked up in the directory containing this
    script and stored as ``cfg.bin_local``. That path is handed to
    ``cfg.remote.deploy()``, whose result is kept as ``cfg.bin_remote``.
    """
    with NetDrvEpEnv(__file__) as cfg:
        # Make __file__ absolute before taking its directory: for a bare
        # relative name such as "devmem.py", dirname() is "" and appending
        # "/ncdevmem" to it would resolve to "/ncdevmem".
        script_dir = path.dirname(path.abspath(__file__))
        cfg.bin_local = path.join(script_dir, "ncdevmem")
        cfg.bin_remote = cfg.remote.deploy(cfg.bin_local)

        ksft_run([check_rx, check_tx, check_tx_chunks],
                 args=(cfg, ))
    ksft_exit()
75
76
# Run the tests only when this file is executed as a script.
if __name__ == "__main__":
    main()
79