1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3 4from os import path 5from lib.py import ksft_run, ksft_exit 6from lib.py import ksft_eq, KsftSkipEx 7from lib.py import NetDrvEpEnv 8from lib.py import bkg, cmd, rand_port, wait_port_listen 9from lib.py import ksft_disruptive 10 11 12def require_devmem(cfg): 13 if not hasattr(cfg, "_devmem_probed"): 14 probe_command = f"{cfg.bin_local} -f {cfg.ifname}" 15 cfg._devmem_supported = cmd(probe_command, fail=False, shell=True).ret == 0 16 cfg._devmem_probed = True 17 18 if not cfg._devmem_supported: 19 raise KsftSkipEx("Test requires devmem support") 20 21 22@ksft_disruptive 23def check_rx(cfg) -> None: 24 cfg.require_ipver("6") 25 require_devmem(cfg) 26 27 port = rand_port() 28 listen_cmd = f"{cfg.bin_local} -l -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port}" 29 30 with bkg(listen_cmd) as socat: 31 wait_port_listen(port) 32 cmd(f"echo -e \"hello\\nworld\"| socat -u - TCP6:[{cfg.addr_v['6']}]:{port}", host=cfg.remote, shell=True) 33 34 ksft_eq(socat.stdout.strip(), "hello\nworld") 35 36 37@ksft_disruptive 38def check_tx(cfg) -> None: 39 cfg.require_ipver("6") 40 require_devmem(cfg) 41 42 port = rand_port() 43 listen_cmd = f"socat -U - TCP6-LISTEN:{port}" 44 45 with bkg(listen_cmd, exit_wait=True) as socat: 46 wait_port_listen(port) 47 cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port}", host=cfg.remote, shell=True) 48 49 ksft_eq(socat.stdout.strip(), "hello\nworld") 50 51 52@ksft_disruptive 53def check_tx_chunks(cfg) -> None: 54 cfg.require_ipver("6") 55 require_devmem(cfg) 56 57 port = rand_port() 58 listen_cmd = f"socat -U - TCP6-LISTEN:{port}" 59 60 with bkg(listen_cmd, exit_wait=True) as socat: 61 wait_port_listen(port) 62 cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr_v['6']} -p {port} -z 3", host=cfg.remote, shell=True) 63 64 ksft_eq(socat.stdout.strip(), "hello\nworld") 65 66 67def main() -> None: 68 with NetDrvEpEnv(__file__) as cfg: 69 cfg.bin_local = path.abspath(path.dirname(__file__) + "/ncdevmem") 70 cfg.bin_remote = cfg.remote.deploy(cfg.bin_local) 71 72 ksft_run([check_rx, check_tx, check_tx_chunks], 73 args=(cfg, )) 74 ksft_exit() 75 76 77if __name__ == "__main__": 78 main() 79