#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import importlib
import json
import subprocess
import time
from collections import OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *

import TdcPlugin

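# Plugin discovery: PluginMgr walks TDC_PLUGIN_DIR (default ./plugins) and
# instantiates a SubPlugin object from every importable .py file it finds.
# A minimal plugin sketch, assuming a hypothetical plugins/examplePlugin.py;
# the hook names are the ones PluginMgr calls below:
#
#     import TdcPlugin
#
#     class SubPlugin(TdcPlugin.TdcPlugin):
#         def pre_suite(self, testcount, testidlist):
#             print('examplePlugin: about to run {} tests'.format(testcount))
#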
class PluginMgr:
    def __init__(self, argparser):
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO, put plugins in order
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                if (fn.endswith('.py') and
                    not fn == '__init__.py' and
                    not fn.startswith('#') and
                    not fn.startswith('.#')):
                    mn = fn[0:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = foo
                    self.plugin_instances.append(foo.SubPlugin())

    def call_pre_suite(self, testcount, testidlist):
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid):
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    def _make_argparser(self, args):
        self.argparser = argparse.ArgumentParser(
            description='Linux TC unit tests')


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    tcmd = Template(cmd)
    subcmd = tcmd.safe_substitute(NAMES)
    return subcmd
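
# A minimal substitution sketch. 'TC' and 'DEV2' are keys used in NAMES by
# this driver; the command string and the values shown are illustrative only:
#
#     replace_keywords('$TC qdisc show dev $DEV2')
#     # -> '/sbin/tc qdisc show dev eth1' when NAMES maps TC to /sbin/tc
#     #    and DEV2 to eth1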


def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)
    (rawout, serr) = proc.communicate()

    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8")
    else:
        foutput = rawout.decode("utf-8")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
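
# Usage sketch for exec_cmd(). The stage names used by this driver are
# 'setup', 'execute', 'verify' and 'teardown'; the command below is
# hypothetical:
#
#     proc, output = exec_cmd(args, pm, 'verify', '$TC qdisc show dev $DEV2')
#     if proc and proc.returncode == 0:
#         print(output)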


def prepare_env(args, pm, stage, prefix, cmdlist):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise Exception('"{}" did not complete successfully'.format(prefix))
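
# Each entry in a setup/teardown cmdlist takes one of two forms (the commands
# shown are hypothetical): a bare command string, which must exit 0, or a
# list whose first element is the command and whose remaining elements are
# the acceptable exit codes, e.g. in a test case's JSON:
#
#     "setup": [
#         "$TC qdisc add dev $DEV1 ingress",
#         ["$TC filter del dev $DEV1 parent ffff:", 0, 2]
#     ],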

def run_one_test(pm, args, index, tidx):
    result = True
    tresult = ""
    tap = ""
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    pm.call_pre_case(index, tidx['id'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if args.verbose > 0:
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    exit_code = p.returncode
    pm.call_post_execute()

    if exit_code != int(tidx["expExitCode"]):
        result = False
        print("exit:", exit_code, int(tidx["expExitCode"]))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        match_index = re.findall(match_pattern, procout)
        if len(match_index) != int(tidx["matchCount"]):
            result = False

    if not result:
        tresult += 'not '
    tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
    tap += tresult

    if not result:
        tap += procout

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'])
    pm.call_post_case()

    index += 1

    return tap
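
# Sketch of the fields run_one_test() consumes from a test case. The values
# below are hypothetical; the real cases live in the JSON files under the
# test directories (default tc-tests/):
#
#     {
#         "id": "ab12",
#         "name": "Add a basic filter",
#         "category": ["filter", "basic"],
#         "setup": ["$TC qdisc add dev $DEV1 ingress"],
#         "cmdUnderTest": "$TC filter add dev $DEV1 ingress protocol ip basic",
#         "expExitCode": "0",
#         "verifyCmd": "$TC filter show dev $DEV1 ingress",
#         "matchPattern": "filter protocol ip .* basic",
#         "matchCount": "1",
#         "teardown": ["$TC qdisc del dev $DEV1 ingress"]
#     }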

def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = str(index) + ".." + str(tcount) + "\n"
    badtest = None

    pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])

    if args.verbose > 1:
        print('Run tests here')
    for tidx in testlist:
        if "flower" in tidx["category"] and args.device is None:
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(pm, args, index, tidx)
        except Exception as ee:
            print('Exception {} (caught in test_runner, running test {} {} {})'.
                  format(ee, index, tidx['id'], tidx['name']))
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with not ok
    count = index
    tap += 'about to flush the tap output if tests need to be skipped\n'
    if tcount + 1 != index:
        for tidx in testlist[index - 1:]:
            msg = 'skipped - previous setup or teardown failed'
            tap += 'ok {} - {} # {} {} {}\n'.format(
                count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
            count += 1

    tap += 'done flushing skipped test tap output\n'
    pm.call_post_suite(index)

    return tap
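
# The string test_runner() returns is TAP-style, along the lines of the
# illustrative snippet below (IDs and names are hypothetical):
#
#     1..2
#     ok 1 - ab12 # Add a basic filter
#     not ok 2 - cd34 # Some failing case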

def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not all(k for k in idlist)


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as a list of ordered dictionary objects.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if has_blank_ids(idlist):
            for k in testlist:
                k['filename'] = filename
    return testlist


def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    return parser


def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified categories, ' +
        'or if no category is specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the test cases with the specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or only those within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument('-d', '--device',
                        help='Execute flower-category tests on the given device')
    return parser
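
# Example invocations (category, file and ID values are hypothetical):
#
#     ./tdc.py -l                        # list every known test case
#     ./tdc.py -c                        # list the known categories
#     ./tdc.py -c filter -v              # run all tests in the 'filter' category
#     ./tdc.py -f my-tests.json -e ab12  # run one test by ID from a file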


def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.
    """
    idl = get_id_list(alltests)
    return [x for x in idl if idl.count(x) > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    idl = get_id_list(alltests)
    return any(newid == x for x in idl)


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.
    """
    import random
    for c in alltests:
        if c["id"] == "":
            while True:
                newid = str('%04x' % random.randrange(16**4))
                if does_id_exist(alltests, newid):
                    continue
                else:
                    c['id'] = newid
                    break

    ufilename = []
    for c in alltests:
        if 'filename' in c:
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

def filter_tests_by_id(args, testlist):
    """
    Remove tests from testlist that are not in the named id list.
    If the id list is empty, return an empty list.
    """
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and (len(target_ids) > 0):
            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
    return newlist

def filter_tests_by_category(args, testlist):
    """
    Remove tests from testlist that are not in a named category.
    """
    answer = list()
    if args.category and testlist:
        test_ids = list()
        for catg in set(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in test_ids:
                    answer.append(tc)
                    test_ids.append(tc['id'])

    return answer

def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE it does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + load_from_file(casefile)

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases


def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(idlist):
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)

    if args.list:
        list_test_cases(alltests)
        exit(0)

    if len(alltests):
        catresults = test_runner(pm, args, alltests)
    else:
        catresults = 'No tests found\n'
    print('All test results: \n\n{}'.format(catresults))

def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    exit(0)


if __name__ == "__main__":
    main()