xref: /linux/tools/testing/selftests/tc-testing/tdc.py (revision 22ac5ad4a7d4e201d19b7f04ce8d79346c80a34b)
1#!/usr/bin/env python3
2
3"""
4tdc.py - Linux tc (Traffic Control) unit test driver
5
6Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
7"""
8
9import re
10import os
11import sys
12import argparse
13import json
14import subprocess
15from collections import OrderedDict
16from string import Template
17
18from tdc_config import *
19from tdc_helper import *
20
21
22USE_NS = True
23
24
25def replace_keywords(cmd):
26    """
27    For a given executable command, substitute any known
28    variables contained within NAMES with the correct values
29    """
30    tcmd = Template(cmd)
31    subcmd = tcmd.safe_substitute(NAMES)
32    return subcmd
33
34
35def exec_cmd(command, nsonly=True):
36    """
37    Perform any required modifications on an executable command, then run
38    it in a subprocess and return the results.
39    """
40    if (USE_NS and nsonly):
41        command = 'ip netns exec $NS ' + command
42
43    if '$' in command:
44        command = replace_keywords(command)
45
46    proc = subprocess.Popen(command,
47        shell=True,
48        stdout=subprocess.PIPE,
49        stderr=subprocess.PIPE)
50    (rawout, serr) = proc.communicate()
51
52    if proc.returncode != 0 and len(serr) > 0:
53        foutput = serr.decode("utf-8")
54    else:
55        foutput = rawout.decode("utf-8")
56
57    proc.stdout.close()
58    proc.stderr.close()
59    return proc, foutput
60
61
62def prepare_env(cmdlist):
63    """
64    Execute the setup/teardown commands for a test case. Optionally
65    terminate test execution if the command fails.
66    """
67    for cmdinfo in cmdlist:
68        if (type(cmdinfo) == list):
69            exit_codes = cmdinfo[1:]
70            cmd = cmdinfo[0]
71        else:
72            exit_codes = [0]
73            cmd = cmdinfo
74
75        if (len(cmd) == 0):
76            continue
77
78        (proc, foutput) = exec_cmd(cmd)
79
80        if proc.returncode not in exit_codes:
81            print
82            print("Could not execute:")
83            print(cmd)
84            print("\nError message:")
85            print(foutput)
86            print("\nAborting test run.")
87            ns_destroy()
88            exit(1)
89
90
91def test_runner(filtered_tests, args):
92    """
93    Driver function for the unit tests.
94
95    Prints information about the tests being run, executes the setup and
96    teardown commands and the command under test itself. Also determines
97    success/failure based on the information in the test case and generates
98    TAP output accordingly.
99    """
100    testlist = filtered_tests
101    tcount = len(testlist)
102    index = 1
103    tap = str(index) + ".." + str(tcount) + "\n"
104
105    for tidx in testlist:
106        result = True
107        tresult = ""
108        if "flower" in tidx["category"] and args.device == None:
109            continue
110        print("Test " + tidx["id"] + ": " + tidx["name"])
111        prepare_env(tidx["setup"])
112        (p, procout) = exec_cmd(tidx["cmdUnderTest"])
113        exit_code = p.returncode
114
115        if (exit_code != int(tidx["expExitCode"])):
116            result = False
117            print("exit:", exit_code, int(tidx["expExitCode"]))
118            print(procout)
119        else:
120            match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
121            (p, procout) = exec_cmd(tidx["verifyCmd"])
122            match_index = re.findall(match_pattern, procout)
123            if len(match_index) != int(tidx["matchCount"]):
124                result = False
125
126        if result == True:
127            tresult += "ok "
128        else:
129            tresult += "not ok "
130        tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
131
132        if result == False:
133            tap += procout
134
135        prepare_env(tidx["teardown"])
136        index += 1
137
138    return tap
139
140
141def ns_create():
142    """
143    Create the network namespace in which the tests will be run and set up
144    the required network devices for it.
145    """
146    if (USE_NS):
147        cmd = 'ip netns add $NS'
148        exec_cmd(cmd, False)
149        cmd = 'ip link add $DEV0 type veth peer name $DEV1'
150        exec_cmd(cmd, False)
151        cmd = 'ip link set $DEV1 netns $NS'
152        exec_cmd(cmd, False)
153        cmd = 'ip link set $DEV0 up'
154        exec_cmd(cmd, False)
155        cmd = 'ip -s $NS link set $DEV1 up'
156        exec_cmd(cmd, False)
157        cmd = 'ip link set $DEV2 netns $NS'
158        exec_cmd(cmd, False)
159        cmd = 'ip -s $NS link set $DEV2 up'
160        exec_cmd(cmd, False)
161
162
163def ns_destroy():
164    """
165    Destroy the network namespace for testing (and any associated network
166    devices as well)
167    """
168    if (USE_NS):
169        cmd = 'ip netns delete $NS'
170        exec_cmd(cmd, False)
171
172
173def has_blank_ids(idlist):
174    """
175    Search the list for empty ID fields and return true/false accordingly.
176    """
177    return not(all(k for k in idlist))
178
179
180def load_from_file(filename):
181    """
182    Open the JSON file containing the test cases and return them
183    as list of ordered dictionary objects.
184    """
185    try:
186        with open(filename) as test_data:
187            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
188    except json.JSONDecodeError as jde:
189        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
190        testlist = list()
191    else:
192        idlist = get_id_list(testlist)
193        if (has_blank_ids(idlist)):
194            for k in testlist:
195                k['filename'] = filename
196    return testlist
197
198
199def args_parse():
200    """
201    Create the argument parser.
202    """
203    parser = argparse.ArgumentParser(description='Linux TC unit tests')
204    return parser
205
206
207def set_args(parser):
208    """
209    Set the command line arguments for tdc.
210    """
211    parser.add_argument('-p', '--path', type=str,
212                        help='The full path to the tc executable to use')
213    parser.add_argument('-c', '--category', type=str, nargs='?', const='+c',
214                        help='Run tests only from the specified category, or if no category is specified, list known categories.')
215    parser.add_argument('-f', '--file', type=str,
216                        help='Run tests from the specified file')
217    parser.add_argument('-l', '--list', type=str, nargs='?', const="++", metavar='CATEGORY',
218                        help='List all test cases, or those only within the specified category')
219    parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID',
220                        help='Display the test case with specified id')
221    parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID',
222                        help='Execute the single test case with specified ID')
223    parser.add_argument('-i', '--id', action='store_true', dest='gen_id',
224                        help='Generate ID numbers for new test cases')
225    parser.add_argument('-d', '--device',
226                        help='Execute the test case in flower category')
227    return parser
228
229
230def check_default_settings(args):
231    """
232    Process any arguments overriding the default settings, and ensure the
233    settings are correct.
234    """
235    # Allow for overriding specific settings
236    global NAMES
237
238    if args.path != None:
239         NAMES['TC'] = args.path
240    if args.device != None:
241         NAMES['DEV2'] = args.device
242    if not os.path.isfile(NAMES['TC']):
243        print("The specified tc path " + NAMES['TC'] + " does not exist.")
244        exit(1)
245
246
247def get_id_list(alltests):
248    """
249    Generate a list of all IDs in the test cases.
250    """
251    return [x["id"] for x in alltests]
252
253
254def check_case_id(alltests):
255    """
256    Check for duplicate test case IDs.
257    """
258    idl = get_id_list(alltests)
259    return [x for x in idl if idl.count(x) > 1]
260
261
262def does_id_exist(alltests, newid):
263    """
264    Check if a given ID already exists in the list of test cases.
265    """
266    idl = get_id_list(alltests)
267    return (any(newid == x for x in idl))
268
269
270def generate_case_ids(alltests):
271    """
272    If a test case has a blank ID field, generate a random hex ID for it
273    and then write the test cases back to disk.
274    """
275    import random
276    for c in alltests:
277        if (c["id"] == ""):
278            while True:
279                newid = str('%04x' % random.randrange(16**4))
280                if (does_id_exist(alltests, newid)):
281                    continue
282                else:
283                    c['id'] = newid
284                    break
285
286    ufilename = []
287    for c in alltests:
288        if ('filename' in c):
289            ufilename.append(c['filename'])
290    ufilename = get_unique_item(ufilename)
291    for f in ufilename:
292        testlist = []
293        for t in alltests:
294            if 'filename' in t:
295                if t['filename'] == f:
296                    del t['filename']
297                    testlist.append(t)
298        outfile = open(f, "w")
299        json.dump(testlist, outfile, indent=4)
300        outfile.close()
301
302
303def get_test_cases(args):
304    """
305    If a test case file is specified, retrieve tests from that file.
306    Otherwise, glob for all json files in subdirectories and load from
307    each one.
308    """
309    import fnmatch
310    if args.file != None:
311        if not os.path.isfile(args.file):
312            print("The specified test case file " + args.file + " does not exist.")
313            exit(1)
314        flist = [args.file]
315    else:
316        flist = []
317        for root, dirnames, filenames in os.walk('tc-tests'):
318            for filename in fnmatch.filter(filenames, '*.json'):
319                flist.append(os.path.join(root, filename))
320    alltests = list()
321    for casefile in flist:
322        alltests = alltests + (load_from_file(casefile))
323    return alltests
324
325
326def set_operation_mode(args):
327    """
328    Load the test case data and process remaining arguments to determine
329    what the script should do for this run, and call the appropriate
330    function.
331    """
332    alltests = get_test_cases(args)
333
334    if args.gen_id:
335        idlist = get_id_list(alltests)
336        if (has_blank_ids(idlist)):
337            alltests = generate_case_ids(alltests)
338        else:
339            print("No empty ID fields found in test files.")
340        exit(0)
341
342    duplicate_ids = check_case_id(alltests)
343    if (len(duplicate_ids) > 0):
344        print("The following test case IDs are not unique:")
345        print(str(set(duplicate_ids)))
346        print("Please correct them before continuing.")
347        exit(1)
348
349    ucat = get_test_categories(alltests)
350
351    if args.showID:
352        show_test_case_by_id(alltests, args.showID[0])
353        exit(0)
354
355    if args.execute:
356        target_id = args.execute[0]
357    else:
358        target_id = ""
359
360    if args.category:
361        if (args.category == '+c'):
362            print("Available categories:")
363            print_sll(ucat)
364            exit(0)
365        else:
366            target_category = args.category
367    else:
368        target_category = ""
369
370
371    testcases = get_categorized_testlist(alltests, ucat)
372
373    if args.list:
374        if (args.list == "++"):
375            list_test_cases(alltests)
376            exit(0)
377        elif(len(args.list) > 0):
378            if (args.list not in ucat):
379                print("Unknown category " + args.list)
380                print("Available categories:")
381                print_sll(ucat)
382                exit(1)
383            list_test_cases(testcases[args.list])
384            exit(0)
385
386    if (os.geteuid() != 0):
387        print("This script must be run with root privileges.\n")
388        exit(1)
389
390    ns_create()
391
392    if (len(target_category) == 0):
393        if (len(target_id) > 0):
394            alltests = list(filter(lambda x: target_id in x['id'], alltests))
395            if (len(alltests) == 0):
396                print("Cannot find a test case with ID matching " + target_id)
397                exit(1)
398        catresults = test_runner(alltests, args)
399        print("All test results: " + "\n\n" + catresults)
400    elif (len(target_category) > 0):
401        if (target_category == "flower") and args.device == None:
402            print("Please specify a NIC device (-d) to run category flower")
403            exit(1)
404        if (target_category not in ucat):
405            print("Specified category is not present in this file.")
406            exit(1)
407        else:
408            catresults = test_runner(testcases[target_category], args)
409            print("Category " + target_category + "\n\n" + catresults)
410
411    ns_destroy()
412
413
414def main():
415    """
416    Start of execution; set up argument parser and get the arguments,
417    and start operations.
418    """
419    parser = args_parse()
420    parser = set_args(parser)
421    (args, remaining) = parser.parse_known_args()
422    check_default_settings(args)
423
424    set_operation_mode(args)
425
426    exit(0)
427
428
429if __name__ == "__main__":
430    main()
431