1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3 4""" 5tdc.py - Linux tc (Traffic Control) unit test driver 6 7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com> 8""" 9 10import re 11import os 12import sys 13import argparse 14import importlib 15import json 16import subprocess 17import time 18import traceback 19from collections import OrderedDict 20from string import Template 21 22from tdc_config import * 23from tdc_helper import * 24 25import TdcPlugin 26from TdcResults import * 27 28 29class PluginMgrTestFail(Exception): 30 def __init__(self, stage, output, message): 31 self.stage = stage 32 self.output = output 33 self.message = message 34 35class PluginMgr: 36 def __init__(self, argparser): 37 super().__init__() 38 self.plugins = {} 39 self.plugin_instances = [] 40 self.args = [] 41 self.argparser = argparser 42 43 # TODO, put plugins in order 44 plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins') 45 for dirpath, dirnames, filenames in os.walk(plugindir): 46 for fn in filenames: 47 if (fn.endswith('.py') and 48 not fn == '__init__.py' and 49 not fn.startswith('#') and 50 not fn.startswith('.#')): 51 mn = fn[0:-3] 52 foo = importlib.import_module('plugins.' + mn) 53 self.plugins[mn] = foo 54 self.plugin_instances.append(foo.SubPlugin()) 55 56 def call_pre_suite(self, testcount, testidlist): 57 for pgn_inst in self.plugin_instances: 58 pgn_inst.pre_suite(testcount, testidlist) 59 60 def call_post_suite(self, index): 61 for pgn_inst in reversed(self.plugin_instances): 62 pgn_inst.post_suite(index) 63 64 def call_pre_case(self, test_ordinal, testid, test_name): 65 for pgn_inst in self.plugin_instances: 66 try: 67 pgn_inst.pre_case(test_ordinal, testid, test_name) 68 except Exception as ee: 69 print('exception {} in call to pre_case for {} plugin'. 70 format(ee, pgn_inst.__class__)) 71 print('test_ordinal is {}'.format(test_ordinal)) 72 print('testid is {}'.format(testid)) 73 raise 74 75 def call_post_case(self): 76 for pgn_inst in reversed(self.plugin_instances): 77 pgn_inst.post_case() 78 79 def call_pre_execute(self): 80 for pgn_inst in self.plugin_instances: 81 pgn_inst.pre_execute() 82 83 def call_post_execute(self): 84 for pgn_inst in reversed(self.plugin_instances): 85 pgn_inst.post_execute() 86 87 def call_add_args(self, parser): 88 for pgn_inst in self.plugin_instances: 89 parser = pgn_inst.add_args(parser) 90 return parser 91 92 def call_check_args(self, args, remaining): 93 for pgn_inst in self.plugin_instances: 94 pgn_inst.check_args(args, remaining) 95 96 def call_adjust_command(self, stage, command): 97 for pgn_inst in self.plugin_instances: 98 command = pgn_inst.adjust_command(stage, command) 99 return command 100 101 @staticmethod 102 def _make_argparser(args): 103 self.argparser = argparse.ArgumentParser( 104 description='Linux TC unit tests') 105 106def replace_keywords(cmd): 107 """ 108 For a given executable command, substitute any known 109 variables contained within NAMES with the correct values 110 """ 111 tcmd = Template(cmd) 112 subcmd = tcmd.safe_substitute(NAMES) 113 return subcmd 114 115 116def exec_cmd(args, pm, stage, command): 117 """ 118 Perform any required modifications on an executable command, then run 119 it in a subprocess and return the results. 120 """ 121 if len(command.strip()) == 0: 122 return None, None 123 if '$' in command: 124 command = replace_keywords(command) 125 126 command = pm.call_adjust_command(stage, command) 127 if args.verbose > 0: 128 print('command "{}"'.format(command)) 129 proc = subprocess.Popen(command, 130 shell=True, 131 stdout=subprocess.PIPE, 132 stderr=subprocess.PIPE, 133 env=ENVIR) 134 135 try: 136 (rawout, serr) = proc.communicate(timeout=NAMES['TIMEOUT']) 137 if proc.returncode != 0 and len(serr) > 0: 138 foutput = serr.decode("utf-8", errors="ignore") 139 else: 140 foutput = rawout.decode("utf-8", errors="ignore") 141 except subprocess.TimeoutExpired: 142 foutput = "Command \"{}\" timed out\n".format(command) 143 proc.returncode = 255 144 145 proc.stdout.close() 146 proc.stderr.close() 147 return proc, foutput 148 149 150def prepare_env(args, pm, stage, prefix, cmdlist, output = None): 151 """ 152 Execute the setup/teardown commands for a test case. 153 Optionally terminate test execution if the command fails. 154 """ 155 if args.verbose > 0: 156 print('{}'.format(prefix)) 157 for cmdinfo in cmdlist: 158 if isinstance(cmdinfo, list): 159 exit_codes = cmdinfo[1:] 160 cmd = cmdinfo[0] 161 else: 162 exit_codes = [0] 163 cmd = cmdinfo 164 165 if not cmd: 166 continue 167 168 (proc, foutput) = exec_cmd(args, pm, stage, cmd) 169 170 if proc and (proc.returncode not in exit_codes): 171 print('', file=sys.stderr) 172 print("{} *** Could not execute: \"{}\"".format(prefix, cmd), 173 file=sys.stderr) 174 print("\n{} *** Error message: \"{}\"".format(prefix, foutput), 175 file=sys.stderr) 176 print("returncode {}; expected {}".format(proc.returncode, 177 exit_codes)) 178 print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr) 179 print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr) 180 print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr) 181 raise PluginMgrTestFail( 182 stage, output, 183 '"{}" did not complete successfully'.format(prefix)) 184 185def run_one_test(pm, args, index, tidx): 186 global NAMES 187 result = True 188 tresult = "" 189 tap = "" 190 res = TestResult(tidx['id'], tidx['name']) 191 if args.verbose > 0: 192 print("\t====================\n=====> ", end="") 193 print("Test " + tidx["id"] + ": " + tidx["name"]) 194 195 # populate NAMES with TESTID for this test 196 NAMES['TESTID'] = tidx['id'] 197 198 pm.call_pre_case(index, tidx['id'], tidx['name']) 199 prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"]) 200 201 if (args.verbose > 0): 202 print('-----> execute stage') 203 pm.call_pre_execute() 204 (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"]) 205 if p: 206 exit_code = p.returncode 207 else: 208 exit_code = None 209 210 pm.call_post_execute() 211 212 if (exit_code is None or exit_code != int(tidx["expExitCode"])): 213 print("exit: {!r}".format(exit_code)) 214 print("exit: {}".format(int(tidx["expExitCode"]))) 215 #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"]))) 216 res.set_result(ResultState.fail) 217 res.set_failmsg('Command exited with {}, expected {}\n{}'.format(exit_code, tidx["expExitCode"], procout)) 218 print(procout) 219 else: 220 if args.verbose > 0: 221 print('-----> verify stage') 222 match_pattern = re.compile( 223 str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE) 224 (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"]) 225 if procout: 226 match_index = re.findall(match_pattern, procout) 227 if len(match_index) != int(tidx["matchCount"]): 228 res.set_result(ResultState.fail) 229 res.set_failmsg('Could not match regex pattern. Verify command output:\n{}'.format(procout)) 230 else: 231 res.set_result(ResultState.success) 232 elif int(tidx["matchCount"]) != 0: 233 res.set_result(ResultState.fail) 234 res.set_failmsg('No output generated by verify command.') 235 else: 236 res.set_result(ResultState.success) 237 238 prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout) 239 pm.call_post_case() 240 241 index += 1 242 243 # remove TESTID from NAMES 244 del(NAMES['TESTID']) 245 return res 246 247def test_runner(pm, args, filtered_tests): 248 """ 249 Driver function for the unit tests. 250 251 Prints information about the tests being run, executes the setup and 252 teardown commands and the command under test itself. Also determines 253 success/failure based on the information in the test case and generates 254 TAP output accordingly. 255 """ 256 testlist = filtered_tests 257 tcount = len(testlist) 258 index = 1 259 tap = '' 260 badtest = None 261 stage = None 262 emergency_exit = False 263 emergency_exit_message = '' 264 265 tsr = TestSuiteReport() 266 267 try: 268 pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist]) 269 except Exception as ee: 270 ex_type, ex, ex_tb = sys.exc_info() 271 print('Exception {} {} (caught in pre_suite).'. 272 format(ex_type, ex)) 273 traceback.print_tb(ex_tb) 274 emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex) 275 emergency_exit = True 276 stage = 'pre-SUITE' 277 278 if emergency_exit: 279 pm.call_post_suite(index) 280 return emergency_exit_message 281 if args.verbose > 1: 282 print('give test rig 2 seconds to stabilize') 283 time.sleep(2) 284 for tidx in testlist: 285 if "flower" in tidx["category"] and args.device == None: 286 if args.verbose > 1: 287 print('Not executing test {} {} because DEV2 not defined'. 288 format(tidx['id'], tidx['name'])) 289 res = TestResult(tidx['id'], tidx['name']) 290 res.set_result(ResultState.skip) 291 res.set_errormsg('Not executed because DEV2 is not defined') 292 tsr.add_resultdata(res) 293 continue 294 try: 295 badtest = tidx # in case it goes bad 296 res = run_one_test(pm, args, index, tidx) 297 tsr.add_resultdata(res) 298 except PluginMgrTestFail as pmtf: 299 ex_type, ex, ex_tb = sys.exc_info() 300 stage = pmtf.stage 301 message = pmtf.message 302 output = pmtf.output 303 res = TestResult(tidx['id'], tidx['name']) 304 res.set_result(ResultState.skip) 305 res.set_errormsg(pmtf.message) 306 res.set_failmsg(pmtf.output) 307 tsr.add_resultdata(res) 308 index += 1 309 print(message) 310 print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'. 311 format(ex_type, ex, index, tidx['id'], tidx['name'], stage)) 312 print('---------------') 313 print('traceback') 314 traceback.print_tb(ex_tb) 315 print('---------------') 316 if stage == 'teardown': 317 print('accumulated output for this test:') 318 if pmtf.output: 319 print(pmtf.output) 320 print('---------------') 321 break 322 index += 1 323 324 # if we failed in setup or teardown, 325 # fill in the remaining tests with ok-skipped 326 count = index 327 328 if tcount + 1 != count: 329 for tidx in testlist[count - 1:]: 330 res = TestResult(tidx['id'], tidx['name']) 331 res.set_result(ResultState.skip) 332 msg = 'skipped - previous {} failed {} {}'.format(stage, 333 index, badtest.get('id', '--Unknown--')) 334 res.set_errormsg(msg) 335 tsr.add_resultdata(res) 336 count += 1 337 338 if args.pause: 339 print('Want to pause\nPress enter to continue ...') 340 if input(sys.stdin): 341 print('got something on stdin') 342 343 pm.call_post_suite(index) 344 345 return tsr 346 347def has_blank_ids(idlist): 348 """ 349 Search the list for empty ID fields and return true/false accordingly. 350 """ 351 return not(all(k for k in idlist)) 352 353 354def load_from_file(filename): 355 """ 356 Open the JSON file containing the test cases and return them 357 as list of ordered dictionary objects. 358 """ 359 try: 360 with open(filename) as test_data: 361 testlist = json.load(test_data, object_pairs_hook=OrderedDict) 362 except json.JSONDecodeError as jde: 363 print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde)) 364 testlist = list() 365 else: 366 idlist = get_id_list(testlist) 367 if (has_blank_ids(idlist)): 368 for k in testlist: 369 k['filename'] = filename 370 return testlist 371 372 373def args_parse(): 374 """ 375 Create the argument parser. 376 """ 377 parser = argparse.ArgumentParser(description='Linux TC unit tests') 378 return parser 379 380 381def set_args(parser): 382 """ 383 Set the command line arguments for tdc. 384 """ 385 parser.add_argument( 386 '--outfile', type=str, 387 help='Path to the file in which results should be saved. ' + 388 'Default target is the current directory.') 389 parser.add_argument( 390 '-p', '--path', type=str, 391 help='The full path to the tc executable to use') 392 sg = parser.add_argument_group( 393 'selection', 'select which test cases: ' + 394 'files plus directories; filtered by categories plus testids') 395 ag = parser.add_argument_group( 396 'action', 'select action to perform on selected test cases') 397 398 sg.add_argument( 399 '-D', '--directory', nargs='+', metavar='DIR', 400 help='Collect tests from the specified directory(ies) ' + 401 '(default [tc-tests])') 402 sg.add_argument( 403 '-f', '--file', nargs='+', metavar='FILE', 404 help='Run tests from the specified file(s)') 405 sg.add_argument( 406 '-c', '--category', nargs='*', metavar='CATG', default=['+c'], 407 help='Run tests only from the specified category/ies, ' + 408 'or if no category/ies is/are specified, list known categories.') 409 sg.add_argument( 410 '-e', '--execute', nargs='+', metavar='ID', 411 help='Execute the specified test cases with specified IDs') 412 ag.add_argument( 413 '-l', '--list', action='store_true', 414 help='List all test cases, or those only within the specified category') 415 ag.add_argument( 416 '-s', '--show', action='store_true', dest='showID', 417 help='Display the selected test cases') 418 ag.add_argument( 419 '-i', '--id', action='store_true', dest='gen_id', 420 help='Generate ID numbers for new test cases') 421 parser.add_argument( 422 '-v', '--verbose', action='count', default=0, 423 help='Show the commands that are being run') 424 parser.add_argument( 425 '--format', default='tap', const='tap', nargs='?', 426 choices=['none', 'xunit', 'tap'], 427 help='Specify the format for test results. (Default: TAP)') 428 parser.add_argument('-d', '--device', 429 help='Execute the test case in flower category') 430 parser.add_argument( 431 '-P', '--pause', action='store_true', 432 help='Pause execution just before post-suite stage') 433 return parser 434 435 436def check_default_settings(args, remaining, pm): 437 """ 438 Process any arguments overriding the default settings, 439 and ensure the settings are correct. 440 """ 441 # Allow for overriding specific settings 442 global NAMES 443 444 if args.path != None: 445 NAMES['TC'] = args.path 446 if args.device != None: 447 NAMES['DEV2'] = args.device 448 if 'TIMEOUT' not in NAMES: 449 NAMES['TIMEOUT'] = None 450 if not os.path.isfile(NAMES['TC']): 451 print("The specified tc path " + NAMES['TC'] + " does not exist.") 452 exit(1) 453 454 pm.call_check_args(args, remaining) 455 456 457def get_id_list(alltests): 458 """ 459 Generate a list of all IDs in the test cases. 460 """ 461 return [x["id"] for x in alltests] 462 463 464def check_case_id(alltests): 465 """ 466 Check for duplicate test case IDs. 467 """ 468 idl = get_id_list(alltests) 469 return [x for x in idl if idl.count(x) > 1] 470 471 472def does_id_exist(alltests, newid): 473 """ 474 Check if a given ID already exists in the list of test cases. 475 """ 476 idl = get_id_list(alltests) 477 return (any(newid == x for x in idl)) 478 479 480def generate_case_ids(alltests): 481 """ 482 If a test case has a blank ID field, generate a random hex ID for it 483 and then write the test cases back to disk. 484 """ 485 import random 486 for c in alltests: 487 if (c["id"] == ""): 488 while True: 489 newid = str('{:04x}'.format(random.randrange(16**4))) 490 if (does_id_exist(alltests, newid)): 491 continue 492 else: 493 c['id'] = newid 494 break 495 496 ufilename = [] 497 for c in alltests: 498 if ('filename' in c): 499 ufilename.append(c['filename']) 500 ufilename = get_unique_item(ufilename) 501 for f in ufilename: 502 testlist = [] 503 for t in alltests: 504 if 'filename' in t: 505 if t['filename'] == f: 506 del t['filename'] 507 testlist.append(t) 508 outfile = open(f, "w") 509 json.dump(testlist, outfile, indent=4) 510 outfile.write("\n") 511 outfile.close() 512 513def filter_tests_by_id(args, testlist): 514 ''' 515 Remove tests from testlist that are not in the named id list. 516 If id list is empty, return empty list. 517 ''' 518 newlist = list() 519 if testlist and args.execute: 520 target_ids = args.execute 521 522 if isinstance(target_ids, list) and (len(target_ids) > 0): 523 newlist = list(filter(lambda x: x['id'] in target_ids, testlist)) 524 return newlist 525 526def filter_tests_by_category(args, testlist): 527 ''' 528 Remove tests from testlist that are not in a named category. 529 ''' 530 answer = list() 531 if args.category and testlist: 532 test_ids = list() 533 for catg in set(args.category): 534 if catg == '+c': 535 continue 536 print('considering category {}'.format(catg)) 537 for tc in testlist: 538 if catg in tc['category'] and tc['id'] not in test_ids: 539 answer.append(tc) 540 test_ids.append(tc['id']) 541 542 return answer 543 544def get_test_cases(args): 545 """ 546 If a test case file is specified, retrieve tests from that file. 547 Otherwise, glob for all json files in subdirectories and load from 548 each one. 549 Also, if requested, filter by category, and add tests matching 550 certain ids. 551 """ 552 import fnmatch 553 554 flist = [] 555 testdirs = ['tc-tests'] 556 557 if args.file: 558 # at least one file was specified - remove the default directory 559 testdirs = [] 560 561 for ff in args.file: 562 if not os.path.isfile(ff): 563 print("IGNORING file " + ff + "\n\tBECAUSE does not exist.") 564 else: 565 flist.append(os.path.abspath(ff)) 566 567 if args.directory: 568 testdirs = args.directory 569 570 for testdir in testdirs: 571 for root, dirnames, filenames in os.walk(testdir): 572 for filename in fnmatch.filter(filenames, '*.json'): 573 candidate = os.path.abspath(os.path.join(root, filename)) 574 if candidate not in testdirs: 575 flist.append(candidate) 576 577 alltestcases = list() 578 for casefile in flist: 579 alltestcases = alltestcases + (load_from_file(casefile)) 580 581 allcatlist = get_test_categories(alltestcases) 582 allidlist = get_id_list(alltestcases) 583 584 testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist) 585 idtestcases = filter_tests_by_id(args, alltestcases) 586 cattestcases = filter_tests_by_category(args, alltestcases) 587 588 cat_ids = [x['id'] for x in cattestcases] 589 if args.execute: 590 if args.category: 591 alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids] 592 else: 593 alltestcases = idtestcases 594 else: 595 if cat_ids: 596 alltestcases = cattestcases 597 else: 598 # just accept the existing value of alltestcases, 599 # which has been filtered by file/directory 600 pass 601 602 return allcatlist, allidlist, testcases_by_cats, alltestcases 603 604 605def set_operation_mode(pm, args): 606 """ 607 Load the test case data and process remaining arguments to determine 608 what the script should do for this run, and call the appropriate 609 function. 610 """ 611 ucat, idlist, testcases, alltests = get_test_cases(args) 612 613 if args.gen_id: 614 if (has_blank_ids(idlist)): 615 alltests = generate_case_ids(alltests) 616 else: 617 print("No empty ID fields found in test files.") 618 exit(0) 619 620 duplicate_ids = check_case_id(alltests) 621 if (len(duplicate_ids) > 0): 622 print("The following test case IDs are not unique:") 623 print(str(set(duplicate_ids))) 624 print("Please correct them before continuing.") 625 exit(1) 626 627 if args.showID: 628 for atest in alltests: 629 print_test_case(atest) 630 exit(0) 631 632 if isinstance(args.category, list) and (len(args.category) == 0): 633 print("Available categories:") 634 print_sll(ucat) 635 exit(0) 636 637 if args.list: 638 if args.list: 639 list_test_cases(alltests) 640 exit(0) 641 642 if len(alltests): 643 catresults = test_runner(pm, args, alltests) 644 if args.format == 'none': 645 print('Test results output suppression requested\n') 646 else: 647 print('\nAll test results: \n') 648 if args.format == 'xunit': 649 suffix = 'xml' 650 res = catresults.format_xunit() 651 elif args.format == 'tap': 652 suffix = 'tap' 653 res = catresults.format_tap() 654 print(res) 655 print('\n\n') 656 if not args.outfile: 657 fname = 'test-results.{}'.format(suffix) 658 else: 659 fname = args.outfile 660 with open(fname, 'w') as fh: 661 fh.write(res) 662 fh.close() 663 if os.getenv('SUDO_UID') is not None: 664 os.chown(fname, uid=int(os.getenv('SUDO_UID')), 665 gid=int(os.getenv('SUDO_GID'))) 666 else: 667 print('No tests found\n') 668 669def main(): 670 """ 671 Start of execution; set up argument parser and get the arguments, 672 and start operations. 673 """ 674 parser = args_parse() 675 parser = set_args(parser) 676 pm = PluginMgr(parser) 677 parser = pm.call_add_args(parser) 678 (args, remaining) = parser.parse_known_args() 679 args.NAMES = NAMES 680 check_default_settings(args, remaining, pm) 681 if args.verbose > 2: 682 print('args is {}'.format(args)) 683 684 set_operation_mode(pm, args) 685 686 exit(0) 687 688 689if __name__ == "__main__": 690 main() 691