#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import importlib
import json
import subprocess
import time
import traceback
from collections import OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *

import TdcPlugin


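# Exception used to abort a test run: prepare_env() raises it when a
# setup or teardown command exits with an unexpected code, and
# test_runner() catches it to skip the remaining tests.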
class PluginMgrTestFail(Exception):
    def __init__(self, stage, output, message):
        self.stage = stage
        self.output = output
        self.message = message

class PluginMgr:
    def __init__(self, argparser):
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO: put plugins in order
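        # Discover plugins by walking TDC_PLUGIN_DIR (./plugins by default).
        # Each candidate .py file is imported as plugins.<name> and must
        # define a SubPlugin class (see TdcPlugin.py for the base class).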
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                if (fn.endswith('.py') and
                    fn != '__init__.py' and
                    not fn.startswith('#') and
                    not fn.startswith('.#')):
                    mn = fn[:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = foo
                    self.plugin_instances.append(foo.SubPlugin())

    def call_pre_suite(self, testcount, testidlist):
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid):
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
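        # Give each plugin a chance to rewrite the command in turn; for
        # example, a namespace plugin (such as the one shipped alongside
        # this script in plugin-lib/) can wrap commands with
        # 'ip netns exec <ns>'.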
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    @staticmethod
    def _make_argparser(args):
        # a staticmethod has no self; build and return the parser instead
        return argparse.ArgumentParser(
            description='Linux TC unit tests')


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    tcmd = Template(cmd)
    subcmd = tcmd.safe_substitute(NAMES)
    return subcmd


def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.
    """
    if not command.strip():
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)
    (rawout, serr) = proc.communicate()

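    # Prefer stderr when the command failed and actually produced error
    # text; otherwise return stdout for pattern matching by the caller.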
    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8")
    else:
        foutput = rawout.decode("utf-8")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput


def prepare_env(args, pm, stage, prefix, cmdlist, output=None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
173            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
174            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise PluginMgrTestFail(
                stage, output,
                '"{}" did not complete successfully'.format(prefix))

def run_one_test(pm, args, index, tidx):
    result = True
    tresult = ""
    tap = ""
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    pm.call_pre_case(index, tidx['id'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if args.verbose > 0:
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    exit_code = p.returncode
    pm.call_post_execute()

    if exit_code != int(tidx["expExitCode"]):
        result = False
        print("exit:", exit_code, int(tidx["expExitCode"]))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
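        # The test passes only when the number of matchPattern hits in the
        # verify command's output equals matchCount exactly (a matchCount
        # of 0 means the pattern must not appear at all).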
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        if procout:
            match_index = re.findall(match_pattern, procout)
            if len(match_index) != int(tidx["matchCount"]):
                result = False
        elif int(tidx["matchCount"]) != 0:
            result = False

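    # Build one TAP result line per test: "ok <n> - <id> # <name>", with a
    # "not " prefix on failure, followed by the captured output if it failed.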
    if not result:
        tresult += 'not '
    tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
    tap += tresult

    if not result:
        if procout:
            tap += procout
        else:
            tap += 'No output!\n'

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case()

    index += 1

    return tap

def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = str(index) + ".." + str(tcount) + "\n"
    badtest = None
    stage = None
    emergency_exit = False
    emergency_exit_message = ''

    try:
        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    except Exception as ee:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        # when the extra print statements are uncommented,
        # the traceback does not appear between them
        # (it appears way earlier in the tdc.py output)
        # so don't bother ...
        # print('--------------------(')
        # print('traceback')
        traceback.print_tb(ex_tb)
        # print('--------------------)')
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True
        stage = 'pre-SUITE'
    if emergency_exit:
        pm.call_post_suite(index)
        return emergency_exit_message
    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)
    for tidx in testlist:
        if "flower" in tidx["category"] and args.device is None:
            if args.verbose > 1:
                print('Not executing test {} {} because DEV2 not defined'.
                      format(tidx['id'], tidx['name']))
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(pm, args, index, tidx)
        except PluginMgrTestFail as pmtf:
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            output = pmtf.output
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index
    tap += 'about to flush the tap output if tests need to be skipped\n'
    if tcount + 1 != index:
        for tidx in testlist[index - 1:]:
            msg = 'skipped - previous {} failed'.format(stage)
            tap += 'ok {} - {} # {} {} {}\n'.format(
                count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
            count += 1

    tap += 'done flushing skipped test tap output\n'
    pm.call_post_suite(index)

    return tap

def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not all(k for k in idlist)


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as a list of ordered dictionary objects.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if has_blank_ids(idlist):
            for k in testlist:
                k['filename'] = filename
    return testlist


def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    return parser


def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified categories, ' +
        'or list known categories if none are specified')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the test cases with the specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or only those within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument('-d', '--device',
                        help='Execute flower-category tests using the specified device')
    return parser


def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.
    """
    idl = get_id_list(alltests)
    return [x for x in idl if idl.count(x) > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    idl = get_id_list(alltests)
    return any(newid == x for x in idl)


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.
    """
    import random
    for c in alltests:
        if c["id"] == "":
            while True:
                newid = '{:04x}'.format(random.randrange(16**4))
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    ufilename = []
    for c in alltests:
        if 'filename' in c:
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
    # hand the updated list back to the caller (set_operation_mode)
    return alltests

def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If the id list is empty, return an empty list.
    '''
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and target_ids:
            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
    return newlist

def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    answer = list()
    if args.category and testlist:
        test_ids = list()
        for catg in set(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in test_ids:
                    answer.append(tc)
                    test_ids.append(tc['id'])

    return answer

def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE it does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                # dedup against files already collected, not the dir list
                if candidate not in flist:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + load_from_file(casefile)

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

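    # Selection precedence: with -e, run the union of category matches and
    # the explicitly requested IDs; otherwise categories (when given) narrow
    # the file/directory selection.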
    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases


def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(idlist):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    if isinstance(args.category, list) and len(args.category) == 0:
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    if args.list:
        list_test_cases(alltests)
        exit(0)

    if alltests:
        catresults = test_runner(pm, args, alltests)
    else:
        catresults = 'No tests found\n'
    print('All test results: \n\n{}'.format(catresults))

def main():
    """
    Start of execution: set up the argument parser, parse the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    exit(0)


if __name__ == "__main__":
    main()

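# Typical invocations (run as root from tools/testing/selftests/tc-testing;
# the category and ID values below are illustrative):
#   ./tdc.py -c              # list the known test categories
#   ./tdc.py -c actions      # run every test in the 'actions' category
#   ./tdc.py -e 1234 -v      # run the single test with ID 1234, verbosely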