1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3 4""" 5tdc.py - Linux tc (Traffic Control) unit test driver 6 7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com> 8""" 9 10import re 11import os 12import sys 13import argparse 14import importlib 15import json 16import subprocess 17import time 18import traceback 19from collections import OrderedDict 20from string import Template 21 22from tdc_config import * 23from tdc_helper import * 24 25import TdcPlugin 26from TdcResults import * 27 28 29class PluginMgrTestFail(Exception): 30 def __init__(self, stage, output, message): 31 self.stage = stage 32 self.output = output 33 self.message = message 34 35class PluginMgr: 36 def __init__(self, argparser): 37 super().__init__() 38 self.plugins = {} 39 self.plugin_instances = [] 40 self.args = [] 41 self.argparser = argparser 42 43 # TODO, put plugins in order 44 plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins') 45 for dirpath, dirnames, filenames in os.walk(plugindir): 46 for fn in filenames: 47 if (fn.endswith('.py') and 48 not fn == '__init__.py' and 49 not fn.startswith('#') and 50 not fn.startswith('.#')): 51 mn = fn[0:-3] 52 foo = importlib.import_module('plugins.' + mn) 53 self.plugins[mn] = foo 54 self.plugin_instances.append(foo.SubPlugin()) 55 56 def call_pre_suite(self, testcount, testidlist): 57 for pgn_inst in self.plugin_instances: 58 pgn_inst.pre_suite(testcount, testidlist) 59 60 def call_post_suite(self, index): 61 for pgn_inst in reversed(self.plugin_instances): 62 pgn_inst.post_suite(index) 63 64 def call_pre_case(self, testid, test_name, *, test_skip=False): 65 for pgn_inst in self.plugin_instances: 66 try: 67 pgn_inst.pre_case(testid, test_name, test_skip) 68 except Exception as ee: 69 print('exception {} in call to pre_case for {} plugin'. 70 format(ee, pgn_inst.__class__)) 71 print('test_ordinal is {}'.format(test_ordinal)) 72 print('testid is {}'.format(testid)) 73 raise 74 75 def call_post_case(self): 76 for pgn_inst in reversed(self.plugin_instances): 77 pgn_inst.post_case() 78 79 def call_pre_execute(self): 80 for pgn_inst in self.plugin_instances: 81 pgn_inst.pre_execute() 82 83 def call_post_execute(self): 84 for pgn_inst in reversed(self.plugin_instances): 85 pgn_inst.post_execute() 86 87 def call_add_args(self, parser): 88 for pgn_inst in self.plugin_instances: 89 parser = pgn_inst.add_args(parser) 90 return parser 91 92 def call_check_args(self, args, remaining): 93 for pgn_inst in self.plugin_instances: 94 pgn_inst.check_args(args, remaining) 95 96 def call_adjust_command(self, stage, command): 97 for pgn_inst in self.plugin_instances: 98 command = pgn_inst.adjust_command(stage, command) 99 return command 100 101 @staticmethod 102 def _make_argparser(args): 103 self.argparser = argparse.ArgumentParser( 104 description='Linux TC unit tests') 105 106def replace_keywords(cmd): 107 """ 108 For a given executable command, substitute any known 109 variables contained within NAMES with the correct values 110 """ 111 tcmd = Template(cmd) 112 subcmd = tcmd.safe_substitute(NAMES) 113 return subcmd 114 115 116def exec_cmd(args, pm, stage, command): 117 """ 118 Perform any required modifications on an executable command, then run 119 it in a subprocess and return the results. 120 """ 121 if len(command.strip()) == 0: 122 return None, None 123 if '$' in command: 124 command = replace_keywords(command) 125 126 command = pm.call_adjust_command(stage, command) 127 if args.verbose > 0: 128 print('command "{}"'.format(command)) 129 proc = subprocess.Popen(command, 130 shell=True, 131 stdout=subprocess.PIPE, 132 stderr=subprocess.PIPE, 133 env=ENVIR) 134 135 try: 136 (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT']) 137 if proc.returncode != 0 and len(serr) > 0: 138 foutput = serr.decode("utf-8", errors="ignore") 139 else: 140 foutput = rawout.decode("utf-8", errors="ignore") 141 except subprocess.TimeoutExpired: 142 foutput = "Command \"{}\" timed out\n".format(command) 143 proc.returncode = 255 144 145 proc.stdout.close() 146 proc.stderr.close() 147 return proc, foutput 148 149 150def prepare_env(args, pm, stage, prefix, cmdlist, output = None): 151 """ 152 Execute the setup/teardown commands for a test case. 153 Optionally terminate test execution if the command fails. 154 """ 155 if args.verbose > 0: 156 print('{}'.format(prefix)) 157 for cmdinfo in cmdlist: 158 if isinstance(cmdinfo, list): 159 exit_codes = cmdinfo[1:] 160 cmd = cmdinfo[0] 161 else: 162 exit_codes = [0] 163 cmd = cmdinfo 164 165 if not cmd: 166 continue 167 168 (proc, foutput) = exec_cmd(args, pm, stage, cmd) 169 170 if proc and (proc.returncode not in exit_codes): 171 print('', file=sys.stderr) 172 print("{} *** Could not execute: \"{}\"".format(prefix, cmd), 173 file=sys.stderr) 174 print("\n{} *** Error message: \"{}\"".format(prefix, foutput), 175 file=sys.stderr) 176 print("returncode {}; expected {}".format(proc.returncode, 177 exit_codes)) 178 print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr) 179 print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr) 180 print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr) 181 raise PluginMgrTestFail( 182 stage, output, 183 '"{}" did not complete successfully'.format(prefix)) 184 185def run_one_test(pm, args, index, tidx): 186 global NAMES 187 result = True 188 tresult = "" 189 tap = "" 190 res = TestResult(tidx['id'], tidx['name']) 191 if args.verbose > 0: 192 print("\t====================\n=====> ", end="") 193 print("Test " + tidx["id"] + ": " + tidx["name"]) 194 195 if 'skip' in tidx: 196 if tidx['skip'] == 'yes': 197 res = TestResult(tidx['id'], tidx['name']) 198 res.set_result(ResultState.skip) 199 res.set_errormsg('Test case designated as skipped.') 200 pm.call_pre_case(tidx['id'], tidx['name'], test_skip=True) 201 pm.call_post_execute() 202 return res 203 204 # populate NAMES with TESTID for this test 205 NAMES['TESTID'] = tidx['id'] 206 207 pm.call_pre_case(tidx['id'], tidx['name']) 208 prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"]) 209 210 if (args.verbose > 0): 211 print('-----> execute stage') 212 pm.call_pre_execute() 213 (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"]) 214 if p: 215 exit_code = p.returncode 216 else: 217 exit_code = None 218 219 pm.call_post_execute() 220 221 if (exit_code is None or exit_code != int(tidx["expExitCode"])): 222 print("exit: {!r}".format(exit_code)) 223 print("exit: {}".format(int(tidx["expExitCode"]))) 224 #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"]))) 225 res.set_result(ResultState.fail) 226 res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout)) 227 print(procout) 228 else: 229 if args.verbose > 0: 230 print('-----> verify stage') 231 match_pattern = re.compile( 232 str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE) 233 (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"]) 234 if procout: 235 match_index = re.findall(match_pattern, procout) 236 if len(match_index) != int(tidx["matchCount"]): 237 res.set_result(ResultState.fail) 238 res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout)) 239 else: 240 res.set_result(ResultState.success) 241 elif int(tidx["matchCount"]) != 0: 242 res.set_result(ResultState.fail) 243 res.set_failmsg('No output generated by verify command.') 244 else: 245 res.set_result(ResultState.success) 246 247 prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout) 248 pm.call_post_case() 249 250 index += 1 251 252 # remove TESTID from NAMES 253 del(NAMES['TESTID']) 254 return res 255 256def test_runner(pm, args, filtered_tests): 257 """ 258 Driver function for the unit tests. 259 260 Prints information about the tests being run, executes the setup and 261 teardown commands and the command under test itself. Also determines 262 success/failure based on the information in the test case and generates 263 TAP output accordingly. 264 """ 265 testlist = filtered_tests 266 tcount = len(testlist) 267 index = 1 268 tap = '' 269 badtest = None 270 stage = None 271 emergency_exit = False 272 emergency_exit_message = '' 273 274 tsr = TestSuiteReport() 275 276 try: 277 pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist]) 278 except Exception as ee: 279 ex_type, ex, ex_tb = sys.exc_info() 280 print('Exception {} {} (caught in pre_suite).'. 281 format(ex_type, ex)) 282 traceback.print_tb(ex_tb) 283 emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex) 284 emergency_exit = True 285 stage = 'pre-SUITE' 286 287 if emergency_exit: 288 pm.call_post_suite(index) 289 return emergency_exit_message 290 if args.verbose > 1: 291 print('give test rig 2 seconds to stabilize') 292 time.sleep(2) 293 for tidx in testlist: 294 if "flower" in tidx["category"] and args.device == None: 295 if args.verbose > 1: 296 print('Not executing test {} {} because DEV2 not defined'. 297 format(tidx['id'], tidx['name'])) 298 res = TestResult(tidx['id'], tidx['name']) 299 res.set_result(ResultState.skip) 300 res.set_errormsg('Not executed because DEV2 is not defined') 301 tsr.add_resultdata(res) 302 continue 303 try: 304 badtest = tidx # in case it goes bad 305 res = run_one_test(pm, args, index, tidx) 306 tsr.add_resultdata(res) 307 except PluginMgrTestFail as pmtf: 308 ex_type, ex, ex_tb = sys.exc_info() 309 stage = pmtf.stage 310 message = pmtf.message 311 output = pmtf.output 312 res = TestResult(tidx['id'], tidx['name']) 313 res.set_result(ResultState.skip) 314 res.set_errormsg(pmtf.message) 315 res.set_failmsg(pmtf.output) 316 tsr.add_resultdata(res) 317 index += 1 318 print(message) 319 print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'. 320 format(ex_type, ex, index, tidx['id'], tidx['name'], stage)) 321 print('---------------') 322 print('traceback') 323 traceback.print_tb(ex_tb) 324 print('---------------') 325 if stage == 'teardown': 326 print('accumulated output for this test:') 327 if pmtf.output: 328 print(pmtf.output) 329 print('---------------') 330 break 331 index += 1 332 333 # if we failed in setup or teardown, 334 # fill in the remaining tests with ok-skipped 335 count = index 336 337 if tcount + 1 != count: 338 for tidx in testlist[count - 1:]: 339 res = TestResult(tidx['id'], tidx['name']) 340 res.set_result(ResultState.skip) 341 msg = 'skipped - previous {} failed {} {}'.format(stage, 342 index, badtest.get('id', '--Unknown--')) 343 res.set_errormsg(msg) 344 tsr.add_resultdata(res) 345 count += 1 346 347 if args.pause: 348 print('Want to pause\nPress enter to continue ...') 349 if input(sys.stdin): 350 print('got something on stdin') 351 352 pm.call_post_suite(index) 353 354 return tsr 355 356def has_blank_ids(idlist): 357 """ 358 Search the list for empty ID fields and return true/false accordingly. 359 """ 360 return not(all(k for k in idlist)) 361 362 363def load_from_file(filename): 364 """ 365 Open the JSON file containing the test cases and return them 366 as list of ordered dictionary objects. 367 """ 368 try: 369 with open(filename) as test_data: 370 testlist = json.load(test_data, object_pairs_hook=OrderedDict) 371 except json.JSONDecodeError as jde: 372 print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde)) 373 testlist = list() 374 else: 375 idlist = get_id_list(testlist) 376 if (has_blank_ids(idlist)): 377 for k in testlist: 378 k['filename'] = filename 379 return testlist 380 381 382def args_parse(): 383 """ 384 Create the argument parser. 385 """ 386 parser = argparse.ArgumentParser(description='Linux TC unit tests') 387 return parser 388 389 390def set_args(parser): 391 """ 392 Set the command line arguments for tdc. 393 """ 394 parser.add_argument( 395 '--outfile', type=str, 396 help='Path to the file in which results should be saved. ' + 397 'Default target is the current directory.') 398 parser.add_argument( 399 '-p', '--path', type=str, 400 help='The full path to the tc executable to use') 401 sg = parser.add_argument_group( 402 'selection', 'select which test cases: ' + 403 'files plus directories; filtered by categories plus testids') 404 ag = parser.add_argument_group( 405 'action', 'select action to perform on selected test cases') 406 407 sg.add_argument( 408 '-D', '--directory', nargs='+', metavar='DIR', 409 help='Collect tests from the specified directory(ies) ' + 410 '(default [tc-tests])') 411 sg.add_argument( 412 '-f', '--file', nargs='+', metavar='FILE', 413 help='Run tests from the specified file(s)') 414 sg.add_argument( 415 '-c', '--category', nargs='*', metavar='CATG', default=['+c'], 416 help='Run tests only from the specified category/ies, ' + 417 'or if no category/ies is/are specified, list known categories.') 418 sg.add_argument( 419 '-e', '--execute', nargs='+', metavar='ID', 420 help='Execute the specified test cases with specified IDs') 421 ag.add_argument( 422 '-l', '--list', action='store_true', 423 help='List all test cases, or those only within the specified category') 424 ag.add_argument( 425 '-s', '--show', action='store_true', dest='showID', 426 help='Display the selected test cases') 427 ag.add_argument( 428 '-i', '--id', action='store_true', dest='gen_id', 429 help='Generate ID numbers for new test cases') 430 parser.add_argument( 431 '-v', '--verbose', action='count', default=0, 432 help='Show the commands that are being run') 433 parser.add_argument( 434 '--format', default='tap', const='tap', nargs='?', 435 choices=['none', 'xunit', 'tap'], 436 help='Specify the format for test results. (Default: TAP)') 437 parser.add_argument('-d', '--device', 438 help='Execute the test case in flower category') 439 parser.add_argument( 440 '-P', '--pause', action='store_true', 441 help='Pause execution just before post-suite stage') 442 return parser 443 444 445def check_default_settings(args, remaining, pm): 446 """ 447 Process any arguments overriding the default settings, 448 and ensure the settings are correct. 449 """ 450 # Allow for overriding specific settings 451 global NAMES 452 453 if args.path != None: 454 NAMES['TC'] = args.path 455 if args.device != None: 456 NAMES['DEV2'] = args.device 457 if 'TIMEOUT' not in NAMES: 458 NAMES['TIMEOUT'] = None 459 if not os.path.isfile(NAMES['TC']): 460 print("The specified tc path " + NAMES['TC'] + " does not exist.") 461 exit(1) 462 463 pm.call_check_args(args, remaining) 464 465 466def get_id_list(alltests): 467 """ 468 Generate a list of all IDs in the test cases. 469 """ 470 return [x["id"] for x in alltests] 471 472 473def check_case_id(alltests): 474 """ 475 Check for duplicate test case IDs. 476 """ 477 idl = get_id_list(alltests) 478 return [x for x in idl if idl.count(x) > 1] 479 480 481def does_id_exist(alltests, newid): 482 """ 483 Check if a given ID already exists in the list of test cases. 484 """ 485 idl = get_id_list(alltests) 486 return (any(newid == x for x in idl)) 487 488 489def generate_case_ids(alltests): 490 """ 491 If a test case has a blank ID field, generate a random hex ID for it 492 and then write the test cases back to disk. 493 """ 494 import random 495 for c in alltests: 496 if (c["id"] == ""): 497 while True: 498 newid = str('{:04x}'.format(random.randrange(16**4))) 499 if (does_id_exist(alltests, newid)): 500 continue 501 else: 502 c['id'] = newid 503 break 504 505 ufilename = [] 506 for c in alltests: 507 if ('filename' in c): 508 ufilename.append(c['filename']) 509 ufilename = get_unique_item(ufilename) 510 for f in ufilename: 511 testlist = [] 512 for t in alltests: 513 if 'filename' in t: 514 if t['filename'] == f: 515 del t['filename'] 516 testlist.append(t) 517 outfile = open(f, "w") 518 json.dump(testlist, outfile, indent=4) 519 outfile.write("\n") 520 outfile.close() 521 522def filter_tests_by_id(args, testlist): 523 ''' 524 Remove tests from testlist that are not in the named id list. 525 If id list is empty, return empty list. 526 ''' 527 newlist = list() 528 if testlist and args.execute: 529 target_ids = args.execute 530 531 if isinstance(target_ids, list) and (len(target_ids) > 0): 532 newlist = list(filter(lambda x: x['id'] in target_ids, testlist)) 533 return newlist 534 535def filter_tests_by_category(args, testlist): 536 ''' 537 Remove tests from testlist that are not in a named category. 538 ''' 539 answer = list() 540 if args.category and testlist: 541 test_ids = list() 542 for catg in set(args.category): 543 if catg == '+c': 544 continue 545 print('considering category {}'.format(catg)) 546 for tc in testlist: 547 if catg in tc['category'] and tc['id'] not in test_ids: 548 answer.append(tc) 549 test_ids.append(tc['id']) 550 551 return answer 552 553def get_test_cases(args): 554 """ 555 If a test case file is specified, retrieve tests from that file. 556 Otherwise, glob for all json files in subdirectories and load from 557 each one. 558 Also, if requested, filter by category, and add tests matching 559 certain ids. 560 """ 561 import fnmatch 562 563 flist = [] 564 testdirs = ['tc-tests'] 565 566 if args.file: 567 # at least one file was specified - remove the default directory 568 testdirs = [] 569 570 for ff in args.file: 571 if not os.path.isfile(ff): 572 print("IGNORING file " + ff + "\n\tBECAUSE does not exist.") 573 else: 574 flist.append(os.path.abspath(ff)) 575 576 if args.directory: 577 testdirs = args.directory 578 579 for testdir in testdirs: 580 for root, dirnames, filenames in os.walk(testdir): 581 for filename in fnmatch.filter(filenames, '*.json'): 582 candidate = os.path.abspath(os.path.join(root, filename)) 583 if candidate not in testdirs: 584 flist.append(candidate) 585 586 alltestcases = list() 587 for casefile in flist: 588 alltestcases = alltestcases + (load_from_file(casefile)) 589 590 allcatlist = get_test_categories(alltestcases) 591 allidlist = get_id_list(alltestcases) 592 593 testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist) 594 idtestcases = filter_tests_by_id(args, alltestcases) 595 cattestcases = filter_tests_by_category(args, alltestcases) 596 597 cat_ids = [x['id'] for x in cattestcases] 598 if args.execute: 599 if args.category: 600 alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids] 601 else: 602 alltestcases = idtestcases 603 else: 604 if cat_ids: 605 alltestcases = cattestcases 606 else: 607 # just accept the existing value of alltestcases, 608 # which has been filtered by file/directory 609 pass 610 611 return allcatlist, allidlist, testcases_by_cats, alltestcases 612 613 614def set_operation_mode(pm, args): 615 """ 616 Load the test case data and process remaining arguments to determine 617 what the script should do for this run, and call the appropriate 618 function. 619 """ 620 ucat, idlist, testcases, alltests = get_test_cases(args) 621 622 if args.gen_id: 623 if (has_blank_ids(idlist)): 624 alltests = generate_case_ids(alltests) 625 else: 626 print("No empty ID fields found in test files.") 627 exit(0) 628 629 duplicate_ids = check_case_id(alltests) 630 if (len(duplicate_ids) > 0): 631 print("The following test case IDs are not unique:") 632 print(str(set(duplicate_ids))) 633 print("Please correct them before continuing.") 634 exit(1) 635 636 if args.showID: 637 for atest in alltests: 638 print_test_case(atest) 639 exit(0) 640 641 if isinstance(args.category, list) and (len(args.category) == 0): 642 print("Available categories:") 643 print_sll(ucat) 644 exit(0) 645 646 if args.list: 647 if args.list: 648 list_test_cases(alltests) 649 exit(0) 650 651 if len(alltests): 652 catresults = test_runner(pm, args, alltests) 653 if args.format == 'none': 654 print('Test results output suppression requested\n') 655 else: 656 print('\nAll test results: \n') 657 if args.format == 'xunit': 658 suffix = 'xml' 659 res = catresults.format_xunit() 660 elif args.format == 'tap': 661 suffix = 'tap' 662 res = catresults.format_tap() 663 print(res) 664 print('\n\n') 665 if not args.outfile: 666 fname = 'test-results.{}'.format(suffix) 667 else: 668 fname = args.outfile 669 with open(fname, 'w') as fh: 670 fh.write(res) 671 fh.close() 672 if os.getenv('SUDO_UID') is not None: 673 os.chown(fname, uid=int(os.getenv('SUDO_UID')), 674 gid=int(os.getenv('SUDO_GID'))) 675 else: 676 print('No tests found\n') 677 678def main(): 679 """ 680 Start of execution; set up argument parser and get the arguments, 681 and start operations. 682 """ 683 parser = args_parse() 684 parser = set_args(parser) 685 pm = PluginMgr(parser) 686 parser = pm.call_add_args(parser) 687 (args, remaining) = parser.parse_known_args() 688 args.NAMES = NAMES 689 check_default_settings(args, remaining, pm) 690 if args.verbose > 2: 691 print('args is {}'.format(args)) 692 693 set_operation_mode(pm, args) 694 695 exit(0) 696 697 698if __name__ == "__main__": 699 main() 700