#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import importlib
import json
import subprocess
import time
from collections import Counter, OrderedDict
from string import Template

# NOTE(review): NAMES and ENVIR are presumably provided by tdc_config, and
# the print/list/category helpers by tdc_helper -- confirm against those
# modules, which are not visible from this file.
from tdc_config import *
from tdc_helper import *

import TdcPlugin

# When true, test commands are run inside the $NS network namespace.
USE_NS = True


class PluginMgr:
    """
    Load the plugin modules and fan each driver hook out to them.

    "pre" style hooks are delivered in load order; "post" style hooks are
    delivered in reverse load order.
    """

    def __init__(self, argparser):
        """
        Import every plugin module found under the plugin directory
        (TDC_PLUGIN_DIR, default ./plugins) and instantiate its SubPlugin.
        """
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO, put plugins in order
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                # skip the package marker and editor backup/lock files
                if (fn.endswith('.py') and
                        not fn == '__init__.py' and
                        not fn.startswith('#') and
                        not fn.startswith('.#')):
                    mn = fn[0:-3]
                    module = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = module
                    self.plugin_instances.append(module.SubPlugin())

    def call_pre_suite(self, testcount, testidlist):
        """Invoke pre_suite on every plugin, in load order."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        """Invoke post_suite on every plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid):
        """
        Invoke pre_case on every plugin, in load order.

        Any exception is reported with the test context and re-raised.
        """
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        """Invoke post_case on every plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        """Invoke pre_execute on every plugin, in load order."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        """Invoke post_execute on every plugin, in reverse load order."""
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        """
        Pass the parser through each plugin's add_args and return the
        parser handed back by the last plugin.
        """
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        """Let every plugin inspect the parsed and the unparsed arguments."""
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        """
        Pass the command through each plugin's adjust_command for the given
        stage and return the final command.
        """
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    @staticmethod
    def _make_argparser(args):
        """
        Build and return a new argument parser for tdc.

        This is a static method, so there is no instance to store the
        parser on (the previous body assigned to an undefined `self`);
        the parser is returned instead. `args` is unused and kept only so
        the signature does not change.
        """
        return argparse.ArgumentParser(
            description='Linux TC unit tests')


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values.
    Unknown $placeholders are left untouched (safe_substitute).
    """
    tcmd = Template(cmd)
    subcmd = tcmd.safe_substitute(NAMES)
    return subcmd


def exec_cmd(args, pm, stage, command, nsonly=True):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    Returns (proc, output). output is stderr when the command failed and
    wrote to stderr, otherwise stdout. A blank command is not run and
    yields (None, None), so callers must cope with a None proc.
    """
    if len(command.strip()) == 0:
        return None, None
    if (USE_NS and nsonly):
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    # NOTE: the command is a shell string taken from the test definitions.
    proc = subprocess.Popen(command,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=ENVIR)
    (rawout, serr) = proc.communicate()

    # Tolerate output that is not valid UTF-8 instead of raising.
    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8", errors="ignore")
    else:
        foutput = rawout.decode("utf-8", errors="ignore")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput


def prepare_env(args, pm, stage, prefix, cmdlist):
    """
    Execute the setup/teardown commands for a test case.

    Each entry of cmdlist is either a command string (exit code 0
    expected) or a list whose first item is the command and whose
    remaining items are the acceptable exit codes. Raises Exception when a
    command exits with an unexpected code.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            # NOTE(review): these print the stream objects, not their
            # content; the captured text is already shown above.
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise Exception('"{}" did not complete successfully'.format(prefix))


