xref: /linux/tools/testing/selftests/tc-testing/tdc.py (revision 93707cbabcc8baf2b2b5f4a99c1f08ee83eb7abd)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import importlib
15import json
16import subprocess
17import time
18from collections import OrderedDict
19from string import Template
20
21from tdc_config import *
22from tdc_helper import *
23
24import TdcPlugin
25
# When true, exec_cmd() prefixes commands with 'ip netns exec $NS' (unless it
# is called with nsonly=False), and ns_create()/ns_destroy() add and delete
# the $NS namespace around the test run.
USE_NS = True
27
class PluginMgr:
    """
    Load the tdc plugins and fan each test-run hook out to all of them.

    Every ``*.py`` file found below the plugin directory (environment
    variable ``TDC_PLUGIN_DIR``, default ``./plugins``) is imported as
    ``plugins.<module name>`` and its ``SubPlugin`` class is instantiated
    once.  The "pre"/argument hooks are invoked in load order, the "post"
    hooks in reverse load order.
    """

    def __init__(self, argparser):
        """
        Import all plugin modules and create one SubPlugin per module.

        argparser is stored on the instance as ``self.argparser``.
        """
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO, put plugins in order
        # NOTE: modules are always imported from the 'plugins' package, so a
        # custom TDC_PLUGIN_DIR only selects which file names are considered.
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            # os.walk() yields names in arbitrary order; sort them so the
            # plugin load (and therefore hook) order is the same on every run.
            for fn in sorted(filenames):
                if (fn.endswith('.py') and
                        fn != '__init__.py' and
                        # skip editor backup/lock files
                        not fn.startswith(('#', '.#'))):
                    mn = fn[0:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = foo
                    self.plugin_instances.append(foo.SubPlugin())

    def call_pre_suite(self, testcount, testidlist):
        """Call pre_suite(testcount, testidlist) on each plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        """Call post_suite(index) on each plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid):
        """
        Call pre_case(test_ordinal, testid) on each plugin.

        If a plugin raises, the failing plugin and test are printed and the
        exception is re-raised unchanged.
        """
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        """Call post_case() on each plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        """Call pre_execute() on each plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        """Call post_execute() on each plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        """
        Pass the parser through every plugin's add_args() and return the
        parser handed back by the last one.
        """
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        """Call check_args(args, remaining) on each plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        """
        Pass the command through every plugin's adjust_command() in turn
        (each plugin sees the previous plugin's result) and return the final
        command.
        """
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    @staticmethod
    def _make_argparser(args):
        """
        Build and return a fresh argument parser for tdc.

        This is a static method, so there is no instance to store the parser
        on (the previous body referenced an undefined 'self' and raised
        NameError when called); the parser is returned instead.  'args' is
        unused and kept only so the signature stays unchanged.
        """
        return argparse.ArgumentParser(
            description='Linux TC unit tests')
98
99
def replace_keywords(cmd):
    """
    Expand the $-placeholders of a command string from the NAMES mapping
    and return the resulting string.

    The expansion uses Template.safe_substitute(), so a placeholder that
    has no entry in NAMES is left in the string as is.
    """
    return Template(cmd).safe_substitute(NAMES)
