xref: /linux/tools/testing/selftests/drivers/net/hw/devmem.py (revision 54fd6bd42e7bd351802ff1d193a2e33e4bfb1836)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4from os import path
5from lib.py import ksft_run, ksft_exit
6from lib.py import ksft_eq, KsftSkipEx
7from lib.py import NetDrvEpEnv
8from lib.py import bkg, cmd, rand_port, wait_port_listen
9from lib.py import ksft_disruptive
10
11
def require_devmem(cfg):
    """Skip the calling test unless the devmem probe succeeded.

    The probe runs ``<cfg.bin_local> -f <cfg.ifname>`` at most once per
    ``cfg`` object.  Its outcome is cached on ``cfg`` as ``_devmem_supported``
    (``_devmem_probed`` marks that the probe has already run) and is reused
    by every later call.

    Raises:
        KsftSkipEx: if the probe command did not exit with status 0.
    """
    already_probed = hasattr(cfg, "_devmem_probed")
    if not already_probed:
        # fail=False presumably keeps cmd() from raising on a non-zero exit;
        # the exit status is inspected through .ret instead.
        probe = cmd(f"{cfg.bin_local} -f {cfg.ifname}", fail=False, shell=True)
        cfg._devmem_supported = probe.ret == 0
        cfg._devmem_probed = True

    if cfg._devmem_supported:
        return

    raise KsftSkipEx("Test requires devmem support")
20
21
@ksft_disruptive
def check_rx(cfg) -> None:
    """Send a patterned stream from the remote host to a local listener.

    The local binary (``cfg.bin_local``) is started in listen mode in the
    background; once the port is reported as listening, the remote host
    pipes 1K of a repeating six-byte pattern into socat towards
    ``cfg.addr``.  The test passes when the listener exits with status 0.
    """
    require_devmem(cfg)

    port = rand_port()
    # NOTE(review): "-v 7" presumably enables payload validation in the
    # listener -- confirm against the binary's option list.
    receiver = f"{cfg.bin_local} -l -f {cfg.ifname} -s {cfg.addr} -p {port} -c {cfg.remote_addr} -v 7"
    socat = f"socat -u - TCP{cfg.addr_ipver}:{cfg.addr}:{port},bind={cfg.remote_addr}:{port}"
    # The \x01..\x06 escapes are expanded by Python (this is not a raw
    # string), so the shell command carries the raw bytes themselves.
    sender = f"yes $(echo -e \x01\x02\x03\x04\x05\x06) | \
            head -c 1K | {socat}"

    with bkg(receiver, exit_wait=True) as ncdevmem:
        wait_port_listen(port)
        cmd(sender, host=cfg.remote, shell=True)

    ksft_eq(ncdevmem.ret, 0)
36
37
@ksft_disruptive
def check_tx(cfg) -> None:
    """Send two text lines from the remote binary to a local socat listener.

    A local socat listens on a random port in the background.  The remote
    host pipes ``hello\\nworld`` into ``cfg.bin_remote``, which sends it to
    ``cfg.addr``.  The test passes when socat's captured stdout, stripped of
    surrounding whitespace, equals ``"hello\\nworld"``.
    """
    require_devmem(cfg)

    port = rand_port()
    listen_cmd = f"socat -U - TCP{cfg.addr_ipver}-LISTEN:{port}"

    # exit_wait=True, as in check_rx and check_tx_chunks: let socat finish on
    # its own instead of being stopped when the with-block exits, so the
    # payload is not cut short before stdout is compared below.
    with bkg(listen_cmd, exit_wait=True) as socat:
        wait_port_listen(port)
        # NOTE(review): the sender runs on cfg.remote but is given the local
        # cfg.ifname -- confirm the interface name is the same on both hosts.
        cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr} -p {port}", host=cfg.remote, shell=True)

    ksft_eq(socat.stdout.strip(), "hello\nworld")
50
51
@ksft_disruptive
def check_tx_chunks(cfg) -> None:
    """Same transmit check as check_tx, with ``-z 3`` passed to the sender.

    A local socat listens on a random port in the background and is left to
    exit on its own (``exit_wait=True``).  The remote host pipes
    ``hello\\nworld`` into ``cfg.bin_remote``; the test passes when socat's
    stripped stdout equals ``"hello\\nworld"``.
    """
    require_devmem(cfg)

    port = rand_port()
    receiver = f"socat -U - TCP{cfg.addr_ipver}-LISTEN:{port}"

    with bkg(receiver, exit_wait=True) as socat:
        wait_port_listen(port)
        # NOTE(review): "-z 3" presumably sets the send chunk size -- confirm
        # against the binary's option list.
        sender = f'echo -e "hello\\nworld"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr} -p {port} -z 3'
        cmd(sender, host=cfg.remote, shell=True)

    received = socat.stdout.strip()
    ksft_eq(received, "hello\nworld")
64
65
def main() -> None:
    """Set up the test environment, locate the test binary and run the tests.

    The ``ncdevmem`` binary is expected next to this script.  Its absolute
    path is stored in ``cfg.bin_local``; the path returned by deploying it
    through ``cfg.remote`` is stored in ``cfg.bin_remote``.  ``ksft_exit()``
    is called after the environment's with-block has exited.
    """
    with NetDrvEpEnv(__file__) as cfg:
        # path.join rather than string concatenation: when dirname() is empty
        # (a relative __file__), "" + "/ncdevmem" would resolve to the
        # filesystem root instead of the current directory.
        script_dir = path.dirname(__file__)
        cfg.bin_local = path.abspath(path.join(script_dir, "ncdevmem"))
        cfg.bin_remote = cfg.remote.deploy(cfg.bin_local)

        ksft_run([check_rx, check_tx, check_tx_chunks],
                 args=(cfg, ))
    ksft_exit()
74
75
# Run the test suite only when this file is executed as a script.
if __name__ == "__main__":
    main()
78