xref: /linux/tools/testing/selftests/net/lib/py/utils.py (revision 72c19df24a3e352b5eefd6657cf8d4051ccf77c5)
1# SPDX-License-Identifier: GPL-2.0
2
3import json as _json
4import random
5import re
6import subprocess
7import time
8
9
class cmd:
    """
    Run a command and capture its exit status, stdout and stderr.

    @comm is the command line; when @ns is set it is prefixed with
    `ip netns exec <ns>`. When @host is set the command is started through
    host.cmd(), otherwise it is started locally with subprocess.Popen using
    @shell. Unless @background is true the constructor waits for the command
    to finish and, if @fail is true, raises on a non-zero exit status.

    After process() has run:
      stdout, stderr - the command's output decoded as UTF-8
      ret            - the command's return code
    """

    def __init__(self, comm, shell=True, fail=True, ns=None, background=False, host=None):
        if ns:
            comm = f'ip netns exec {ns} ' + comm

        self.stdout = None
        self.stderr = None
        self.ret = None

        self.comm = comm
        if host:
            self.proc = host.cmd(comm)
        else:
            self.proc = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE,
                                         stderr=subprocess.PIPE)
        if not background:
            self.process(terminate=False, fail=fail)

    def process(self, terminate=True, fail=None):
        """
        Collect the command's output and exit status.

        If @terminate is true the process is asked to terminate first.
        Output is gathered with a 5 second timeout on communicate(); for a
        local Popen that raises subprocess.TimeoutExpired when exceeded.
        If @fail is true and the return code is non-zero an Exception
        carrying the command, stdout and stderr is raised.
        """
        if terminate:
            self.proc.terminate()
        stdout, stderr = self.proc.communicate(timeout=5)
        self.stdout = stdout.decode("utf-8")
        self.stderr = stderr.decode("utf-8")
        self.proc.stdout.close()
        self.proc.stderr.close()
        self.ret = self.proc.returncode

        if self.ret != 0 and fail:
            # Report the decoded text, not the raw bytes: indexing bytes
            # yields an int, so comparing it with "\n" could never match and
            # the message would show b'...' reprs.
            err_text = self.stderr
            if err_text.endswith("\n"):
                err_text = err_text[:-1]
            raise Exception("Command failed: %s\nSTDOUT: %s\nSTDERR: %s" %
                            (self.proc.args, self.stdout, err_text))
43
44
class bkg(cmd):
    """
    Background command, usable as a context manager.

    The command is started by the constructor without waiting for it.
    Leaving the `with` block calls process(); the command is terminated
    first unless @exit_wait is true, in which case it is left to finish on
    its own. @fail is handed to cmd.__init__() but, since the command is
    started in the background, it is not consulted there, and __exit__()
    does not pass it to process().
    """

    def __init__(self, comm, shell=True, fail=True, ns=None, host=None,
                 exit_wait=False):
        # Always start detached; output is collected when the block exits.
        super().__init__(comm, shell=shell, fail=fail, ns=ns,
                         background=True, host=host)
        # Terminate on exit unless the caller asked to wait for completion.
        self.terminate = False if exit_wait else True

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        outcome = self.process(terminate=self.terminate)
        return outcome
57
58
def ip(args, json=None, ns=None, host=None):
    """
    Run the `ip` tool with the argument string @args.

    With a truthy @json the tool is invoked with -j and its stdout is
    returned parsed as JSON; otherwise the finished cmd object is returned.
    @ns and @host are passed through to cmd().
    """
    prefix = "ip -j " if json else "ip "
    result = cmd(prefix + args, ns=ns, host=host)
    if not json:
        return result
    return _json.loads(result.stdout)
68
69
def rand_port():
    """
    Pick an unprivileged port number at random.

    The result lies in 1024..65535 inclusive; nothing checks whether the
    port is already in use.
    """
    # randrange() excludes its upper bound, hence 65536.
    return random.randrange(1024, 65536)
75
76
def wait_port_listen(port, proto="tcp", ns=None, host=None, sleep=0.005, deadline=5):
    """
    Poll /proc/net/<proto>* until a row mentioning @port shows up.

    The port is matched in upper-case hex, zero-padded to at least four
    digits. For tcp the row must additionally contain the socket state 0A
    (presumably TCP_LISTEN). The listing is re-read every @sleep seconds via
    cmd(), using @ns and @host; once @deadline seconds have passed without a
    match an Exception is raised.
    """
    give_up_at = time.monotonic() + deadline

    # for tcp protocol additionally check the socket state
    state_suffix = "0A" if proto == "tcp" else ""
    matcher = re.compile(f":{port:04X} .* " + state_suffix)

    while True:
        listing = cmd(f'cat /proc/net/{proto}*', ns=ns, host=host, shell=True).stdout
        if any(matcher.search(line) for line in listing.split("\n")):
            return
        # The deadline is only checked after a full scan, so at least one
        # attempt is always made.
        if time.monotonic() > give_up_at:
            raise Exception("Waiting for port listen timed out")
        time.sleep(sleep)
93