#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import json
import subprocess
from collections import OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *


USE_NS = True


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values.

    Unknown $variables are left untouched (safe_substitute).
    """
    return Template(cmd).safe_substitute(NAMES)


def exec_cmd(command, nsonly=True):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    When USE_NS and nsonly are both true the command is prefixed so that it
    runs inside the $NS network namespace.  Any $variables are expanded from
    NAMES before execution.

    Returns a (proc, output) tuple: the finished Popen object and the decoded
    text of stderr (if the command failed and wrote to stderr) or stdout.

    NOTE: the command is executed through a shell (shell=True); test case
    files must therefore come from a trusted source.
    """
    if USE_NS and nsonly:
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    proc = subprocess.Popen(command,
                            shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    (rawout, serr) = proc.communicate()

    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8")
    else:
        foutput = rawout.decode("utf-8")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput


def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case.

    Each entry of cmdlist is either a command string (which must exit
    with 0) or a list whose first element is the command and whose
    remaining elements are the acceptable exit codes.  Empty commands are
    skipped.

    Raises Exception if a command exits with a code that is not allowed,
    after printing the command and its output.
    """
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            cmd = cmdinfo[0]
            exit_codes = cmdinfo[1:]
        else:
            cmd = cmdinfo
            exit_codes = [0]

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            # A bare `print` is a no-op expression in Python 3; call it so
            # the blank separator line is actually emitted.
            print()
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            # ns_destroy()
            raise Exception('prepare_env did not complete successfully')


def run_one_test(index, tidx):
    """
    Run a single test case and return its TAP output.

    index is the number used on the TAP result line; tidx is the test case
    dictionary (keys used here: id, name, setup, cmdUnderTest, expExitCode,
    matchPattern, verifyCmd, matchCount, teardown).

    The test passes when cmdUnderTest exits with expExitCode and the output
    of verifyCmd contains exactly matchCount matches of matchPattern.  On
    failure the output of the last command that was run is appended to the
    TAP text.

    Any exception raised by prepare_env (setup or teardown) propagates to
    the caller.
    """
    result = True
    tresult = ""
    tap = ""
    print("Test " + tidx["id"] + ": " + tidx["name"])
    prepare_env(tidx["setup"])
    (p, procout) = exec_cmd(tidx["cmdUnderTest"])
    exit_code = p.returncode

    if exit_code != int(tidx["expExitCode"]):
        result = False
        print("exit:", exit_code, int(tidx["expExitCode"]))
        print(procout)
    else:
        match_pattern = re.compile(str(tidx["matchPattern"]),
                                   re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(tidx["verifyCmd"])
        match_index = re.findall(match_pattern, procout)
        if len(match_index) != int(tidx["matchCount"]):
            result = False

    if not result:
        tresult += "not "
    tresult += "ok {} - {} # {}\n".format(str(index), tidx['id'], tidx["name"])
    tap += tresult

    if not result:
        tap += procout

    prepare_env(tidx["teardown"])

    return tap


def test_runner(filtered_tests, args):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.

    Tests in the "flower" category are not executed when no device was
    given on the command line; they are reported as skipped.  If a test
    raises (setup or teardown failure) the run stops and that test and all
    tests after it are reported as skipped.

    Returns the accumulated TAP text.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = str(index) + ".." + str(tcount) + "\n"
    badtest = None      # the test that aborted the run, if any
    failed_pos = None   # its position in testlist

    for pos, tidx in enumerate(testlist):
        if "flower" in tidx["category"] and args.device is None:
            # Keep the TAP numbering in step with the plan line and do not
            # let these tests be mistaken for setup/teardown failures below.
            tap += 'ok {} - {} # skipped - no device specified for flower test\n'.format(
                index, tidx['id'])
            index += 1
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(index, tidx)
        except Exception as ee:
            print('Exception {} (caught in test_runner, running test {} {} {})'.
                  format(ee, index, tidx['id'], tidx['name']))
            failed_pos = pos
            break
        index += 1

    count = index
    tap += 'about to flush the tap output if tests need to be skipped\n'
    if failed_pos is not None:
        for tidx in testlist[failed_pos:]:
            msg = 'skipped - previous setup or teardown failed'
            tap += 'ok {} - {} # {} {} {} \n'.format(
                count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
            count += 1

    tap += 'done flushing skipped test tap output\n'

    return tap


def ns_create():
    """
    Create the network namespace in which the tests will be run and set up
    the required network devices for it.

    Does nothing when USE_NS is false.
    """
    if not USE_NS:
        return

    setup_cmds = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        'ip -n $NS link set $DEV1 up',
        'ip link set $DEV2 netns $NS',
        'ip -n $NS link set $DEV2 up',
    )
    for cmd in setup_cmds:
        # These run in the host namespace, hence nsonly=False.
        exec_cmd(cmd, False)


def ns_destroy():
    """
    Destroy the network namespace for testing (and any associated network
    devices as well)
    """
    if USE_NS:
        exec_cmd('ip netns delete $NS', False)


def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not all(idlist)


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.

    A file that is not valid JSON is reported and yields an empty list.
    If any test case in the file has a blank ID, every test case from the
    file is tagged with a 'filename' key so generate_case_ids() can write
    the file back.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if has_blank_ids(idlist):
            for k in testlist:
                k['filename'] = filename
    return testlist


def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    return parser


def set_args(parser):
    """
    Set the command line arguments for tdc.

    Returns the same parser with all options registered.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    return parser


def check_default_settings(args):
    """
    Process any arguments overriding the default settings, and ensure the
    settings are correct.

    Exits with status 1 if the configured tc executable does not exist.
    """
    # Allow for overriding specific settings.  NAMES is only mutated, never
    # rebound, so no `global` declaration is required.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        sys.exit(1)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns the list of IDs that occur more than once (each occurrence is
    listed, in the original order); empty when all IDs are unique.
    """
    from collections import Counter

    idl = get_id_list(alltests)
    occurrences = Counter(idl)
    return [x for x in idl if occurrences[x] > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    return newid in get_id_list(alltests)


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Only test cases carrying a 'filename' key (set by load_from_file for
    files containing blank IDs) are written back; the key is removed
    before the file is rewritten.

    Returns alltests (modified in place).
    """
    import random
    for c in alltests:
        if c["id"] == "":
            while True:
                newid = '%04x' % random.randrange(16**4)
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    ufilename = [c['filename'] for c in alltests if 'filename' in c]
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t and t['filename'] == f:
                del t['filename']
                testlist.append(t)
        # Context manager so the file is closed even if json.dump raises.
        with open(f, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests


def filter_tests_by_id(args, testlist):
    '''
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    '''
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and (len(target_ids) > 0):
            newlist = [x for x in testlist if x['id'] in target_ids]
    return newlist


def filter_tests_by_category(args, testlist):
    '''
    Remove tests from testlist that are not in a named category.

    The placeholder category '+c' (the option's default value) is ignored.
    Each matching test is returned once even if it matches several
    categories.
    '''
    answer = list()
    if args.category and testlist:
        seen_ids = set()
        for catg in set(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in seen_ids:
                    answer.append(tc)
                    seen_ids.add(tc['id'])

    return answer


def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.

    Returns a tuple (allcatlist, allidlist, testcases_by_cats, alltestcases)
    where the first three describe everything that was loaded and the last
    is the selection after category/ID filtering.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + " \n\tBECAUSE does not exist.")
            else:
                candidate = os.path.abspath(ff)
                if candidate not in flist:
                    flist.append(candidate)

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                # Compare against the files already collected (not the
                # directory list) so a file named with -f and also found
                # under -D is loaded only once.
                if candidate not in flist:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases


def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Exits the process for every mode other than actually running tests
    (ID generation, duplicate-ID error, show, category listing, test
    listing, missing root privileges).
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(idlist):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        sys.exit(0)

    # `-c` given with no category names: list what is available.
    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        sys.exit(0)

    if args.list:
        list_test_cases(alltests)
        sys.exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    ns_create()
    try:
        catresults = test_runner(alltests, args)
        print('All test results: \n\n{}'.format(catresults))
    finally:
        # Always remove the namespace, even on Ctrl-C or an unexpected error.
        ns_destroy()


def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    (args, remaining) = parser.parse_known_args()
    check_default_settings(args)

    set_operation_mode(args)

    sys.exit(0)


if __name__ == "__main__":
    main()