def run_one_test(pm, args, index, tidx):
    """
    Run the setup, execute, verify and teardown stages of one test case.

    index is the test ordinal used in the TAP line and tidx is the test
    case dictionary. Returns the TAP text for this test; on failure the
    captured command output is appended to it. Exceptions raised by the
    setup/teardown stages or by plugins propagate to the caller.
    """
    result = True
    tresult = ""
    tap = ""
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
        print("Test " + tidx["id"] + ": " + tidx["name"])

    pm.call_pre_case(index, tidx['id'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if (args.verbose > 0):
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    # exec_cmd returns no process for a blank command; treat that as a
    # failed test rather than crashing on p.returncode.
    exit_code = p.returncode if p else None
    pm.call_post_execute()

    if (exit_code != int(tidx["expExitCode"])):
        result = False
        print("exit:", exit_code, int(tidx["expExitCode"]))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        # a blank verify command produces no output to match against
        match_index = re.findall(match_pattern, procout or '')
        if len(match_index) != int(tidx["matchCount"]):
            result = False

    if not result:
        tresult += 'not '
    tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
    tap += tresult

    if not result:
        tap += procout or ''

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'])
    pm.call_post_case()

    return tap


def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.

    Tests in the "flower" category are silently passed over when no device
    was given. If a test raises, the run stops and that test plus all the
    following ones are reported as skipped.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = str(index) + ".." + str(tcount) + "\n"
    badtest = None
    # list position of the test that aborted the run, if any
    aborted_at = None

    pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])

    if args.verbose > 1:
        print('Run tests here')
    for pos, tidx in enumerate(testlist):
        if "flower" in tidx["category"] and args.device is None:
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(pm, args, index, tidx)
        except Exception as ee:
            print('Exception {} (caught in test_runner, running test {} {} {})'.
                  format(ee, index, tidx['id'], tidx['name']))
            aborted_at = pos
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with not ok
    count = index
    tap += 'about to flush the tap output if tests need to be skipped\n'
    # Only an actual abort triggers the flush: tests passed over for lack
    # of a device do not advance index and must not be mistaken for one.
    if aborted_at is not None:
        for tidx in testlist[aborted_at:]:
            msg = 'skipped - previous setup or teardown failed'
            tap += 'ok {} - {} # {} {} {}\n'.format(
                count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
            count += 1

    tap += 'done flushing skipped test tap output\n'
    pm.call_post_suite(index)

    return tap


def ns_create(args, pm):
    """
    Create the network namespace in which the tests will be run and set up
    the required network devices for it.
    """
    if (USE_NS):
        cmd = 'ip netns add $NS'
        exec_cmd(args, pm, 'pre', cmd, False)
        cmd = 'ip link add $DEV0 type veth peer name $DEV1'
        exec_cmd(args, pm, 'pre', cmd, False)
        cmd = 'ip link set $DEV1 netns $NS'
        exec_cmd(args, pm, 'pre', cmd, False)
        cmd = 'ip link set $DEV0 up'
        exec_cmd(args, pm, 'pre', cmd, False)
        cmd = 'ip -n $NS link set $DEV1 up'
        exec_cmd(args, pm, 'pre', cmd, False)
        cmd = 'ip link set $DEV2 netns $NS'
        exec_cmd(args, pm, 'pre', cmd, False)
        cmd = 'ip -n $NS link set $DEV2 up'
        exec_cmd(args, pm, 'pre', cmd, False)


def ns_destroy(args, pm):
    """
    Destroy the network namespace for testing (and any associated network
    devices as well)
    """
    if (USE_NS):
        cmd = 'ip netns delete $NS'
        exec_cmd(args, pm, 'post', cmd, False)


def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not(all(k for k in idlist))


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.

    A file that is not valid JSON is reported and yields an empty list.
    When the file contains blank IDs, every test is tagged with a
    'filename' key so generate_case_ids can write the file back.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if (has_blank_ids(idlist)):
            for k in testlist:
                k['filename'] = filename
    return testlist


def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    return parser


def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    # '+c' is a sentinel default meaning "-c was not given at all"; a bare
    # "-c" yields an empty list, which requests the category listing.
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    return parser


def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.

    Exits with status 1 when the tc executable path is not a file.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        sys.exit(1)

    pm.call_check_args(args, remaining)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns every occurrence of each ID that appears more than once, in
    test order; the list is empty when all IDs are unique.
    """
    idl = get_id_list(alltests)
    occurrences = Counter(idl)
    return [x for x in idl if occurrences[x] > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    idl = get_id_list(alltests)
    return (any(newid == x for x in idl))


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Only tests tagged with a 'filename' key are written back; the tag is
    removed before writing. Returns alltests (updated in place).
    """
    import random
    for c in alltests:
        if (c["id"] == ""):
            while True:
                newid = str('%04x' % random.randrange(16**4))
                if (does_id_exist(alltests, newid)):
                    continue
                else:
                    c['id'] = newid
                    break

    ufilename = []
    for c in alltests:
        if ('filename' in c):
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        # the context manager closes the file even if json.dump raises
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests


def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and (len(target_ids) > 0):
            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
    return newlist


def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.

    Categories are considered in the order given on the command line
    (duplicates dropped), so the result order is the same on every run.
    '''
    answer = list()
    if args.category and testlist:
        seen_ids = set()
        for catg in OrderedDict.fromkeys(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in seen_ids:
                    answer.append(tc)
                    seen_ids.add(tc['id'])

    return answer


def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.

    Returns (all categories, all ids, tests grouped by category,
    selected tests).
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                # do not load a file twice when it was also named with -f
                # or is reachable through more than one directory
                if candidate not in flist:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases


def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    The ID generation, show, category listing and list actions each exit
    when done. Otherwise the tests are run, which requires root.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if (has_blank_ids(idlist)):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if (len(duplicate_ids) > 0):
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        sys.exit(0)

    # a bare "-c" gives an empty list: list the known categories
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        sys.exit(0)

    if args.list:
        list_test_cases(alltests)
        sys.exit(0)

    if (os.geteuid() != 0):
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    ns_create(args, pm)
    try:
        if len(alltests):
            catresults = test_runner(pm, args, alltests)
        else:
            catresults = 'No tests found\n'
        print('All test results: \n\n{}'.format(catresults))
    finally:
        # always remove the namespace, even if the run raised
        ns_destroy(args, pm)


def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    sys.exit(0)


if __name__ == "__main__":
    main()