#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
"""Selftest that receives data through ./ncdevmem and checks its output."""

from lib.py import ksft_run, ksft_exit
from lib.py import ksft_eq, KsftSkipEx
from lib.py import NetDrvEpEnv
from lib.py import bkg, cmd, rand_port, wait_port_listen
from lib.py import ksft_disruptive


def require_devmem(cfg):
    """Skip the calling test unless the ncdevmem probe succeeds.

    The probe (``./ncdevmem -f <ifname>``) is run at most once per ``cfg``
    object; its outcome is cached on ``cfg`` in ``_devmem_supported`` and
    ``_devmem_probed`` so later calls do not spawn the command again.

    Args:
        cfg: Test environment object; ``cfg.ifname`` is passed to the probe.

    Raises:
        KsftSkipEx: If the probe command exited with a non-zero status.
    """
    if not hasattr(cfg, "_devmem_probed"):
        probe_command = f"./ncdevmem -f {cfg.ifname}"
        # fail=False: a non-zero exit is an expected outcome here and is
        # turned into a skip below rather than an error.
        cfg._devmem_supported = cmd(probe_command, fail=False, shell=True).ret == 0
        cfg._devmem_probed = True

    if not cfg._devmem_supported:
        raise KsftSkipEx("Test requires devmem support")


@ksft_disruptive
def check_rx(cfg) -> None:
    """Send two lines from the remote host and check ncdevmem prints them.

    Starts ``./ncdevmem -l`` in the background on a random port, pipes
    "hello\\nworld" into it from ``cfg.remote`` using socat over TCP6, and
    compares the stripped stdout of the ncdevmem process with the payload.

    Args:
        cfg: Test environment object; uses ``ifname``, ``v6`` and ``remote``.
    """
    cfg.require_v6()
    require_devmem(cfg)

    port = rand_port()
    listen_cmd = f"./ncdevmem -l -f {cfg.ifname} -s {cfg.v6} -p {port}"
    send_cmd = f"echo -e \"hello\\nworld\"| socat -u - TCP6:[{cfg.v6}]:{port}"

    # The background process is ncdevmem (the receiver); socat is only the
    # sender, executed on the remote host.
    # NOTE(review): stdout is read after the bkg context exits; whether bkg
    # waits for ncdevmem to finish or terminates it is not visible here --
    # confirm the output cannot be truncated.
    with bkg(listen_cmd) as ncdevmem:
        wait_port_listen(port)
        cmd(send_cmd, host=cfg.remote, shell=True)

    ksft_eq(ncdevmem.stdout.strip(), "hello\nworld")


def main() -> None:
    """Run the devmem test cases inside a NetDrvEpEnv and report the result."""
    with NetDrvEpEnv(__file__) as cfg:
        ksft_run([check_rx],
                 args=(cfg, ))
    ksft_exit()


if __name__ == "__main__":
    main()