xref: /linux/tools/testing/selftests/net/lib/py/ksft.py (revision 8195136669661fdfe54e9a8923c33b31c92fc1da)
1# SPDX-License-Identifier: GPL-2.0
2
3import builtins
4import inspect
5import sys
6import time
7import traceback
8from .consts import KSFT_MAIN_NAME
9from .utils import global_defer_queue
10
# Outcome of the test case currently being executed. ksft_run() sets it to
# True before calling each case; _fail() and ksft_flush_defer() set it to
# False when a check or a deferred cleanup callback fails.
KSFT_RESULT = None
# Aggregate outcome of every result reported so far. ktap_result() ANDs each
# reported result into it and ksft_exit() derives the exit status from it.
KSFT_RESULT_ALL = True
13
14
class KsftFailEx(Exception):
    """Exception a test case can raise to signal failure.

    ksft_run() in this file has no dedicated handler for it; like any other
    exception it is caught by the generic handler, which prints the traceback
    and marks the case as failed.
    """
17
18
class KsftSkipEx(Exception):
    """Exception a test case raises to be reported as skipped.

    ksft_run() catches it, counts the case under "skip" and uses the
    exception text as the "SKIP ..." comment of the result line.
    """
21
22
class KsftXfailEx(Exception):
    """Exception a test case raises to be reported as an expected failure.

    ksft_run() catches it, counts the case under "xfail" and uses the
    exception text as the "XFAIL ..." comment of the result line.
    """
25
26
def ksft_pr(*objs, **kwargs):
    """Print *objs* as a '#'-prefixed diagnostic line.

    "#" is emitted as the first printed item; the remaining positional
    arguments and all keyword arguments (``sep``, ``end``, ``file``, ...)
    are handed to print() unchanged.
    """
    prefixed = ("#",) + objs
    print(*prefixed, **kwargs)
29
30
def _fail(*args):
    """Mark the running test case as failed and report where the check was.

    Sets the module-level KSFT_RESULT to False, prints the file name and line
    number of the frame two levels above this function, then prints *args*.
    Nothing is raised, so the test case keeps executing.
    """
    global KSFT_RESULT
    KSFT_RESULT = False

    # Stack index 0 is _fail() itself and 1 is the helper that called it, so
    # index 2 is the code which invoked that helper.
    origin = inspect.stack()[2]
    ksft_pr(f"At {origin.filename} line {origin.lineno}:")
    ksft_pr(*args)
38
39
def ksft_eq(a, b, comment=""):
    """Check that *a* and *b* do not compare unequal.

    On mismatch the failure is recorded through _fail() together with both
    values and the optional *comment*; no exception is raised.
    """
    mismatch = a != b
    if mismatch:
        _fail("Check failed", a, "!=", b, comment)
44
45
def ksft_true(a, comment=""):
    """Check that *a* is truthy.

    A falsy value is recorded as a failure through _fail() along with the
    optional *comment*; no exception is raised.
    """
    if a:
        return
    _fail("Check failed", a, "does not eval to True", comment)
49
50
def ksft_in(a, b, comment=""):
    """Check that *a* is contained in *b* (``a in b``).

    A missing element is recorded as a failure through _fail() along with
    the optional *comment*; no exception is raised.
    """
    present = a in b
    if not present:
        _fail("Check failed", a, "not in", b, comment)
54
55
def ksft_ge(a, b, comment=""):
    """Check that *a* is not less than *b*.

    The failure condition is ``a < b``; when it holds the failure is recorded
    through _fail() along with the optional *comment*. No exception is raised.
    """
    too_small = a < b
    if too_small:
        _fail("Check failed", a, "<", b, comment)
59
60
def ksft_lt(a, b, comment=""):
    """Check that *a* is strictly less than *b*.

    The failure condition is ``a >= b``; when it holds the failure is recorded
    through _fail() along with the optional *comment*. No exception is raised.
    """
    too_big = a >= b
    if too_big:
        _fail("Check failed", a, ">=", b, comment)
64
65
class ksft_raises:
    """Context manager checking that its body raises *expected_type*.

    The type of the raised exception is compared with ``==`` / ``!=`` against
    *expected_type*, so an exception of a subclass does not match: it is
    reported as a failure and left to propagate. After the block finishes,
    the ``exception`` attribute holds the exception value that was raised
    (None if nothing was raised).
    """

    def __init__(self, expected_type):
        self.expected_type = expected_type
        self.exception = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # _fail() is called directly from here (not via a helper) because it
        # reports the frame two levels up, i.e. the code owning the `with`.
        if exc_type is None:
            _fail(f"Expected exception {str(self.expected_type.__name__)}, none raised")
        elif self.expected_type != exc_type:
            _fail(f"Expected exception {str(self.expected_type.__name__)}, raised {str(exc_type.__name__)}")

        self.exception = exc_val

        # A true return value suppresses the exception; only do that when the
        # raised type is the expected one.
        suppress = self.expected_type == exc_type
        return suppress
82
83
def ksft_busy_wait(cond, sleep=0.005, deadline=1, comment=""):
    """Poll cond() until it returns a true value or the deadline passes.

    cond() is always evaluated at least once. After each false result the
    monotonic clock is compared with the start time plus *deadline* seconds;
    once that is exceeded the timeout is recorded through _fail() (with the
    optional *comment*) and the function returns. Otherwise it sleeps for
    *sleep* seconds and polls again. Nothing is raised or returned.
    """
    give_up_at = time.monotonic() + deadline
    while not cond():
        if time.monotonic() > give_up_at:
            _fail("Waiting for condition timed out", comment)
            return
        time.sleep(sleep)
