xref: /linux/tools/testing/selftests/tc-testing/tdc.py (revision bd628c1bed7902ec1f24ba0fe70758949146abbe)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import importlib
15import json
16import subprocess
17import time
18import traceback
19from collections import OrderedDict
20from string import Template
21
22from tdc_config import *
23from tdc_helper import *
24
25import TdcPlugin
26from TdcResults import *
27
28
class PluginMgrTestFail(Exception):
    """Raised when a setup/teardown/plugin stage fails for a test case.

    Carries the stage name, any accumulated command output, and a
    human-readable message describing the failure.
    """
    def __init__(self, stage, output, message):
        self.stage, self.output, self.message = stage, output, message
34
class PluginMgr:
    """
    Discover, load and drive tdc plugins.

    Plugins are python files found under $TDC_PLUGIN_DIR (default
    ./plugins); each must expose a SubPlugin class.  "pre" hooks run in
    load order, "post" hooks run in reverse order.
    """
    def __init__(self, argparser):
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO, put plugins in order
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                # Skip package markers and editor droppings/backup files.
                if (fn.endswith('.py') and
                    not fn == '__init__.py' and
                    not fn.startswith('#') and
                    not fn.startswith('.#')):
                    mn = fn[0:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = foo
                    self.plugin_instances.append(foo.SubPlugin())

    def call_pre_suite(self, testcount, testidlist):
        """Run every plugin's pre_suite hook before any test executes."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        """Run every plugin's post_suite hook, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid, test_name):
        """Run pre_case hooks; log context and re-raise on any failure."""
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid, test_name)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        """Run post_case hooks, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        """Run pre_execute hooks just before the command under test."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        """Run post_execute hooks, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        """Let each plugin extend the argument parser; return the result."""
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        """Let each plugin validate the parsed arguments."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        """Give each plugin a chance to rewrite a command for *stage*."""
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    def _make_argparser(self, args):
        # Bug fix: this was declared @staticmethod yet assigned to
        # ``self``, so any call raised NameError.  It is private (and was
        # uncallable), so converting it to an instance method is safe.
        self.argparser = argparse.ArgumentParser(
            description='Linux TC unit tests')
105
def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    # safe_substitute leaves unknown $placeholders untouched rather
    # than raising KeyError.
    return Template(cmd).safe_substitute(NAMES)
114
115
def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    Returns a (Popen, decoded-output) tuple, or (None, None) when the
    command is blank.  On failure stderr is preferred as the output.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        # Substitute $NAME placeholders from the NAMES table.
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)

    try:
        (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
        if proc.returncode != 0 and len(serr) > 0:
            foutput = serr.decode("utf-8", errors="ignore")
        else:
            foutput = rawout.decode("utf-8", errors="ignore")
    except subprocess.TimeoutExpired:
        # Bug fix: the timed-out child was previously left running,
        # where it could hold resources and interfere with later tests.
        # Kill it and reap it before reporting the timeout.
        proc.kill()
        proc.communicate()
        foutput = "Command \"{}\" timed out\n".format(command)
        proc.returncode = 255

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
148
149
def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))

    for cmdinfo in cmdlist:
        # Each entry is either a bare command string, or a list whose
        # head is the command and whose tail is the acceptable exit codes.
        if isinstance(cmdinfo, list):
            cmd, exit_codes = cmdinfo[0], cmdinfo[1:]
        else:
            cmd, exit_codes = cmdinfo, [0]

        if not cmd:
            continue

        proc, foutput = exec_cmd(args, pm, stage, cmd)

        # Blank commands yield no proc; an accepted exit code is fine.
        if not proc or proc.returncode in exit_codes:
            continue

        print('', file=sys.stderr)
        print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
              file=sys.stderr)
        print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
              file=sys.stderr)
        print("returncode {}; expected {}".format(proc.returncode,
                                                  exit_codes))
        print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
        print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
        print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
        raise PluginMgrTestFail(
            stage, output,
            '"{}" did not complete successfully'.format(prefix))
184
def run_one_test(pm, args, index, tidx):
    """
    Run a single test case dict *tidx* through its full lifecycle
    (setup, command under test, verify, teardown) and return a
    TestResult describing the outcome.

    index is the 1-based ordinal of this test in the run; it is passed
    to the plugin pre_case hook.
    """
    global NAMES
    # result/tresult/tap appear unused below; kept as-is (doc-only pass).
    result = True
    tresult = ""
    tap = ""
    res = TestResult(tidx['id'], tidx['name'])
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    # populate NAMES with TESTID for this test
    NAMES['TESTID'] = tidx['id']

    pm.call_pre_case(index, tidx['id'], tidx['name'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if (args.verbose > 0):
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    # exec_cmd returns (None, None) for a blank command; treat that as
    # "no exit code" so the comparison below fails the test.
    if p:
        exit_code = p.returncode
    else:
        exit_code = None

    pm.call_post_execute()

    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
        # Wrong (or missing) exit status: fail without running verify.
        print("exit: {!r}".format(exit_code))
        print("exit: {}".format(int(tidx["expExitCode"])))
        #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
        res.set_result(ResultState.fail)
        res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        # The pattern must match exactly matchCount times in the verify
        # command's output; DOTALL|MULTILINE lets it span lines.
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        if procout:
            match_index = re.findall(match_pattern, procout)
            if len(match_index) != int(tidx["matchCount"]):
                res.set_result(ResultState.fail)
                res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
            else:
                res.set_result(ResultState.success)
        elif int(tidx["matchCount"]) != 0:
            # No output at all, but matches were expected.
            res.set_result(ResultState.fail)
            res.set_failmsg('No output generated by verify command.')
        else:
            res.set_result(ResultState.success)

    # Teardown runs regardless of pass/fail; pass along the last output
    # so a teardown failure can report it.
    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case()

    index += 1

    # remove TESTID from NAMES
    del(NAMES['TESTID'])
    return res
246
def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    # index is 1-based and tracks how many tests were started; it is also
    # used after the loop to mark the remainder as skipped on early abort.
    index = 1
    tap = ''
    badtest = None
    stage = None
    emergency_exit = False
    emergency_exit_message = ''

    tsr = TestSuiteReport()

    try:
        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    except Exception as ee:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        traceback.print_tb(ex_tb)
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True
        stage = 'pre-SUITE'

    if emergency_exit:
        # pre_suite failed: still give plugins their post_suite callback,
        # then bail out with the message instead of a TestSuiteReport.
        pm.call_post_suite(index)
        return emergency_exit_message
    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)
    for tidx in testlist:
        # flower tests need a second device (DEV2, set via -d/--device);
        # skip them gracefully when it was not supplied.
        if "flower" in tidx["category"] and args.device == None:
            if args.verbose > 1:
                print('Not executing test {} {} because DEV2 not defined'.
                      format(tidx['id'], tidx['name']))
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg('Not executed because DEV2 is not defined')
            tsr.add_resultdata(res)
            continue
        try:
            badtest = tidx  # in case it goes bad
            res = run_one_test(pm, args, index, tidx)
            tsr.add_resultdata(res)
        except PluginMgrTestFail as pmtf:
            # A setup/teardown/plugin stage failed: record this test as
            # skipped, report details, and abort the remaining tests.
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            output = pmtf.output
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            res.set_errormsg(pmtf.message)
            res.set_failmsg(pmtf.output)
            tsr.add_resultdata(res)
            index += 1
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index

    # A clean run leaves index == tcount + 1; anything less means we
    # aborted early and the tail of testlist never ran.
    if tcount + 1 != count:
        for tidx in testlist[count - 1:]:
            res = TestResult(tidx['id'], tidx['name'])
            res.set_result(ResultState.skip)
            msg = 'skipped - previous {} failed {} {}'.format(stage,
                index, badtest.get('id', '--Unknown--'))
            res.set_errormsg(msg)
            tsr.add_resultdata(res)
            count += 1

    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        if input(sys.stdin):
            print('got something on stdin')

    pm.call_post_suite(index)

    return tsr
346
def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    for an_id in idlist:
        if not an_id:
            return True
    return False
352
353
def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.
    """
    testlist = list()
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        # A malformed file is skipped, not fatal.
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        testlist = list()
    else:
        # Remember the source file when any test still needs an ID, so
        # generate_case_ids() can write the cases back later.
        if has_blank_ids(get_id_list(testlist)):
            for k in testlist:
                k['filename'] = filename
    return testlist