108
109
def exec_cmd(args, pm, stage, command, nsonly=True):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    The command is prefixed with 'ip netns exec $NS' when USE_NS is set and
    nsonly is true, has its $-keywords replaced, and is then passed through
    the plugins' adjust_command() hook for the given stage before being run
    through the shell.

    Returns a (proc, output) tuple: proc is the finished Popen object and
    output is the decoded stderr if the command failed and wrote to stderr,
    otherwise the decoded stdout.  A blank command is not run at all and
    yields (None, None).
    """
    if not command.strip():
        return None, None
    if USE_NS and nsonly:
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)
    (rawout, serr) = proc.communicate()

    # Command output is not guaranteed to be valid UTF-8; decode leniently
    # so that stray bytes show up as U+FFFD instead of raising
    # UnicodeDecodeError and aborting the whole test run.
    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8", errors="replace")
    else:
        foutput = rawout.decode("utf-8", errors="replace")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
141
142
def prepare_env(args, pm, stage, prefix, cmdlist):
    """
    Execute the setup/teardown commands for a test case.

    Each entry of cmdlist is either a command string (which must exit with
    0) or a list whose first element is the command and whose remaining
    elements are the acceptable exit codes.  A list that names no exit
    codes is treated like a plain string (only 0 is accepted); empty
    entries are skipped.

    Raises Exception when a command finishes with an exit code that is not
    acceptable, after printing the details to stderr.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            if not cmdinfo:
                # nothing to run; previously this raised IndexError
                continue
            cmd = cmdinfo[0]
            # With no codes listed the old code accepted *no* exit code at
            # all, so the command could never succeed; default to 0.
            exit_codes = cmdinfo[1:] or [0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        # proc is None when exec_cmd() decided there was nothing to run
        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise Exception('"{}" did not complete successfully'.format(prefix))
173
def run_one_test(pm, args, index, tidx):
    """
    Run a single test case and return its TAP output.

    The stages are: plugin pre_case hooks, setup commands, the command
    under test (wrapped by the pre/post_execute hooks), the verify command
    (only when the exit code matched), teardown commands and the plugin
    post_case hooks.

    index is the ordinal used in the TAP line, tidx the test case
    dictionary.  The test passes when the command under test exits with
    tidx["expExitCode"] and the verify output contains exactly
    tidx["matchCount"] matches of tidx["matchPattern"].  On failure the
    output of the last executed command is appended to the TAP text.

    Exceptions raised by setup/teardown or by a plugin hook propagate to
    the caller.
    """
    result = True
    tresult = ""
    tap = ""
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    pm.call_pre_case(index, tidx['id'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if args.verbose > 0:
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    # exec_cmd() returns (None, None) for a blank command; treat that as
    # "no exit code", which can never equal the expected one.
    exit_code = p.returncode if p else None
    pm.call_post_execute()

    if exit_code != int(tidx["expExitCode"]):
        result = False
        print("exit:", exit_code, int(tidx["expExitCode"]))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        # A blank verify command yields no output at all: count that as
        # zero matches instead of handing None to re.findall().
        if procout is None:
            match_index = []
        else:
            match_index = re.findall(match_pattern, procout)
        if len(match_index) != int(tidx["matchCount"]):
            result = False

    if not result:
        tresult += 'not '
    tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
    tap += tresult

    if not result:
        if procout:
            tap += procout
        else:
            tap += 'No output!\n'

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'])
    pm.call_post_case()

    return tap
220
221def test_runner(pm, args, filtered_tests):
222    """
223    Driver function for the unit tests.
224
225    Prints information about the tests being run, executes the setup and
226    teardown commands and the command under test itself. Also determines
227    success/failure based on the information in the test case and generates
228    TAP output accordingly.
229    """
230    testlist = filtered_tests
231    tcount = len(testlist)
232    index = 1
233    tap = str(index) + ".." + str(tcount) + "\n"
234    badtest = None
235
236    pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
237
238    if args.verbose > 1:
239        print('Run tests here')
240    for tidx in testlist:
241        if "flower" in tidx["category"] and args.device == None:
242            continue
243        try:
244            badtest = tidx  # in case it goes bad
245            tap += run_one_test(pm, args, index, tidx)
246        except Exception as ee:
247            print('Exception {} (caught in test_runner, running test {} {} {})'.
248                  format(ee, index, tidx['id'], tidx['name']))
249            break
250        index += 1
251
252    # if we failed in setup or teardown,
253    # fill in the remaining tests with not ok
254    count = index
255    tap += 'about to flush the tap output if tests need to be skipped\n'
256    if tcount + 1 != index:
257        for tidx in testlist[index - 1:]:
258            msg = 'skipped - previous setup or teardown failed'
259            tap += 'ok {} - {} # {} {} {}\n'.format(
260                count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
261            count += 1
262
263    tap += 'done flushing skipped test tap output\n'
264    pm.call_post_suite(index)
265
266    return tap
267
268
def ns_create(args, pm):
    """
    Set up the test network namespace: add $NS, create the $DEV0/$DEV1
    veth pair, move $DEV1 and $DEV2 into the namespace and bring the
    devices up.  Does nothing when USE_NS is false.
    """
    if not USE_NS:
        return

    setup_cmds = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        'ip -n $NS link set $DEV1 up',
        'ip link set $DEV2 netns $NS',
        'ip -n $NS link set $DEV2 up',
    )
    for cmd in setup_cmds:
        # nsonly=False: these commands run outside the namespace
        exec_cmd(args, pm, 'pre', cmd, False)
289
290
def ns_destroy(args, pm):
    """
    Delete the $NS network namespace used for testing.  Does nothing when
    USE_NS is false.
    """
    if not USE_NS:
        return
    exec_cmd(args, pm, 'post', 'ip netns delete $NS', False)
299
300
def has_blank_ids(idlist):
    """
    Return True if at least one entry of idlist is empty (falsy),
    otherwise False.
    """
    return any(not ident for ident in idlist)
306
307
def load_from_file(filename):
    """
    Read the test cases from a JSON file and return them, with every JSON
    object loaded as an OrderedDict.

    A file that is not valid JSON is reported and yields an empty list.
    When the file contains test cases with blank IDs, each test case is
    tagged with a 'filename' key naming the file it came from.
    """
    try:
        with open(filename) as test_data:
            cases = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        return list()

    if has_blank_ids(get_id_list(cases)):
        for case in cases:
            case['filename'] = filename
    return cases
325
326
def args_parse():
    """
    Return a new, empty argument parser for tdc.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
333
334
def set_args(parser):
    """
    Add tdc's own command line options to the parser and return it.

    Options that choose test cases go into the 'selection' group, options
    that choose what to do with them into the 'action' group; the rest are
    registered on the parser directly.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')

    selection = parser.add_argument_group(
        'selection',
        'select which test cases: '
        'files plus directories; filtered by categories plus testids')
    action = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    # which test cases to work on
    selection.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) '
             '(default [tc-tests])')
    selection.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    selection.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, '
             'or if no category/ies is/are specified, list known categories.')
    selection.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')

    # what to do with them
    action.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    action.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    action.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')

    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '-d', '--device',
        help='Execute the test case in flower category')
    return parser
