#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import importlib
import json
import subprocess
import time
import traceback
from collections import Counter, OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *

import TdcPlugin


class PluginMgrTestFail(Exception):
    """
    Raised when a setup/teardown command exits with an unexpected code.

    Attributes:
        stage:   name of the stage that failed (e.g. 'setup', 'teardown')
        output:  output accumulated for the test so far (may be None)
        message: human readable description of the failure
    """
    def __init__(self, stage, output, message):
        # Hand the message to Exception so str(exc) is not empty when the
        # exception is formatted in diagnostics.
        super().__init__(message)
        self.stage = stage
        self.output = output
        self.message = message


class PluginMgr:
    """
    Load the plugin modules and fan hook calls out to them.

    Every '*.py' file found under $TDC_PLUGIN_DIR (default './plugins') is
    imported as 'plugins.<module>' and its SubPlugin class is instantiated.
    "pre" style hooks run in load order; "post" style hooks run in reverse
    load order.
    """
    def __init__(self, argparser):
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO, put plugins in order
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for _dirpath, _dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                # skip the package marker and editor backup/lock files
                if (fn.endswith('.py') and
                        fn != '__init__.py' and
                        not fn.startswith('#') and
                        not fn.startswith('.#')):
                    mn = fn[0:-3]
                    module = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = module
                    self.plugin_instances.append(module.SubPlugin())

    def call_pre_suite(self, testcount, testidlist):
        """Call pre_suite(testcount, testidlist) on every plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        """Call post_suite(index) on every plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid):
        """
        Call pre_case(test_ordinal, testid) on every plugin.

        Any exception is reported on stdout together with the test being
        prepared, and then re-raised unchanged.
        """
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        """Call post_case() on every plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        """Call pre_execute() on every plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        """Call post_execute() on every plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        """
        Let every plugin add its own command line options.

        Each plugin receives the parser returned by the previous one; the
        final parser is returned.
        """
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        """Call check_args(args, remaining) on every plugin."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        """
        Pass the command through adjust_command() of every plugin in turn
        and return the final command string.
        """
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    @staticmethod
    def _make_argparser(args):
        """
        Build and return the base argument parser.

        This is a static method, so there is no instance to store the
        parser on; the caller is expected to keep the returned object.
        The 'args' parameter is unused and kept only for compatibility.
        """
        return argparse.ArgumentParser(
            description='Linux TC unit tests')


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values.

    Unknown '$' placeholders are left untouched (safe_substitute).
    """
    return Template(cmd).safe_substitute(NAMES)


def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    Returns a (proc, output) tuple. For a blank command nothing is run and
    (None, None) is returned. 'output' is the decoded stderr when the
    command failed and wrote to stderr, otherwise the decoded stdout.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    # The test definitions contain shell command lines, hence shell=True.
    proc = subprocess.Popen(command,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=ENVIR)
    (rawout, serr) = proc.communicate()

    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8")
    else:
        foutput = rawout.decode("utf-8")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput


