1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3 4""" 5tdc.py - Linux tc (Traffic Control) unit test driver 6 7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com> 8""" 9 10import re 11import os 12import sys 13import argparse 14import importlib 15import json 16import subprocess 17import time 18from collections import OrderedDict 19from string import Template 20 21from tdc_config import * 22from tdc_helper import * 23 24import TdcPlugin 25 26class PluginMgr: 27 def __init__(self, argparser): 28 super().__init__() 29 self.plugins = {} 30 self.plugin_instances = [] 31 self.args = [] 32 self.argparser = argparser 33 34 # TODO, put plugins in order 35 plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins') 36 for dirpath, dirnames, filenames in os.walk(plugindir): 37 for fn in filenames: 38 if (fn.endswith('.py') and 39 not fn == '__init__.py' and 40 not fn.startswith('#') and 41 not fn.startswith('.#')): 42 mn = fn[0:-3] 43 foo = importlib.import_module('plugins.' + mn) 44 self.plugins[mn] = foo 45 self.plugin_instances.append(foo.SubPlugin()) 46 47 def call_pre_suite(self, testcount, testidlist): 48 for pgn_inst in self.plugin_instances: 49 pgn_inst.pre_suite(testcount, testidlist) 50 51 def call_post_suite(self, index): 52 for pgn_inst in reversed(self.plugin_instances): 53 pgn_inst.post_suite(index) 54 55 def call_pre_case(self, test_ordinal, testid): 56 for pgn_inst in self.plugin_instances: 57 try: 58 pgn_inst.pre_case(test_ordinal, testid) 59 except Exception as ee: 60 print('exception {} in call to pre_case for {} plugin'. 61 format(ee, pgn_inst.__class__)) 62 print('test_ordinal is {}'.format(test_ordinal)) 63 print('testid is {}'.format(testid)) 64 raise 65 66 def call_post_case(self): 67 for pgn_inst in reversed(self.plugin_instances): 68 pgn_inst.post_case() 69 70 def call_pre_execute(self): 71 for pgn_inst in self.plugin_instances: 72 pgn_inst.pre_execute() 73 74 def call_post_execute(self): 75 for pgn_inst in reversed(self.plugin_instances): 76 pgn_inst.post_execute() 77 78 def call_add_args(self, parser): 79 for pgn_inst in self.plugin_instances: 80 parser = pgn_inst.add_args(parser) 81 return parser 82 83 def call_check_args(self, args, remaining): 84 for pgn_inst in self.plugin_instances: 85 pgn_inst.check_args(args, remaining) 86 87 def call_adjust_command(self, stage, command): 88 for pgn_inst in self.plugin_instances: 89 command = pgn_inst.adjust_command(stage, command) 90 return command 91 92 @staticmethod 93 def _make_argparser(args): 94 self.argparser = argparse.ArgumentParser( 95 description='Linux TC unit tests') 96 97 98def replace_keywords(cmd): 99 """ 100 For a given executable command, substitute any known 101 variables contained within NAMES with the correct values 102 """ 103 tcmd = Template(cmd) 104 subcmd = tcmd.safe_substitute(NAMES) 105 return subcmd 106 107 108def exec_cmd(args, pm, stage, command): 109 """ 110 Perform any required modifications on an executable command, then run 111 it in a subprocess and return the results. 112 """ 113 if len(command.strip()) == 0: 114 return None, None 115 if '$' in command: 116 command = replace_keywords(command) 117 118 command = pm.call_adjust_command(stage, command) 119 if args.verbose > 0: 120 print('command "{}"'.format(command)) 121 proc = subprocess.Popen(command, 122 shell=True, 123 stdout=subprocess.PIPE, 124 stderr=subprocess.PIPE, 125 env=ENVIR) 126 (rawout, serr) = proc.communicate() 127 128 if proc.returncode != 0 and len(serr) > 0: 129 foutput = serr.decode("utf-8") 130 else: 131 foutput = rawout.decode("utf-8") 132 133 proc.stdout.close() 134 proc.stderr.close() 135 return proc, foutput 136 137 138def prepare_env(args, pm, stage, prefix, cmdlist): 139 """ 140 Execute the setup/teardown commands for a test case. 141 Optionally terminate test execution if the command fails. 142 """ 143 if args.verbose > 0: 144 print('{}'.format(prefix)) 145 for cmdinfo in cmdlist: 146 if isinstance(cmdinfo, list): 147 exit_codes = cmdinfo[1:] 148 cmd = cmdinfo[0] 149 else: 150 exit_codes = [0] 151 cmd = cmdinfo 152 153 if not cmd: 154 continue 155 156 (proc, foutput) = exec_cmd(args, pm, stage, cmd) 157 158 if proc and (proc.returncode not in exit_codes): 159 print('', file=sys.stderr) 160 print("{} *** Could not execute: \"{}\"".format(prefix, cmd), 161 file=sys.stderr) 162 print("\n{} *** Error message: \"{}\"".format(prefix, foutput), 163 file=sys.stderr) 164 print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr) 165 print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr) 166 print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr) 167 raise Exception('"{}" did not complete successfully'.format(prefix)) 168 169def run_one_test(pm, args, index, tidx): 170 result = True 171 tresult = "" 172 tap = "" 173 if args.verbose > 0: 174 print("\t====================\n=====> ", end="") 175 print("Test " + tidx["id"] + ": " + tidx["name"]) 176 177 pm.call_pre_case(index, tidx['id']) 178 prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"]) 179 180 if (args.verbose > 0): 181 print('-----> execute stage') 182 pm.call_pre_execute() 183 (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"]) 184 exit_code = p.returncode 185 pm.call_post_execute() 186 187 if (exit_code != int(tidx["expExitCode"])): 188 result = False 189 print("exit:", exit_code, int(tidx["expExitCode"])) 190 print(procout) 191 else: 192 if args.verbose > 0: 193 print('-----> verify stage') 194 match_pattern = re.compile( 195 str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE) 196 (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"]) 197 match_index = re.findall(match_pattern, procout) 198 if len(match_index) != int(tidx["matchCount"]): 199 result = False 200 201 if not result: 202 tresult += 'not ' 203 tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name']) 204 tap += tresult 205 206 if result == False: 207 tap += procout 208 209 prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown']) 210 pm.call_post_case() 211 212 index += 1 213 214 return tap 215 216def test_runner(pm, args, filtered_tests): 217 """ 218 Driver function for the unit tests. 219 220 Prints information about the tests being run, executes the setup and 221 teardown commands and the command under test itself. Also determines 222 success/failure based on the information in the test case and generates 223 TAP output accordingly. 224 """ 225 testlist = filtered_tests 226 tcount = len(testlist) 227 index = 1 228 tap = str(index) + ".." + str(tcount) + "\n" 229 badtest = None 230 231 pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist]) 232 233 if args.verbose > 1: 234 print('Run tests here') 235 for tidx in testlist: 236 if "flower" in tidx["category"] and args.device == None: 237 continue 238 try: 239 badtest = tidx # in case it goes bad 240 tap += run_one_test(pm, args, index, tidx) 241 except Exception as ee: 242 print('Exception {} (caught in test_runner, running test {} {} {})'. 243 format(ee, index, tidx['id'], tidx['name'])) 244 break 245 index += 1 246 247 # if we failed in setup or teardown, 248 # fill in the remaining tests with not ok 249 count = index 250 tap += 'about to flush the tap output if tests need to be skipped\n' 251 if tcount + 1 != index: 252 for tidx in testlist[index - 1:]: 253 msg = 'skipped - previous setup or teardown failed' 254 tap += 'ok {} - {} # {} {} {}\n'.format( 255 count, tidx['id'], msg, index, badtest.get('id', '--Unknown--')) 256 count += 1 257 258 tap += 'done flushing skipped test tap output\n' 259 pm.call_post_suite(index) 260 261 return tap 262 263def has_blank_ids(idlist): 264 """ 265 Search the list for empty ID fields and return true/false accordingly. 266 """ 267 return not(all(k for k in idlist)) 268 269 270def load_from_file(filename): 271 """ 272 Open the JSON file containing the test cases and return them 273 as list of ordered dictionary objects. 274 """ 275 try: 276 with open(filename) as test_data: 277 testlist = json.load(test_data, object_pairs_hook=OrderedDict) 278 except json.JSONDecodeError as jde: 279 print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde)) 280 testlist = list() 281 else: 282 idlist = get_id_list(testlist) 283 if (has_blank_ids(idlist)): 284 for k in testlist: 285 k['filename'] = filename 286 return testlist 287 288 289def args_parse(): 290 """ 291 Create the argument parser. 292 """ 293 parser = argparse.ArgumentParser(description='Linux TC unit tests') 294 return parser 295 296 297def set_args(parser): 298 """ 299 Set the command line arguments for tdc. 300 """ 301 parser.add_argument( 302 '-p', '--path', type=str, 303 help='The full path to the tc executable to use') 304 sg = parser.add_argument_group( 305 'selection', 'select which test cases: ' + 306 'files plus directories; filtered by categories plus testids') 307 ag = parser.add_argument_group( 308 'action', 'select action to perform on selected test cases') 309 310 sg.add_argument( 311 '-D', '--directory', nargs='+', metavar='DIR', 312 help='Collect tests from the specified directory(ies) ' + 313 '(default [tc-tests])') 314 sg.add_argument( 315 '-f', '--file', nargs='+', metavar='FILE', 316 help='Run tests from the specified file(s)') 317 sg.add_argument( 318 '-c', '--category', nargs='*', metavar='CATG', default=['+c'], 319 help='Run tests only from the specified category/ies, ' + 320 'or if no category/ies is/are specified, list known categories.') 321 sg.add_argument( 322 '-e', '--execute', nargs='+', metavar='ID', 323 help='Execute the specified test cases with specified IDs') 324 ag.add_argument( 325 '-l', '--list', action='store_true', 326 help='List all test cases, or those only within the specified category') 327 ag.add_argument( 328 '-s', '--show', action='store_true', dest='showID', 329 help='Display the selected test cases') 330 ag.add_argument( 331 '-i', '--id', action='store_true', dest='gen_id', 332 help='Generate ID numbers for new test cases') 333 parser.add_argument( 334 '-v', '--verbose', action='count', default=0, 335 help='Show the commands that are being run') 336 parser.add_argument('-d', '--device', 337 help='Execute the test case in flower category') 338 return parser 339 340 341def check_default_settings(args, remaining, pm): 342 """ 343 Process any arguments overriding the default settings, 344 and ensure the settings are correct. 345 """ 346 # Allow for overriding specific settings 347 global NAMES 348 349 if args.path != None: 350 NAMES['TC'] = args.path 351 if args.device != None: 352 NAMES['DEV2'] = args.device 353 if not os.path.isfile(NAMES['TC']): 354 print("The specified tc path " + NAMES['TC'] + " does not exist.") 355 exit(1) 356 357 pm.call_check_args(args, remaining) 358 359 360def get_id_list(alltests): 361 """ 362 Generate a list of all IDs in the test cases. 363 """ 364 return [x["id"] for x in alltests] 365 366 367def check_case_id(alltests): 368 """ 369 Check for duplicate test case IDs. 370 """ 371 idl = get_id_list(alltests) 372 return [x for x in idl if idl.count(x) > 1] 373 374 375def does_id_exist(alltests, newid): 376 """ 377 Check if a given ID already exists in the list of test cases. 378 """ 379 idl = get_id_list(alltests) 380 return (any(newid == x for x in idl)) 381 382 383def generate_case_ids(alltests): 384 """ 385 If a test case has a blank ID field, generate a random hex ID for it 386 and then write the test cases back to disk. 387 """ 388 import random 389 for c in alltests: 390 if (c["id"] == ""): 391 while True: 392 newid = str('%04x' % random.randrange(16**4)) 393 if (does_id_exist(alltests, newid)): 394 continue 395 else: 396 c['id'] = newid 397 break 398 399 ufilename = [] 400 for c in alltests: 401 if ('filename' in c): 402 ufilename.append(c['filename']) 403 ufilename = get_unique_item(ufilename) 404 for f in ufilename: 405 testlist = [] 406 for t in alltests: 407 if 'filename' in t: 408 if t['filename'] == f: 409 del t['filename'] 410 testlist.append(t) 411 outfile = open(f, "w") 412 json.dump(testlist, outfile, indent=4) 413 outfile.close() 414 415def filter_tests_by_id(args, testlist): 416 ''' 417 Remove tests from testlist that are not in the named id list. 418 If id list is empty, return empty list. 419 ''' 420 newlist = list() 421 if testlist and args.execute: 422 target_ids = args.execute 423 424 if isinstance(target_ids, list) and (len(target_ids) > 0): 425 newlist = list(filter(lambda x: x['id'] in target_ids, testlist)) 426 return newlist 427 428def filter_tests_by_category(args, testlist): 429 ''' 430 Remove tests from testlist that are not in a named category. 431 ''' 432 answer = list() 433 if args.category and testlist: 434 test_ids = list() 435 for catg in set(args.category): 436 if catg == '+c': 437 continue 438 print('considering category {}'.format(catg)) 439 for tc in testlist: 440 if catg in tc['category'] and tc['id'] not in test_ids: 441 answer.append(tc) 442 test_ids.append(tc['id']) 443 444 return answer 445 446def get_test_cases(args): 447 """ 448 If a test case file is specified, retrieve tests from that file. 449 Otherwise, glob for all json files in subdirectories and load from 450 each one. 451 Also, if requested, filter by category, and add tests matching 452 certain ids. 453 """ 454 import fnmatch 455 456 flist = [] 457 testdirs = ['tc-tests'] 458 459 if args.file: 460 # at least one file was specified - remove the default directory 461 testdirs = [] 462 463 for ff in args.file: 464 if not os.path.isfile(ff): 465 print("IGNORING file " + ff + "\n\tBECAUSE does not exist.") 466 else: 467 flist.append(os.path.abspath(ff)) 468 469 if args.directory: 470 testdirs = args.directory 471 472 for testdir in testdirs: 473 for root, dirnames, filenames in os.walk(testdir): 474 for filename in fnmatch.filter(filenames, '*.json'): 475 candidate = os.path.abspath(os.path.join(root, filename)) 476 if candidate not in testdirs: 477 flist.append(candidate) 478 479 alltestcases = list() 480 for casefile in flist: 481 alltestcases = alltestcases + (load_from_file(casefile)) 482 483 allcatlist = get_test_categories(alltestcases) 484 allidlist = get_id_list(alltestcases) 485 486 testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist) 487 idtestcases = filter_tests_by_id(args, alltestcases) 488 cattestcases = filter_tests_by_category(args, alltestcases) 489 490 cat_ids = [x['id'] for x in cattestcases] 491 if args.execute: 492 if args.category: 493 alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids] 494 else: 495 alltestcases = idtestcases 496 else: 497 if cat_ids: 498 alltestcases = cattestcases 499 else: 500 # just accept the existing value of alltestcases, 501 # which has been filtered by file/directory 502 pass 503 504 return allcatlist, allidlist, testcases_by_cats, alltestcases 505 506 507def set_operation_mode(pm, args): 508 """ 509 Load the test case data and process remaining arguments to determine 510 what the script should do for this run, and call the appropriate 511 function. 512 """ 513 ucat, idlist, testcases, alltests = get_test_cases(args) 514 515 if args.gen_id: 516 if (has_blank_ids(idlist)): 517 alltests = generate_case_ids(alltests) 518 else: 519 print("No empty ID fields found in test files.") 520 exit(0) 521 522 duplicate_ids = check_case_id(alltests) 523 if (len(duplicate_ids) > 0): 524 print("The following test case IDs are not unique:") 525 print(str(set(duplicate_ids))) 526 print("Please correct them before continuing.") 527 exit(1) 528 529 if args.showID: 530 for atest in alltests: 531 print_test_case(atest) 532 exit(0) 533 534 if isinstance(args.category, list) and (len(args.category) == 0): 535 print("Available categories:") 536 print_sll(ucat) 537 exit(0) 538 539 if args.list: 540 if args.list: 541 list_test_cases(alltests) 542 exit(0) 543 544 if len(alltests): 545 catresults = test_runner(pm, args, alltests) 546 else: 547 catresults = 'No tests found\n' 548 print('All test results: \n\n{}'.format(catresults)) 549 550def main(): 551 """ 552 Start of execution; set up argument parser and get the arguments, 553 and start operations. 554 """ 555 parser = args_parse() 556 parser = set_args(parser) 557 pm = PluginMgr(parser) 558 parser = pm.call_add_args(parser) 559 (args, remaining) = parser.parse_known_args() 560 args.NAMES = NAMES 561 check_default_settings(args, remaining, pm) 562 if args.verbose > 2: 563 print('args is {}'.format(args)) 564 565 set_operation_mode(pm, args) 566 567 exit(0) 568 569 570if __name__ == "__main__": 571 main() 572