93
94
def ktap_result(ok, cnt=1, case="", comment=""):
    """Print one KTAP result line and fold *ok* into the overall result.

    The line has the form ``[not ]ok <cnt> <KSFT_MAIN_NAME>[.<case name>]``
    optionally followed by `` # <comment>``. *case*, when given, must have a
    ``__name__`` attribute. KSFT_RESULT_ALL is updated to
    ``KSFT_RESULT_ALL and ok``.
    """
    global KSFT_RESULT_ALL
    KSFT_RESULT_ALL = KSFT_RESULT_ALL and ok

    parts = ["ok " if ok else "not ok ", str(cnt) + " ", KSFT_MAIN_NAME]
    if case:
        parts.append("." + str(case.__name__))
    if comment:
        parts.append(" # " + comment)
    print("".join(parts))
110
111
def ksft_flush_defer():
    """Run every entry of global_defer_queue, emptying the queue.

    Entries are taken with pop() (no argument) and run via exec_only().
    Any exception raised by an entry, including non-Exception ones such as
    KeyboardInterrupt, is printed with its traceback and marks the current
    test case as failed by setting KSFT_RESULT to False; the remaining
    entries still run.
    """
    global KSFT_RESULT

    total = len(global_defer_queue)
    position = 0
    while global_defer_queue:
        position += 1
        callback = global_defer_queue.pop()
        try:
            callback.exec_only()
        except BaseException:
            # Deliberately as broad as a bare except: one failing cleanup
            # must not prevent the rest of the queue from being processed.
            ksft_pr(f"Exception while handling defer / cleanup (callback {position} of {total})!")
            for tb_line in traceback.format_exc().strip().split('\n'):
                ksft_pr("Defer Exception|", tb_line)
            KSFT_RESULT = False
128
129
def ksft_run(cases=None, globs=None, case_pfx=None, args=()):
    """Run test cases and print a KTAP report for them.

    Args:
        cases: iterable of test case callables, or None. It is copied, so
            the caller's object is never modified.
        globs: optional mapping of names to objects (for example globals())
            in which to discover additional test cases.
        case_pfx: a prefix string or an iterable of prefix strings. Every
            callable in *globs* whose name starts with one of the prefixes
            is appended to the list of cases. Ignored without *globs*.
        args: positional arguments passed to every test case.

    For each case KSFT_RESULT is reset to True, the case is called, deferred
    cleanups are flushed with ksft_flush_defer() and one result line is
    printed with ktap_result(). KsftSkipEx and KsftXfailEx are reported as
    SKIP / XFAIL; any other exception is printed with its traceback and fails
    the case. A KeyboardInterrupt additionally stops the run after the
    current case has been reported. A "# Totals:" summary line is printed at
    the end.
    """
    global KSFT_RESULT

    # Work on a private copy: discovered cases must not be appended to the
    # caller's list, and this also accepts tuples and other iterables.
    cases = list(cases) if cases else []

    if globs and case_pfx:
        # A bare string would otherwise be iterated character by character,
        # matching every name that starts with any of its letters.
        if isinstance(case_pfx, str):
            prefixes = (case_pfx,)
        else:
            prefixes = tuple(case_pfx)
        cases.extend(value for key, value in globs.items()
                     if callable(value) and key.startswith(prefixes))

    totals = {"pass": 0, "fail": 0, "skip": 0, "xfail": 0}

    print("KTAP version 1")
    print("1.." + str(len(cases)))

    stop = False
    for cnt, case in enumerate(cases, start=1):
        KSFT_RESULT = True
        comment = ""
        cnt_key = ""

        try:
            case(*args)
        except KsftSkipEx as e:
            comment = "SKIP " + str(e)
            cnt_key = 'skip'
        except KsftXfailEx as e:
            comment = "XFAIL " + str(e)
            cnt_key = 'xfail'
        except BaseException as e:
            # BaseException so that Ctrl-C is reported as a failed case
            # instead of aborting before cleanup and the totals line.
            stop |= isinstance(e, KeyboardInterrupt)
            tb = traceback.format_exc()
            for line in tb.strip().split('\n'):
                ksft_pr("Exception|", line)
            if stop:
                ksft_pr("Stopping tests due to KeyboardInterrupt.")
            KSFT_RESULT = False
            cnt_key = 'fail'

        # Cleanups run for every outcome and may themselves clear KSFT_RESULT.
        ksft_flush_defer()

        if not cnt_key:
            cnt_key = 'pass' if KSFT_RESULT else 'fail'

        ktap_result(KSFT_RESULT, cnt, case, comment=comment)
        totals[cnt_key] += 1

        if stop:
            break

    print(
        f"# Totals: pass:{totals['pass']} fail:{totals['fail']} xfail:{totals['xfail']} xpass:0 skip:{totals['skip']} error:0"
    )
188
189
def ksft_exit():
    """Exit the process with status 0 if KSFT_RESULT_ALL is true, else 1."""
    exit_status = int(not KSFT_RESULT_ALL)
    sys.exit(exit_status)
193