def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
    """
    Execute the setup/teardown commands for a test case.

    Each entry of cmdlist is either a command string (expected exit code
    0) or a list whose first element is the command and whose remaining
    elements are the acceptable exit codes. Empty commands are skipped.

    Raises PluginMgrTestFail (carrying 'stage' and 'output') as soon as a
    command exits with a code that is not acceptable.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise PluginMgrTestFail(
                stage, output,
                '"{}" did not complete successfully'.format(prefix))


def run_one_test(pm, args, index, tidx):
    """
    Run a single test case and return its TAP output.

    Stages: plugin pre_case, setup commands, the command under test, the
    verify command (only when the exit code matched), teardown commands,
    plugin post_case. The test passes when the exit code equals
    'expExitCode' and 'matchPattern' is found exactly 'matchCount' times
    in the output of 'verifyCmd'.

    PluginMgrTestFail raised by the setup or teardown stage propagates to
    the caller.
    """
    result = True
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    pm.call_pre_case(index, tidx['id'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if args.verbose > 0:
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    # exec_cmd() returns (None, None) for a blank command; treat that as a
    # failed test (exit code None never equals the expected integer)
    # instead of crashing on p.returncode.
    exit_code = p.returncode if p else None
    pm.call_post_execute()

    if exit_code != int(tidx["expExitCode"]):
        result = False
        print("exit:", exit_code, int(tidx["expExitCode"]))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        if procout:
            match_index = re.findall(match_pattern, procout)
            if len(match_index) != int(tidx["matchCount"]):
                result = False
        elif int(tidx["matchCount"]) != 0:
            # no output at all can only be correct when no match is expected
            result = False

    tap = '' if result else 'not '
    tap += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])

    if not result:
        if procout:
            tap += procout
        else:
            tap += 'No output!\n'

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case()

    return tap


def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.

    Returns the accumulated TAP output, or an 'EMERGENCY EXIT' message when
    a plugin's pre_suite hook raised.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = str(index) + ".." + str(tcount) + "\n"
    badtest = None
    stage = None

    try:
        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    except Exception:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        # The traceback shows up much earlier in the tdc.py output than
        # surrounding print() calls would, so it is not framed by markers.
        traceback.print_tb(ex_tb)
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        pm.call_post_suite(index)
        return emergency_exit_message

    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)
    for tidx in testlist:
        if "flower" in tidx["category"] and args.device is None:
            if args.verbose > 1:
                print('Not executing test {} {} because DEV2 not defined'.
                      format(tidx['id'], tidx['name']))
            # Account for the skipped test: emit a TAP line for it and
            # advance the ordinal, so that 'index' keeps matching the
            # position in testlist and the "abort" check below only
            # triggers on a real setup/teardown failure.
            tap += 'ok {} - {} # skipped - DEV2 not defined\n'.format(
                index, tidx['id'])
            index += 1
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(pm, args, index, tidx)
        except PluginMgrTestFail as pmtf:
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            print(pmtf.message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index
    tap += 'about to flush the tap output if tests need to be skipped\n'
    if tcount + 1 != index:
        bad_id = badtest.get('id', '--Unknown--') if badtest else '--Unknown--'
        for tidx in testlist[index - 1:]:
            msg = 'skipped - previous {} failed'.format(stage)
            tap += 'ok {} - {} # {} {} {}\n'.format(
                count, tidx['id'], msg, index, bad_id)
            count += 1

    tap += 'done flushing skipped test tap output\n'
    pm.call_post_suite(index)

    return tap


def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not all(idlist)


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.

    A file that is not valid JSON is reported and yields an empty list.
    When the file contains test cases with a blank ID, every test case of
    that file is tagged with a 'filename' key so the file can be rewritten
    once IDs have been generated.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde))
        return []

    if has_blank_ids(get_id_list(testlist)):
        for testcase in testlist:
            testcase['filename'] = filename
    return testlist


def args_parse():
    """
    Create the argument parser.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')


def set_args(parser):
    """
    Set the command line arguments for tdc.

    Returns the parser that was passed in, with the options added.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    return parser


def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.

    Exits with status 1 when the tc executable does not exist.
    """
    # Allow for overriding specific settings. NAMES is mutated in place,
    # so no 'global' declaration is required.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        sys.exit(1)

    pm.call_check_args(args, remaining)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns every ID that occurs more than once, once per occurrence and
    in the original order.
    """
    idl = get_id_list(alltests)
    occurrences = Counter(idl)
    return [x for x in idl if occurrences[x] > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    return newid in get_id_list(alltests)


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Only files tagged via the 'filename' key are rewritten; the key is
    removed from each test case before it is written. Returns the updated
    list of test cases.
    """
    import random
    for c in alltests:
        if c["id"] == "":
            # draw 4-digit hex IDs until an unused one is found
            while True:
                newid = str('{:04x}'.format(random.randrange(16**4)))
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    ufilename = get_unique_item(
        [c['filename'] for c in alltests if 'filename' in c])
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t and t['filename'] == f:
                del t['filename']
                testlist.append(t)
        # 'with' guarantees the file is closed even if json.dump() raises
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests


def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    target_ids = args.execute
    if not (testlist and target_ids):
        return []
    if not isinstance(target_ids, list):
        return []
    return [tc for tc in testlist if tc['id'] in target_ids]


def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.

    Categories are considered in the order given on the command line
    (duplicates ignored), which keeps the result order deterministic.
    '''
    answer = []
    if args.category and testlist:
        seen_ids = set()
        for catg in OrderedDict.fromkeys(args.category):
            if catg == '+c':
                # '+c' is the marker for "no category was requested"
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in seen_ids:
                    answer.append(tc)
                    seen_ids.add(tc['id'])

    return answer


def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.

    Returns a tuple (all categories, all ids, tests grouped by category,
    selected tests).
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, _dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                # A file may be named with -f and also live in a -D
                # directory; load it only once.
                if candidate not in flist:
                    flist.append(candidate)

    alltestcases = []
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    elif cat_ids:
        alltestcases = cattestcases
    # otherwise keep alltestcases as is: filtered by file/directory only

    return allcatlist, allidlist, testcases_by_cats, alltestcases


def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(idlist):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if duplicate_ids:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        sys.exit(0)

    # '-c' given without any category: list the known categories
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        sys.exit(0)

    if args.list:
        list_test_cases(alltests)
        sys.exit(0)

    if alltests:
        catresults = test_runner(pm, args, alltests)
    else:
        catresults = 'No tests found\n'
    print('All test results: \n\n{}'.format(catresults))


def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    sys.exit(0)


if __name__ == "__main__":
    main()