377
378
def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.

    --path overrides NAMES['TC'] and --device overrides NAMES['DEV2'].
    The program exits with status 1 when NAMES['TC'] does not name an
    existing file.  Afterwards every plugin gets to check the arguments
    through its check_args() hook.
    """
    # Allow for overriding specific settings.  NAMES is only modified in
    # place, never rebound, so no 'global' declaration is needed.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        # sys.exit() rather than the exit() helper, which is only injected
        # by the site module for interactive use
        sys.exit(1)

    pm.call_check_args(args, remaining)
396
397
def get_id_list(alltests):
    """
    Return the "id" field of every test case, in test case order.
    """
    ids = []
    for testcase in alltests:
        ids.append(testcase["id"])
    return ids
403
404
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns a list with every ID that occurs more than once; each
    occurrence is listed, in test case order.  The list is empty when all
    IDs are unique.
    """
    from collections import Counter

    idl = get_id_list(alltests)
    # Count all IDs in one pass instead of calling idl.count() for every
    # element, which is quadratic in the number of test cases.
    occurrences = Counter(idl)
    return [x for x in idl if occurrences[x] > 1]
411
412
def does_id_exist(alltests, newid):
    """
    Return True if some test case already uses newid as its ID,
    otherwise False.
    """
    for existing in get_id_list(alltests):
        if newid == existing:
            return True
    return False
419
420
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    "Blank" means any empty (falsy) ID, matching has_blank_ids().  Only
    test cases carrying a 'filename' key are written back, each to the file
    named by that key; the key itself is removed before writing.

    Returns alltests (modified in place).
    """
    import random
    for c in alltests:
        if not c["id"]:
            # draw 4-digit hex IDs until one is not in use yet
            while True:
                newid = '%04x' % random.randrange(16**4)
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    ufilename = get_unique_item(
        [c['filename'] for c in alltests if 'filename' in c])
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t and t['filename'] == f:
                del t['filename']
                testlist.append(t)
        # 'with' closes the file even if json.dump() raises
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests
452
def filter_tests_by_id(args, testlist):
    '''
    Return the tests from testlist whose id is named in args.execute,
    keeping their order.
    If there are no tests or no ids were named, return an empty list.
    '''
    if not testlist or not args.execute:
        return list()

    wanted = args.execute
    if not isinstance(wanted, list):
        return list()

    return [tc for tc in testlist if tc['id'] in wanted]
465
def filter_tests_by_category(args, testlist):
    '''
    Return the tests from testlist that are in one of the categories named
    in args.category.

    Categories are considered in the order given on the command line
    (duplicates once); within a category the tests keep their testlist
    order.  A test matching several categories is returned only once.
    The '+c' placeholder (the option's default value) selects nothing.
    '''
    answer = list()
    if args.category and testlist:
        seen_ids = set()
        # OrderedDict.fromkeys() removes duplicates but, unlike set(),
        # keeps the order stable, so the resulting test order does not
        # change from one run to the next.
        for catg in OrderedDict.fromkeys(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in seen_ids:
                    answer.append(tc)
                    seen_ids.add(tc['id'])

    return answer
483
def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.

    Returns a 4-tuple:
      - the categories of all loaded test cases,
      - the IDs of all loaded test cases,
      - the loaded test cases grouped by category,
      - the test cases selected by the category/ID filters (all loaded
        test cases when no filter selects anything).
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                # Do not load a file twice (e.g. named with -f and also
                # found below a -D directory): its test cases would be
                # reported as duplicate IDs.  The check used to compare
                # against the directory list and so never matched.
                if candidate not in flist:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            # category matches first, then the explicitly named IDs that
            # are not already included
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases
543
544
def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    The modes are checked in this order: generate IDs (-i), show test
    cases (-s), list categories (-c without a value), list test cases
    (-l); each of these exits the program when done.  Otherwise the
    selected tests are run inside the test namespace, which requires root.
    Duplicate test case IDs abort the program with status 1.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(idlist):
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        sys.exit(0)

    # '-c' given without any category: args.category is an empty list
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        sys.exit(0)

    if args.list:
        list_test_cases(alltests)
        sys.exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    ns_create(args, pm)
    try:
        if alltests:
            catresults = test_runner(pm, args, alltests)
        else:
            catresults = 'No tests found\n'
        print('All test results: \n\n{}'.format(catresults))
    finally:
        # remove the namespace even if the run raised, so it does not leak
        # into the next invocation
        ns_destroy(args, pm)
595
596
def main():
    """
    Program entry point: build the argument parser (built-in options plus
    whatever the plugins add), parse the command line, apply the settings
    overrides and run the selected operation.  Exits with status 0.
    """
    parser = set_args(args_parse())
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)

    # unknown arguments are collected in 'remaining', not rejected
    args, remaining = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    exit(0)
615
616
# Run the driver only when this file is executed as a script.
if __name__ == "__main__":
    main()
619