xref: /linux/tools/testing/selftests/tc-testing/tdc.py (revision 6fac733d9d07c4fcc349a44add75c6435cc3f18c)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import json
15import subprocess
16from collections import OrderedDict
17from string import Template
18
19from tdc_config import *
20from tdc_helper import *
21
22
# When True, test commands (unless called with nsonly=False) are wrapped in
# "ip netns exec $NS" so they run inside the namespace built by ns_create().
USE_NS = True
24
25
def replace_keywords(cmd):
    """
    Substitute every known $-style variable from the NAMES table into
    the given executable command and return the resulting string.
    Unknown placeholders are left untouched (safe_substitute).
    """
    return Template(cmd).safe_substitute(NAMES)
34
35
def exec_cmd(command, nsonly=True):
    """
    Run a shell command in a subprocess and return (process, output).

    The command is first wrapped to execute inside the test network
    namespace (when USE_NS is set and nsonly is True), then any $-style
    keywords are substituted from NAMES.  The returned output is the
    decoded stderr when the command failed and produced error text,
    otherwise the decoded stdout.
    """
    if USE_NS and nsonly:
        command = 'ip netns exec $NS ' + command
    if '$' in command:
        command = replace_keywords(command)

    proc = subprocess.Popen(
        command, shell=True,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    rawout, serr = proc.communicate()

    # Prefer stderr when the command failed and wrote error text.
    use_stderr = proc.returncode != 0 and len(serr) > 0
    foutput = (serr if use_stderr else rawout).decode("utf-8")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
61
62
def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case.

    Each entry in cmdlist is either a plain command string (expected to
    exit 0) or a list whose first element is the command and whose
    remaining elements are the acceptable exit codes.  Raises Exception
    (aborting the test run) when a command exits with an unexpected
    status.
    """
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if len(cmd) == 0:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            # BUGFIX: a bare "print" is a no-op expression statement in
            # Python 3; print() emits the intended blank separator line.
            print()
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            # ns_destroy()
            raise Exception('prepare_env did not complete successfully')
90
def run_one_test(index, tidx):
    """
    Run one test case end to end and return its TAP output.

    Executes the case's setup commands, runs cmdUnderTest, and compares
    the exit code against expExitCode.  On a matching exit code, runs
    verifyCmd and requires matchPattern to occur exactly matchCount
    times in its output.  Teardown always runs afterwards.

    index is the test's ordinal used in the TAP line; tidx is the test
    case dictionary.
    """
    result = True
    tresult = ""
    tap = ""
    print("Test " + tidx["id"] + ": " + tidx["name"])
    prepare_env(tidx["setup"])
    (p, procout) = exec_cmd(tidx["cmdUnderTest"])
    exit_code = p.returncode

    if exit_code != int(tidx["expExitCode"]):
        result = False
        print("exit:", exit_code, int(tidx["expExitCode"]))
        print(procout)
    else:
        # Exit code matched; now verify the observable state.
        match_pattern = re.compile(str(tidx["matchPattern"]),
                                   re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(tidx["verifyCmd"])
        match_index = re.findall(match_pattern, procout)
        if len(match_index) != int(tidx["matchCount"]):
            result = False

    if not result:
        tresult += "not "
    tresult += "ok {} - {} # {}\n".format(str(index), tidx['id'], tidx["name"])
    tap += tresult

    if not result:
        # Include the failing command's output in the TAP stream.
        tap += procout

    prepare_env(tidx["teardown"])
    # NOTE: the original incremented the local "index" here; that had no
    # effect outside this function and has been removed as dead code.

    return tap
124
def test_runner(filtered_tests, args):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = str(index) + ".." + str(tcount) + "\n"
    # BUGFIX: ensure badtest is defined even when the loop never reaches
    # a runnable test (e.g. every case was skipped via "continue").
    badtest = dict()

    for tidx in testlist:
        # flower tests need a real device; skip them unless -d was given.
        if "flower" in tidx["category"] and args.device is None:
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(index, tidx)
        except Exception as ee:
            print('Exception {} (caught in test_runner, running test {} {} {})'.
                  format(ee, index, tidx['id'], tidx['name']))
            break
        index += 1

    count = index
    tap += 'about to flush the tap output if tests need to be skipped\n'
    if tcount + 1 != index:
        # A setup/teardown failure aborted the loop early: emit "skipped"
        # TAP lines for the tests that never ran, naming the offender.
        for tidx in testlist[index - 1:]:
            msg = 'skipped - previous setup or teardown failed'
            tap += 'ok {} - {} # {} {} {} \n'.format(
                count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
            count += 1

    tap += 'done flushing skipped test tap output\n'

    return tap
163
164
def ns_create():
    """
    Create the test network namespace and wire up its devices: a veth
    pair split across the host ($DEV0) and the namespace ($DEV1), plus
    the optional flower device ($DEV2) moved into the namespace.
    """
    if USE_NS:
        setup_cmds = [
            'ip netns add $NS',
            'ip link add $DEV0 type veth peer name $DEV1',
            'ip link set $DEV1 netns $NS',
            'ip link set $DEV0 up',
            'ip -n $NS link set $DEV1 up',
            'ip link set $DEV2 netns $NS',
            'ip -n $NS link set $DEV2 up',
        ]
        for cmd in setup_cmds:
            exec_cmd(cmd, False)
185
186
def ns_destroy():
    """
    Delete the test network namespace (the network devices that were
    moved into it go away with it).
    """
    if USE_NS:
        exec_cmd('ip netns delete $NS', False)
195
196
def has_blank_ids(idlist):
    """
    Return True when at least one ID in idlist is empty (falsy),
    False otherwise (including for an empty list).
    """
    return any(not ident for ident in idlist)
202
203
def load_from_file(filename):
    """
    Load the test cases from one JSON file and return them as a list of
    ordered dictionaries.  A file that fails to parse is reported and
    treated as if it contained no tests.
    """
    testlist = list()
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
    else:
        # Tag every case with its origin file when any ID is blank, so
        # generate_case_ids() can write the regenerated IDs back later.
        if has_blank_ids(get_id_list(testlist)):
            for tc in testlist:
                tc['filename'] = filename
    return testlist
221
222
def args_parse():
    """
    Build and return the top-level argument parser for tdc.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
229
230
def set_args(parser):
    """
    Attach tdc's command line options to the given parser and return it.

    Options are organized into a "selection" group (which tests to run)
    and an "action" group (what to do with them), plus a few standalone
    flags.
    """
    parser.add_argument('-p', '--path', type=str,
                        help='The full path to the tc executable to use')

    sg = parser.add_argument_group(
        'selection',
        'select which test cases: '
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument('-D', '--directory', nargs='+', metavar='DIR',
                    help='Collect tests from the specified directory(ies) '
                    '(default [tc-tests])')
    sg.add_argument('-f', '--file', nargs='+', metavar='FILE',
                    help='Run tests from the specified file(s)')
    # default ['+c'] is a sentinel meaning "no category chosen";
    # a bare -c (nargs='*') yields [] and triggers the category listing.
    sg.add_argument('-c', '--category', nargs='*', metavar='CATG',
                    default=['+c'],
                    help='Run tests only from the specified category/ies, '
                    'or if no category/ies is/are specified, list known categories.')
    sg.add_argument('-e', '--execute', nargs='+', metavar='ID',
                    help='Execute the specified test cases with specified IDs')

    ag.add_argument('-l', '--list', action='store_true',
                    help='List all test cases, or those only within the specified category')
    ag.add_argument('-s', '--show', action='store_true', dest='showID',
                    help='Display the selected test cases')
    ag.add_argument('-i', '--id', action='store_true', dest='gen_id',
                    help='Generate ID numbers for new test cases')

    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='Show the commands that are being run')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    return parser
273
274
def check_default_settings(args):
    """
    Process any arguments overriding the default settings, and ensure the
    settings are correct.

    Applies -p/--path and -d/--device overrides to the global NAMES
    table, then exits with status 1 when the tc binary does not exist.
    """
    # Allow for overriding specific settings
    global NAMES

    # Idiom fix: compare against None with "is"; also normalizes the
    # original's odd 9-space indentation.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)
290
291
def get_id_list(alltests):
    """
    Collect the "id" field of every test case, in order, into a list.
    """
    ids = []
    for testcase in alltests:
        ids.append(testcase["id"])
    return ids
297
298
def check_case_id(alltests):
    """
    Return the list of test case IDs that occur more than once.

    Every occurrence of a duplicated ID is included, so the caller
    typically wraps the result in set() for display.
    """
    # Cleanup: removed the commented-out debug scaffolding and an
    # unreachable "return answer" that followed the real return.
    idl = get_id_list(alltests)
    return [x for x in idl if idl.count(x) > 1]
314
315
def does_id_exist(alltests, newid):
    """
    Report whether newid is already used by any test case in alltests.
    """
    return newid in get_id_list(alltests)
322
323
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Cases carrying a 'filename' tag (added by load_from_file) are grouped
    by origin file and each file is rewritten with its completed cases;
    the tag is stripped before writing.
    """
    import random
    for c in alltests:
        if c["id"] == "":
            # Draw random 4-hex-digit IDs until an unused one turns up.
            while True:
                newid = str('%04x' % random.randrange(16**4))
                if does_id_exist(alltests, newid):
                    continue
                else:
                    c['id'] = newid
                    break

    # Collect the distinct source files the cases came from.
    ufilename = []
    for c in alltests:
        if 'filename' in c:
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        # BUGFIX: use a context manager so the file is closed even if
        # json.dump raises.
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
355
def filter_tests_by_id(args, testlist):
    '''
    Keep only the tests whose id appears in args.execute.
    Returns an empty list when testlist is empty or no ids were named.
    '''
    if not (testlist and args.execute):
        return list()

    wanted = args.execute
    if isinstance(wanted, list) and wanted:
        return [tc for tc in testlist if tc['id'] in wanted]
    return list()
368
def filter_tests_by_category(args, testlist):
    '''
    Keep only the tests belonging to at least one category named in
    args.category; each test appears at most once in the result.
    '''
    answer = list()
    if args.category and testlist:
        seen_ids = list()
        for catg in set(args.category):
            # '+c' is the sentinel default meaning "no category chosen".
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg not in tc['category'] or tc['id'] in seen_ids:
                    continue
                answer.append(tc)
                seen_ids.append(tc['id'])

    return answer
386
def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.

    Returns a 4-tuple: (all categories, all test IDs, tests grouped by
    category, the selected test cases).
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + " \n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                # NOTE(review): this dedup check compares the candidate
                # file against testdirs (directories), so it can never
                # match; it looks like "candidate not in flist" was
                # intended to avoid loading a file twice -- confirm
                # before changing.
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    # Category/ID metadata derived from every loaded case (helpers from
    # tdc_helper).
    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            # Union of category matches and explicitly requested IDs,
            # without duplicating tests already selected by category.
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases
446
447
def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(idlist):
            # BUGFIX: generate_case_ids() rewrites the files in place and
            # returns None, so don't clobber alltests with its result.
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    # A bare "-c" (empty category list) means: show the known categories.
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    if args.list:
        # BUGFIX: the original nested a redundant duplicate
        # "if args.list:" here.
        list_test_cases(alltests)
        exit(0)

    # Everything below actually manipulates network state.
    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        exit(1)

    ns_create()

    catresults = test_runner(alltests, args)
    print('All test results: \n\n{}'.format(catresults))

    ns_destroy()
495
496
def main():
    """
    Entry point: build and populate the argument parser, parse the
    command line, validate/apply settings, and hand off to
    set_operation_mode().
    """
    parser = set_args(args_parse())
    args, _remaining = parser.parse_known_args()
    check_default_settings(args)

    set_operation_mode(args)

    exit(0)
510
511
# Run the driver only when executed as a script, not on import.
if __name__ == "__main__":
    main()
514