xref: /linux/tools/testing/selftests/tc-testing/tdc.py (revision e53524cdcc02d089e757b668da031ba06ff665c3)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import importlib
15import json
16import subprocess
17import time
18import traceback
19from collections import OrderedDict
20from string import Template
21
22from tdc_config import *
23from tdc_helper import *
24
25import TdcPlugin
26from TdcResults import *
27
28class PluginDependencyException(Exception):
29    def __init__(self, missing_pg):
30        self.missing_pg = missing_pg
31
32class PluginMgrTestFail(Exception):
33    def __init__(self, stage, output, message):
34        self.stage = stage
35        self.output = output
36        self.message = message
37
38class PluginMgr:
39    def __init__(self, argparser):
40        super().__init__()
41        self.plugins = {}
42        self.plugin_instances = []
43        self.failed_plugins = {}
44        self.argparser = argparser
45
46        # TODO, put plugins in order
47        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
48        for dirpath, dirnames, filenames in os.walk(plugindir):
49            for fn in filenames:
50                if (fn.endswith('.py') and
51                    not fn == '__init__.py' and
52                    not fn.startswith('#') and
53                    not fn.startswith('.#')):
54                    mn = fn[0:-3]
55                    foo = importlib.import_module('plugins.' + mn)
56                    self.plugins[mn] = foo
57                    self.plugin_instances.append(foo.SubPlugin())
58
59    def load_plugin(self, pgdir, pgname):
60        pgname = pgname[0:-3]
61        foo = importlib.import_module('{}.{}'.format(pgdir, pgname))
62        self.plugins[pgname] = foo
63        self.plugin_instances.append(foo.SubPlugin())
64        self.plugin_instances[-1].check_args(self.args, None)
65
66    def get_required_plugins(self, testlist):
67        '''
68        Get all required plugins from the list of test cases and return
69        all unique items.
70        '''
71        reqs = []
72        for t in testlist:
73            try:
74                if 'requires' in t['plugins']:
75                    if isinstance(t['plugins']['requires'], list):
76                        reqs.extend(t['plugins']['requires'])
77                    else:
78                        reqs.append(t['plugins']['requires'])
79            except KeyError:
80                continue
81        reqs = get_unique_item(reqs)
82        return reqs
83
84    def load_required_plugins(self, reqs, parser, args, remaining):
85        '''
86        Get all required plugins from the list of test cases and load any plugin
87        that is not already enabled.
88        '''
89        pgd = ['plugin-lib', 'plugin-lib-custom']
90        pnf = []
91
92        for r in reqs:
93            if r not in self.plugins:
94                fname = '{}.py'.format(r)
95                source_path = []
96                for d in pgd:
97                    pgpath = '{}/{}'.format(d, fname)
98                    if os.path.isfile(pgpath):
99                        source_path.append(pgpath)
100                if len(source_path) == 0:
101                    print('ERROR: unable to find required plugin {}'.format(r))
102                    pnf.append(fname)
103                    continue
104                elif len(source_path) > 1:
105                    print('WARNING: multiple copies of plugin {} found, using version found')
106                    print('at {}'.format(source_path[0]))
107                pgdir = source_path[0]
108                pgdir = pgdir.split('/')[0]
109                self.load_plugin(pgdir, fname)
110        if len(pnf) > 0:
111            raise PluginDependencyException(pnf)
112
113        parser = self.call_add_args(parser)
114        (args, remaining) = parser.parse_known_args(args=remaining, namespace=args)
115        return args
116
117    def call_pre_suite(self, testcount, testidlist):
118        for pgn_inst in self.plugin_instances:
119            pgn_inst.pre_suite(testcount, testidlist)
120
121    def call_post_suite(self, index):
122        for pgn_inst in reversed(self.plugin_instances):
123            pgn_inst.post_suite(index)
124
125    def call_pre_case(self, caseinfo, *, test_skip=False):
126        for pgn_inst in self.plugin_instances:
127            try:
128                pgn_inst.pre_case(caseinfo, test_skip)
129            except Exception as ee:
130                print('exception {} in call to pre_case for {} plugin'.
131                      format(ee, pgn_inst.__class__))
132                print('test_ordinal is {}'.format(test_ordinal))
133                print('testid is {}'.format(caseinfo['id']))
134                raise
135
136    def call_post_case(self):
137        for pgn_inst in reversed(self.plugin_instances):
138            pgn_inst.post_case()
139
140    def call_pre_execute(self):
141        for pgn_inst in self.plugin_instances:
142            pgn_inst.pre_execute()
143
144    def call_post_execute(self):
145        for pgn_inst in reversed(self.plugin_instances):
146            pgn_inst.post_execute()
147
148    def call_add_args(self, parser):
149        for pgn_inst in self.plugin_instances:
150            parser = pgn_inst.add_args(parser)
151        return parser
152
153    def call_check_args(self, args, remaining):
154        for pgn_inst in self.plugin_instances:
155            pgn_inst.check_args(args, remaining)
156
157    def call_adjust_command(self, stage, command):
158        for pgn_inst in self.plugin_instances:
159            command = pgn_inst.adjust_command(stage, command)
160        return command
161
162    def set_args(self, args):
163        self.args = args
164
165    @staticmethod
166    def _make_argparser(args):
167        self.argparser = argparse.ArgumentParser(
168            description='Linux TC unit tests')
169
170def replace_keywords(cmd):
171    """
172    For a given executable command, substitute any known
173    variables contained within NAMES with the correct values
174    """
175    tcmd = Template(cmd)
176    subcmd = tcmd.safe_substitute(NAMES)
177    return subcmd
178
179
180def exec_cmd(args, pm, stage, command):
181    """
182    Perform any required modifications on an executable command, then run
183    it in a subprocess and return the results.
184    """
185    if len(command.strip()) == 0:
186        return None, None
187    if '$' in command:
188        command = replace_keywords(command)
189
190    command = pm.call_adjust_command(stage, command)
191    if args.verbose > 0:
192        print('command "{}"'.format(command))
193    proc = subprocess.Popen(command,
194        shell=True,
195        stdout=subprocess.PIPE,
196        stderr=subprocess.PIPE,
197        env=ENVIR)
198
199    try:
200        (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT'])
201        if proc.returncode != 0 and len(serr) > 0:
202            foutput = serr.decode("utf-8", errors="ignore")
203        else:
204            foutput = rawout.decode("utf-8", errors="ignore")
205    except subprocess.TimeoutExpired:
206        foutput = "Command \"{}\" timed out\n".format(command)
207        proc.returncode = 255
208
209    proc.stdout.close()
210    proc.stderr.close()
211    return proc, foutput
212
213
214def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
215    """
216    Execute the setup/teardown commands for a test case.
217    Optionally terminate test execution if the command fails.
218    """
219    if args.verbose > 0:
220        print('{}'.format(prefix))
221    for cmdinfo in cmdlist:
222        if isinstance(cmdinfo, list):
223            exit_codes = cmdinfo[1:]
224            cmd = cmdinfo[0]
225        else:
226            exit_codes = [0]
227            cmd = cmdinfo
228
229        if not cmd:
230            continue
231
232        (proc, foutput) = exec_cmd(args, pm, stage, cmd)
233
234        if proc and (proc.returncode not in exit_codes):
235            print('', file=sys.stderr)
236            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
237                  file=sys.stderr)
238            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
239                  file=sys.stderr)
240            print("returncode {}; expected {}".format(proc.returncode,
241                                                      exit_codes))
242            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
243            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
244            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
245            raise PluginMgrTestFail(
246                stage, output,
247                '"{}" did not complete successfully'.format(prefix))
248
249def verify_by_json(procout, res, tidx, args, pm):
250    try:
251        outputJSON = json.loads(procout)
252    except json.JSONDecodeError:
253        res.set_result(ResultState.fail)
254        res.set_failmsg('Cannot decode verify command\'s output. Is it JSON?')
255        return res
256
257    matchJSON = json.loads(json.dumps(tidx['matchJSON']))
258
259    if type(outputJSON) != type(matchJSON):
260        failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {} '
261        failmsg = failmsg.format(type(outputJSON).__name__, type(matchJSON).__name__)
262        res.set_result(ResultState.fail)
263        res.set_failmsg(failmsg)
264        return res
265
266    if len(matchJSON) > len(outputJSON):
267        failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test\'s output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
268        failmsg = failmsg.format(len(outputJSON), outputJSON, len(matchJSON), matchJSON)
269        res.set_result(ResultState.fail)
270        res.set_failmsg(failmsg)
271        return res
272    res = find_in_json(res, outputJSON, matchJSON, 0)
273
274    return res
275
276def find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
277    if res.get_result() == ResultState.fail:
278        return res
279
280    if type(matchJSONVal) == list:
281        res = find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey)
282
283    elif type(matchJSONVal) == dict:
284        res = find_in_json_dict(res, outputJSONVal, matchJSONVal)
285    else:
286        res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)
287
288    if res.get_result() != ResultState.fail:
289        res.set_result(ResultState.success)
290        return res
291
292    return res
293
294def find_in_json_list(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
295    if (type(matchJSONVal) != type(outputJSONVal)):
296        failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
297        failmsg = failmsg.format(outputJSONVal, matchJSONVal)
298        res.set_result(ResultState.fail)
299        res.set_failmsg(failmsg)
300        return res
301
302    if len(matchJSONVal) > len(outputJSONVal):
303        failmsg = "Your matchJSON value is an array, and it contains more elements than the command under test\'s output:\ncommand output (length: {}):\n{}\nmatchJSON value (length: {}):\n{}"
304        failmsg = failmsg.format(len(outputJSONVal), outputJSONVal, len(matchJSONVal), matchJSONVal)
305        res.set_result(ResultState.fail)
306        res.set_failmsg(failmsg)
307        return res
308
309    for matchJSONIdx, matchJSONVal in enumerate(matchJSONVal):
310        res = find_in_json(res, outputJSONVal[matchJSONIdx], matchJSONVal,
311                           matchJSONKey)
312    return res
313
314def find_in_json_dict(res, outputJSONVal, matchJSONVal):
315    for matchJSONKey, matchJSONVal in matchJSONVal.items():
316        if type(outputJSONVal) == dict:
317            if matchJSONKey not in outputJSONVal:
318                failmsg = 'Key not found in json output: {}: {}\nMatching against output: {}'
319                failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal)
320                res.set_result(ResultState.fail)
321                res.set_failmsg(failmsg)
322                return res
323
324        else:
325            failmsg = 'Original output and matchJSON value are not the same type: output: {} != matchJSON: {}'
326            failmsg = failmsg.format(type(outputJSON).__name__, type(matchJSON).__name__)
327            res.set_result(ResultState.fail)
328            res.set_failmsg(failmsg)
329            return rest
330
331        if type(outputJSONVal) == dict and (type(outputJSONVal[matchJSONKey]) == dict or
332                type(outputJSONVal[matchJSONKey]) == list):
333            if len(matchJSONVal) > 0:
334                res = find_in_json(res, outputJSONVal[matchJSONKey], matchJSONVal, matchJSONKey)
335            # handling corner case where matchJSONVal == [] or matchJSONVal == {}
336            else:
337                res = find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey)
338        else:
339            res = find_in_json(res, outputJSONVal, matchJSONVal, matchJSONKey)
340    return res
341
342def find_in_json_other(res, outputJSONVal, matchJSONVal, matchJSONKey=None):
343    if matchJSONKey in outputJSONVal:
344        if matchJSONVal != outputJSONVal[matchJSONKey]:
345            failmsg = 'Value doesn\'t match: {}: {} != {}\nMatching against output: {}'
346            failmsg = failmsg.format(matchJSONKey, matchJSONVal, outputJSONVal[matchJSONKey], outputJSONVal)
347            res.set_result(ResultState.fail)
348            res.set_failmsg(failmsg)
349            return res
350
351    return res
352
353def run_one_test(pm, args, index, tidx):
354    global NAMES
355    result = True
356    tresult = ""
357    tap = ""
358    res = TestResult(tidx['id'], tidx['name'])
359    if args.verbose > 0:
360        print("\t====================\n=====> ", end="")
361    print("Test " + tidx["id"] + ": " + tidx["name"])
362
363    if 'skip' in tidx:
364        if tidx['skip'] == 'yes':
365            res = TestResult(tidx['id'], tidx['name'])
366            res.set_result(ResultState.skip)
367            res.set_errormsg('Test case designated as skipped.')
368            pm.call_pre_case(tidx, test_skip=True)
369            pm.call_post_execute()
370            return res
371
372    if 'dependsOn' in tidx:
373        if (args.verbose > 0):
374            print('probe command for test skip')
375        (p, procout) = exec_cmd(args, pm, 'execute', tidx['dependsOn'])
376        if p:
377            if (p.returncode != 0):
378                res = TestResult(tidx['id'], tidx['name'])
379                res.set_result(ResultState.skip)
380                res.set_errormsg('probe command: test skipped.')
381                pm.call_pre_case(tidx, test_skip=True)
382                pm.call_post_execute()
383                return res
384
385    # populate NAMES with TESTID for this test
386    NAMES['TESTID'] = tidx['id']
387
388    pm.call_pre_case(tidx)
389    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
390
391    if (args.verbose > 0):
392        print('-----> execute stage')
393    pm.call_pre_execute()
394    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
395    if p:
396        exit_code = p.returncode
397    else:
398        exit_code = None
399
400    pm.call_post_execute()
401
402    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
403        print("exit: {!r}".format(exit_code))
404        print("exit: {}".format(int(tidx["expExitCode"])))
405        #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
406        res.set_result(ResultState.fail)
407        res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout))
408        print(procout)
409    else:
410        if args.verbose > 0:
411            print('-----> verify stage')
412        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
413        if procout:
414            if 'matchJSON' in tidx:
415                verify_by_json(procout, res, tidx, args, pm)
416            elif 'matchPattern' in tidx:
417                match_pattern = re.compile(
418                    str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
419                match_index = re.findall(match_pattern, procout)
420                if len(match_index) != int(tidx["matchCount"]):
421                    res.set_result(ResultState.fail)
422                    res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout))
423                else:
424                    res.set_result(ResultState.success)
425            else:
426                res.set_result(ResultState.fail)
427                res.set_failmsg('Must specify a match option: matchJSON or matchPattern\n{}'.format(procout))
428        elif int(tidx["matchCount"]) != 0:
429            res.set_result(ResultState.fail)
430            res.set_failmsg('No output generated by verify command.')
431        else:
432            res.set_result(ResultState.success)
433
434    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
435    pm.call_post_case()
436
437    index += 1
438
439    # remove TESTID from NAMES
440    del(NAMES['TESTID'])
441    return res
442
443def test_runner(pm, args, filtered_tests):
444    """
445    Driver function for the unit tests.
446
447    Prints information about the tests being run, executes the setup and
448    teardown commands and the command under test itself. Also determines
449    success/failure based on the information in the test case and generates
450    TAP output accordingly.
451    """
452    testlist = filtered_tests
453    tcount = len(testlist)
454    index = 1
455    tap = ''
456    badtest = None
457    stage = None
458    emergency_exit = False
459    emergency_exit_message = ''
460
461    tsr = TestSuiteReport()
462
463    try:
464        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
465    except Exception as ee:
466        ex_type, ex, ex_tb = sys.exc_info()
467        print('Exception {} {} (caught in pre_suite).'.
468              format(ex_type, ex))
469        traceback.print_tb(ex_tb)
470        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
471        emergency_exit = True
472        stage = 'pre-SUITE'
473
474    if emergency_exit:
475        pm.call_post_suite(index)
476        return emergency_exit_message
477    if args.verbose > 1:
478        print('give test rig 2 seconds to stabilize')
479    time.sleep(2)
480    for tidx in testlist:
481        if "flower" in tidx["category"] and args.device == None:
482            errmsg = "Tests using the DEV2 variable must define the name of a "
483            errmsg += "physical NIC with the -d option when running tdc.\n"
484            errmsg += "Test has been skipped."
485            if args.verbose > 1:
486                print(errmsg)
487            res = TestResult(tidx['id'], tidx['name'])
488            res.set_result(ResultState.skip)
489            res.set_errormsg(errmsg)
490            tsr.add_resultdata(res)
491            index += 1
492            continue
493        try:
494            badtest = tidx  # in case it goes bad
495            res = run_one_test(pm, args, index, tidx)
496            tsr.add_resultdata(res)
497        except PluginMgrTestFail as pmtf:
498            ex_type, ex, ex_tb = sys.exc_info()
499            stage = pmtf.stage
500            message = pmtf.message
501            output = pmtf.output
502            res = TestResult(tidx['id'], tidx['name'])
503            res.set_result(ResultState.skip)
504            res.set_errormsg(pmtf.message)
505            res.set_failmsg(pmtf.output)
506            tsr.add_resultdata(res)
507            index += 1
508            print(message)
509            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
510                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
511            print('---------------')
512            print('traceback')
513            traceback.print_tb(ex_tb)
514            print('---------------')
515            if stage == 'teardown':
516                print('accumulated output for this test:')
517                if pmtf.output:
518                    print(pmtf.output)
519            print('---------------')
520            break
521        index += 1
522
523    # if we failed in setup or teardown,
524    # fill in the remaining tests with ok-skipped
525    count = index
526
527    if tcount + 1 != count:
528        for tidx in testlist[count - 1:]:
529            res = TestResult(tidx['id'], tidx['name'])
530            res.set_result(ResultState.skip)
531            msg = 'skipped - previous {} failed {} {}'.format(stage,
532                index, badtest.get('id', '--Unknown--'))
533            res.set_errormsg(msg)
534            tsr.add_resultdata(res)
535            count += 1
536
537    if args.pause:
538        print('Want to pause\nPress enter to continue ...')
539        if input(sys.stdin):
540            print('got something on stdin')
541
542    pm.call_post_suite(index)
543
544    return tsr
545
546def has_blank_ids(idlist):
547    """
548    Search the list for empty ID fields and return true/false accordingly.
549    """
550    return not(all(k for k in idlist))
551
552
553def load_from_file(filename):
554    """
555    Open the JSON file containing the test cases and return them
556    as list of ordered dictionary objects.
557    """
558    try:
559        with open(filename) as test_data:
560            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
561    except json.JSONDecodeError as jde:
562        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
563        testlist = list()
564    else:
565        idlist = get_id_list(testlist)
566        if (has_blank_ids(idlist)):
567            for k in testlist:
568                k['filename'] = filename
569    return testlist
570
571
572def args_parse():
573    """
574    Create the argument parser.
575    """
576    parser = argparse.ArgumentParser(description='Linux TC unit tests')
577    return parser
578
579
580def set_args(parser):
581    """
582    Set the command line arguments for tdc.
583    """
584    parser.add_argument(
585        '--outfile', type=str,
586        help='Path to the file in which results should be saved. ' +
587        'Default target is the current directory.')
588    parser.add_argument(
589        '-p', '--path', type=str,
590        help='The full path to the tc executable to use')
591    sg = parser.add_argument_group(
592        'selection', 'select which test cases: ' +
593        'files plus directories; filtered by categories plus testids')
594    ag = parser.add_argument_group(
595        'action', 'select action to perform on selected test cases')
596
597    sg.add_argument(
598        '-D', '--directory', nargs='+', metavar='DIR',
599        help='Collect tests from the specified directory(ies) ' +
600        '(default [tc-tests])')
601    sg.add_argument(
602        '-f', '--file', nargs='+', metavar='FILE',
603        help='Run tests from the specified file(s)')
604    sg.add_argument(
605        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
606        help='Run tests only from the specified category/ies, ' +
607        'or if no category/ies is/are specified, list known categories.')
608    sg.add_argument(
609        '-e', '--execute', nargs='+', metavar='ID',
610        help='Execute the specified test cases with specified IDs')
611    ag.add_argument(
612        '-l', '--list', action='store_true',
613        help='List all test cases, or those only within the specified category')
614    ag.add_argument(
615        '-s', '--show', action='store_true', dest='showID',
616        help='Display the selected test cases')
617    ag.add_argument(
618        '-i', '--id', action='store_true', dest='gen_id',
619        help='Generate ID numbers for new test cases')
620    parser.add_argument(
621        '-v', '--verbose', action='count', default=0,
622        help='Show the commands that are being run')
623    parser.add_argument(
624        '--format', default='tap', const='tap', nargs='?',
625        choices=['none', 'xunit', 'tap'],
626        help='Specify the format for test results. (Default: TAP)')
627    parser.add_argument('-d', '--device',
628                        help='Execute test cases that use a physical device, ' +
629                        'where DEVICE is its name. (If not defined, tests ' +
630                        'that require a physical device will be skipped)')
631    parser.add_argument(
632        '-P', '--pause', action='store_true',
633        help='Pause execution just before post-suite stage')
634    return parser
635
636
637def check_default_settings(args, remaining, pm):
638    """
639    Process any arguments overriding the default settings,
640    and ensure the settings are correct.
641    """
642    # Allow for overriding specific settings
643    global NAMES
644
645    if args.path != None:
646        NAMES['TC'] = args.path
647    if args.device != None:
648        NAMES['DEV2'] = args.device
649    if 'TIMEOUT' not in NAMES:
650        NAMES['TIMEOUT'] = None
651    if not os.path.isfile(NAMES['TC']):
652        print("The specified tc path " + NAMES['TC'] + " does not exist.")
653        exit(1)
654
655    pm.call_check_args(args, remaining)
656
657
658def get_id_list(alltests):
659    """
660    Generate a list of all IDs in the test cases.
661    """
662    return [x["id"] for x in alltests]
663
664
665def check_case_id(alltests):
666    """
667    Check for duplicate test case IDs.
668    """
669    idl = get_id_list(alltests)
670    return [x for x in idl if idl.count(x) > 1]
671
672
673def does_id_exist(alltests, newid):
674    """
675    Check if a given ID already exists in the list of test cases.
676    """
677    idl = get_id_list(alltests)
678    return (any(newid == x for x in idl))
679
680
681def generate_case_ids(alltests):
682    """
683    If a test case has a blank ID field, generate a random hex ID for it
684    and then write the test cases back to disk.
685    """
686    import random
687    for c in alltests:
688        if (c["id"] == ""):
689            while True:
690                newid = str('{:04x}'.format(random.randrange(16**4)))
691                if (does_id_exist(alltests, newid)):
692                    continue
693                else:
694                    c['id'] = newid
695                    break
696
697    ufilename = []
698    for c in alltests:
699        if ('filename' in c):
700            ufilename.append(c['filename'])
701    ufilename = get_unique_item(ufilename)
702    for f in ufilename:
703        testlist = []
704        for t in alltests:
705            if 'filename' in t:
706                if t['filename'] == f:
707                    del t['filename']
708                    testlist.append(t)
709        outfile = open(f, "w")
710        json.dump(testlist, outfile, indent=4)
711        outfile.write("\n")
712        outfile.close()
713
714def filter_tests_by_id(args, testlist):
715    '''
716    Remove tests from testlist that are not in the named id list.
717    If id list is empty, return empty list.
718    '''
719    newlist = list()
720    if testlist and args.execute:
721        target_ids = args.execute
722
723        if isinstance(target_ids, list) and (len(target_ids) > 0):
724            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
725    return newlist
726
727def filter_tests_by_category(args, testlist):
728    '''
729    Remove tests from testlist that are not in a named category.
730    '''
731    answer = list()
732    if args.category and testlist:
733        test_ids = list()
734        for catg in set(args.category):
735            if catg == '+c':
736                continue
737            print('considering category {}'.format(catg))
738            for tc in testlist:
739                if catg in tc['category'] and tc['id'] not in test_ids:
740                    answer.append(tc)
741                    test_ids.append(tc['id'])
742
743    return answer
744
745
746def get_test_cases(args):
747    """
748    If a test case file is specified, retrieve tests from that file.
749    Otherwise, glob for all json files in subdirectories and load from
750    each one.
751    Also, if requested, filter by category, and add tests matching
752    certain ids.
753    """
754    import fnmatch
755
756    flist = []
757    testdirs = ['tc-tests']
758
759    if args.file:
760        # at least one file was specified - remove the default directory
761        testdirs = []
762
763        for ff in args.file:
764            if not os.path.isfile(ff):
765                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
766            else:
767                flist.append(os.path.abspath(ff))
768
769    if args.directory:
770        testdirs = args.directory
771
772    for testdir in testdirs:
773        for root, dirnames, filenames in os.walk(testdir):
774            for filename in fnmatch.filter(filenames, '*.json'):
775                candidate = os.path.abspath(os.path.join(root, filename))
776                if candidate not in testdirs:
777                    flist.append(candidate)
778
779    alltestcases = list()
780    for casefile in flist:
781        alltestcases = alltestcases + (load_from_file(casefile))
782
783    allcatlist = get_test_categories(alltestcases)
784    allidlist = get_id_list(alltestcases)
785
786    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
787    idtestcases = filter_tests_by_id(args, alltestcases)
788    cattestcases = filter_tests_by_category(args, alltestcases)
789
790    cat_ids = [x['id'] for x in cattestcases]
791    if args.execute:
792        if args.category:
793            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
794        else:
795            alltestcases = idtestcases
796    else:
797        if cat_ids:
798            alltestcases = cattestcases
799        else:
800            # just accept the existing value of alltestcases,
801            # which has been filtered by file/directory
802            pass
803
804    return allcatlist, allidlist, testcases_by_cats, alltestcases
805
806
807def set_operation_mode(pm, parser, args, remaining):
808    """
809    Load the test case data and process remaining arguments to determine
810    what the script should do for this run, and call the appropriate
811    function.
812    """
813    ucat, idlist, testcases, alltests = get_test_cases(args)
814
815    if args.gen_id:
816        if (has_blank_ids(idlist)):
817            alltests = generate_case_ids(alltests)
818        else:
819            print("No empty ID fields found in test files.")
820        exit(0)
821
822    duplicate_ids = check_case_id(alltests)
823    if (len(duplicate_ids) > 0):
824        print("The following test case IDs are not unique:")
825        print(str(set(duplicate_ids)))
826        print("Please correct them before continuing.")
827        exit(1)
828
829    if args.showID:
830        for atest in alltests:
831            print_test_case(atest)
832        exit(0)
833
834    if isinstance(args.category, list) and (len(args.category) == 0):
835        print("Available categories:")
836        print_sll(ucat)
837        exit(0)
838
839    if args.list:
840        list_test_cases(alltests)
841        exit(0)
842
843    exit_code = 0 # KSFT_PASS
844    if len(alltests):
845        req_plugins = pm.get_required_plugins(alltests)
846        try:
847            args = pm.load_required_plugins(req_plugins, parser, args, remaining)
848        except PluginDependencyException as pde:
849            print('The following plugins were not found:')
850            print('{}'.format(pde.missing_pg))
851        catresults = test_runner(pm, args, alltests)
852        if catresults.count_failures() != 0:
853            exit_code = 1 # KSFT_FAIL
854        if args.format == 'none':
855            print('Test results output suppression requested\n')
856        else:
857            print('\nAll test results: \n')
858            if args.format == 'xunit':
859                suffix = 'xml'
860                res = catresults.format_xunit()
861            elif args.format == 'tap':
862                suffix = 'tap'
863                res = catresults.format_tap()
864            print(res)
865            print('\n\n')
866            if not args.outfile:
867                fname = 'test-results.{}'.format(suffix)
868            else:
869                fname = args.outfile
870            with open(fname, 'w') as fh:
871                fh.write(res)
872                fh.close()
873                if os.getenv('SUDO_UID') is not None:
874                    os.chown(fname, uid=int(os.getenv('SUDO_UID')),
875                        gid=int(os.getenv('SUDO_GID')))
876    else:
877        print('No tests found\n')
878        exit_code = 4 # KSFT_SKIP
879    exit(exit_code)
880
881def main():
882    """
883    Start of execution; set up argument parser and get the arguments,
884    and start operations.
885    """
886    parser = args_parse()
887    parser = set_args(parser)
888    pm = PluginMgr(parser)
889    parser = pm.call_add_args(parser)
890    (args, remaining) = parser.parse_known_args()
891    args.NAMES = NAMES
892    pm.set_args(args)
893    check_default_settings(args, remaining, pm)
894    if args.verbose > 2:
895        print('args is {}'.format(args))
896
897    set_operation_mode(pm, parser, args, remaining)
898
899if __name__ == "__main__":
900    main()
901