371
372
def args_parse():
    """
    Create the argument parser.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
379
380
def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '--outfile', type=str,
        help='Path to the file in which results should be saved. Default target is the current directory.')
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')

    # Grouped options: which tests to run, and what to do with them.
    select_group = parser.add_argument_group(
        'selection',
        'select which test cases: files plus directories; filtered by categories plus testids')
    action_group = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    select_group.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) (default [tc-tests])')
    select_group.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    select_group.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, or if no category/ies is/are specified, list known categories.')
    select_group.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')

    action_group.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    action_group.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    action_group.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')

    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '--format', default='tap', const='tap', nargs='?',
        choices=['none', 'xunit', 'tap'],
        help='Specify the format for test results. (Default: TAP)')
    parser.add_argument(
        '-d', '--device',
        help='Execute the test case in flower category')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    return parser
434
435
def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.

    Exits with status 1 when the configured tc binary does not exist.
    """
    # Allow for overriding specific settings
    global NAMES

    # PEP 8: compare against None with identity, not equality.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if 'TIMEOUT' not in NAMES:
        # No configured timeout: let commands run to completion.
        NAMES['TIMEOUT'] = None
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)
455
456
def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [testcase["id"] for testcase in alltests]
462
463
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns every occurrence of a duplicated ID (an ID appearing k > 1
    times shows up k times), preserving input order.
    """
    idl = get_id_list(alltests)
    # Count each ID in one pass instead of calling list.count() per
    # element, which made this quadratic in the number of tests.
    counts = {}
    for tid in idl:
        counts[tid] = counts.get(tid, 0) + 1
    return [tid for tid in idl if counts[tid] > 1]
470
471
def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    # Membership test over the extracted ID list.
    return newid in get_id_list(alltests)
478
479
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Returns the (mutated) test case list, since callers assign the
    result (the original returned None, silently discarding the tests).
    """
    import random
    for c in alltests:
        if c["id"] == "":
            # Draw 4-hex-digit IDs until one is free.
            while True:
                newid = '{:04x}'.format(random.randrange(16**4))
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    # Collect the distinct source files that contributed blank-ID cases
    # (load_from_file tagged those cases with 'filename').
    ufilename = []
    for c in alltests:
        if ('filename' in c):
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if t.get('filename') == f:
                # Strip the bookkeeping key before writing back out.
                del t['filename']
                testlist.append(t)
        # Context manager guarantees the file is closed even if
        # json.dump raises (the original leaked the handle on error).
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
            outfile.write("\n")
    return alltests
