#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
"""Selftests that drive the ``ncdevmem`` helper binary over a local/remote pair.

Each test needs an environment object (``cfg``) created by ``NetDrvEpEnv`` and
is skipped when the one-time ``ncdevmem`` probe on the local interface fails.
"""

from os import path
from lib.py import ksft_run, ksft_exit
from lib.py import ksft_eq, KsftSkipEx
from lib.py import NetDrvEpEnv
from lib.py import bkg, cmd, rand_port, wait_port_listen
from lib.py import ksft_disruptive


def require_devmem(cfg):
    """Skip the calling test unless the ncdevmem probe succeeds.

    The probe runs ``<bin_local> -f <ifname>`` at most once per ``cfg``; the
    outcome is cached on ``cfg`` (``_devmem_probed`` / ``_devmem_supported``)
    so later tests reuse it instead of re-running the binary.

    Raises:
        KsftSkipEx: if the probe command returned a non-zero exit status.
    """
    if not hasattr(cfg, "_devmem_probed"):
        probe_command = f"{cfg.bin_local} -f {cfg.ifname}"
        # fail=False: a failing probe is an expected outcome here, and is
        # turned into a skip below rather than an error.
        probe = cmd(probe_command, fail=False, shell=True)
        cfg._devmem_supported = probe.ret == 0
        cfg._devmem_probed = True

    if not cfg._devmem_supported:
        raise KsftSkipEx("Test requires devmem support")


@ksft_disruptive
def check_rx(cfg) -> None:
    """Run ncdevmem as a local listener and feed it 1K of data from the remote.

    The remote side pipes a repeating byte pattern through ``socat`` to the
    local address; the test passes when the background ncdevmem process
    exits with status 0.
    """
    require_devmem(cfg)

    port = rand_port()
    socat = f"socat -u - TCP{cfg.addr_ipver}:{cfg.addr}:{port},bind={cfg.remote_addr}:{port}"
    listen_cmd = f"{cfg.bin_local} -l -f {cfg.ifname} -s {cfg.addr} -p {port} -c {cfg.remote_addr} -v 7"

    with bkg(listen_cmd, exit_wait=True) as ncdevmem:
        wait_port_listen(port)
        # NOTE: this is a non-raw f-string, so Python itself turns \x01..\x06
        # into the raw bytes 0x01..0x06 before the shell sees the command.
        # The trailing backslash continues the string literal onto the next
        # source line (the indentation becomes spaces inside the shell command).
        cmd(f"yes $(echo -e \x01\x02\x03\x04\x05\x06) | \
            head -c 1K | {socat}", host=cfg.remote, shell=True)

    ksft_eq(ncdevmem.ret, 0)


@ksft_disruptive
def check_tx(cfg) -> None:
    """Send "hello\\nworld" from the remote ncdevmem to a local socat listener.

    The test passes when the listener's stripped stdout equals the payload.
    """
    require_devmem(cfg)

    port = rand_port()
    listen_cmd = f"socat -U - TCP{cfg.addr_ipver}-LISTEN:{port}"

    # exit_wait=True matches check_rx and check_tx_chunks: the listener's
    # stdout is asserted on below, so let it finish on its own instead of
    # relying on bkg's default exit handling.
    # NOTE(review): presumably the default stops the process when the
    # `with` block ends, which could truncate output; confirm in lib.py.
    with bkg(listen_cmd, exit_wait=True) as socat:
        wait_port_listen(port)
        cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr} -p {port}", host=cfg.remote, shell=True)

    ksft_eq(socat.stdout.strip(), "hello\nworld")


@ksft_disruptive
def check_tx_chunks(cfg) -> None:
    """Same as check_tx, but invokes the remote ncdevmem with ``-z 3``.

    NOTE(review): judging by the test name, ``-z 3`` presumably makes ncdevmem
    send the payload in 3-byte chunks; confirm against ncdevmem's usage text.
    """
    require_devmem(cfg)

    port = rand_port()
    listen_cmd = f"socat -U - TCP{cfg.addr_ipver}-LISTEN:{port}"

    with bkg(listen_cmd, exit_wait=True) as socat:
        wait_port_listen(port)
        cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr} -p {port} -z 3", host=cfg.remote, shell=True)

    ksft_eq(socat.stdout.strip(), "hello\nworld")


def main() -> None:
    """Set up the endpoint environment, deploy ncdevmem and run all tests."""
    with NetDrvEpEnv(__file__) as cfg:
        # The ncdevmem binary is expected to sit next to this script; the
        # remote endpoint gets its own copy via deploy().
        cfg.bin_local = path.abspath(path.dirname(__file__) + "/ncdevmem")
        cfg.bin_remote = cfg.remote.deploy(cfg.bin_local)

        ksft_run([check_rx, check_tx, check_tx_chunks],
                 args=(cfg, ))
    ksft_exit()


if __name__ == "__main__":
    main()