512
def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    selected = list()
    if testlist and args.execute:
        wanted_ids = args.execute
        # Only filter when we actually have a non-empty list of IDs.
        if isinstance(wanted_ids, list) and wanted_ids:
            selected = [tc for tc in testlist if tc['id'] in wanted_ids]
    return selected
525
def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.
    '''
    if not (args.category and testlist):
        return list()

    answer = list()
    picked_ids = list()
    for catg in set(args.category):
        # '+c' is the sentinel default meaning "no category given".
        if catg == '+c':
            continue
        print('considering category {}'.format(catg))
        for tc in testlist:
            # Avoid adding the same test twice when it belongs to
            # more than one selected category.
            if catg in tc['category'] and tc['id'] not in picked_ids:
                answer.append(tc)
                picked_ids.append(tc['id'])

    return answer
543
def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.

    Returns a 4-tuple: (all categories, all test IDs, tests grouped by
    category, the selected test cases).
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        # -D/--directory replaces the search path entirely.
        testdirs = args.directory

    # Recursively collect every *.json file under the chosen directories.
    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    # Bookkeeping over the full, unfiltered set, returned to the caller.
    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            # Union of category matches and explicitly requested IDs,
            # without duplicating tests selected by both filters.
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases
603
604
def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if (has_blank_ids(idlist)):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if (len(duplicate_ids) > 0):
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    # An explicitly empty -c/--category list means "list the categories".
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    # Bug fix: the condition was nested twice ("if args.list: if
    # args.list:"); one check is sufficient.
    if args.list:
        list_test_cases(alltests)
        exit(0)

    if len(alltests):
        catresults = test_runner(pm, args, alltests)
        if args.format == 'none':
            print('Test results output suppression requested\n')
        else:
            print('\nAll test results: \n')
            # --format is restricted by argparse choices, so exactly one
            # of these branches sets suffix/res.
            if args.format == 'xunit':
                suffix = 'xml'
                res = catresults.format_xunit()
            elif args.format == 'tap':
                suffix = 'tap'
                res = catresults.format_tap()
            print(res)
            print('\n\n')
            if not args.outfile:
                fname = 'test-results.{}'.format(suffix)
            else:
                fname = args.outfile
            # The context manager closes the file; the explicit close()
            # the original carried inside the with-block was redundant.
            with open(fname, 'w') as fh:
                fh.write(res)
            # When run under sudo, hand the results file back to the
            # invoking user so it is not left owned by root.
            if os.getenv('SUDO_UID') is not None:
                os.chown(fname, uid=int(os.getenv('SUDO_UID')),
                         gid=int(os.getenv('SUDO_GID')))
    else:
        print('No tests found\n')
668
def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    # Build the base parser, then let the plugin manager extend it.
    parser = set_args(args_parse())
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)

    args, remaining = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    exit(0)
687
688
# Script entry point: run the driver only when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()
691