1#!/usr/bin/env python3 2 3""" 4tdc.py - Linux tc (Traffic Control) unit test driver 5 6Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com> 7""" 8 9import re 10import os 11import sys 12import argparse 13import json 14import subprocess 15from collections import OrderedDict 16from string import Template 17 18from tdc_config import * 19from tdc_helper import * 20 21 22USE_NS = True 23 24 25def replace_keywords(cmd): 26 """ 27 For a given executable command, substitute any known 28 variables contained within NAMES with the correct values 29 """ 30 tcmd = Template(cmd) 31 subcmd = tcmd.safe_substitute(NAMES) 32 return subcmd 33 34 35def exec_cmd(command, nsonly=True): 36 """ 37 Perform any required modifications on an executable command, then run 38 it in a subprocess and return the results. 39 """ 40 if (USE_NS and nsonly): 41 command = 'ip netns exec $NS ' + command 42 43 if '$' in command: 44 command = replace_keywords(command) 45 46 proc = subprocess.Popen(command, 47 shell=True, 48 stdout=subprocess.PIPE, 49 stderr=subprocess.PIPE) 50 (rawout, serr) = proc.communicate() 51 52 if proc.returncode != 0 and len(serr) > 0: 53 foutput = serr.decode("utf-8") 54 else: 55 foutput = rawout.decode("utf-8") 56 57 proc.stdout.close() 58 proc.stderr.close() 59 return proc, foutput 60 61 62def prepare_env(cmdlist): 63 """ 64 Execute the setup/teardown commands for a test case. Optionally 65 terminate test execution if the command fails. 66 """ 67 for cmdinfo in cmdlist: 68 if (type(cmdinfo) == list): 69 exit_codes = cmdinfo[1:] 70 cmd = cmdinfo[0] 71 else: 72 exit_codes = [0] 73 cmd = cmdinfo 74 75 if (len(cmd) == 0): 76 continue 77 78 (proc, foutput) = exec_cmd(cmd) 79 80 if proc.returncode not in exit_codes: 81 print 82 print("Could not execute:") 83 print(cmd) 84 print("\nError message:") 85 print(foutput) 86 print("\nAborting test run.") 87 ns_destroy() 88 exit(1) 89 90 91def test_runner(filtered_tests, args): 92 """ 93 Driver function for the unit tests. 94 95 Prints information about the tests being run, executes the setup and 96 teardown commands and the command under test itself. Also determines 97 success/failure based on the information in the test case and generates 98 TAP output accordingly. 99 """ 100 testlist = filtered_tests 101 tcount = len(testlist) 102 index = 1 103 tap = str(index) + ".." + str(tcount) + "\n" 104 105 for tidx in testlist: 106 result = True 107 tresult = "" 108 if "flower" in tidx["category"] and args.device == None: 109 continue 110 print("Test " + tidx["id"] + ": " + tidx["name"]) 111 prepare_env(tidx["setup"]) 112 (p, procout) = exec_cmd(tidx["cmdUnderTest"]) 113 exit_code = p.returncode 114 115 if (exit_code != int(tidx["expExitCode"])): 116 result = False 117 print("exit:", exit_code, int(tidx["expExitCode"])) 118 print(procout) 119 else: 120 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL) 121 (p, procout) = exec_cmd(tidx["verifyCmd"]) 122 match_index = re.findall(match_pattern, procout) 123 if len(match_index) != int(tidx["matchCount"]): 124 result = False 125 126 if result == True: 127 tresult += "ok " 128 else: 129 tresult += "not ok " 130 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n" 131 132 if result == False: 133 tap += procout 134 135 prepare_env(tidx["teardown"]) 136 index += 1 137 138 return tap 139 140 141def ns_create(): 142 """ 143 Create the network namespace in which the tests will be run and set up 144 the required network devices for it. 145 """ 146 if (USE_NS): 147 cmd = 'ip netns add $NS' 148 exec_cmd(cmd, False) 149 cmd = 'ip link add $DEV0 type veth peer name $DEV1' 150 exec_cmd(cmd, False) 151 cmd = 'ip link set $DEV1 netns $NS' 152 exec_cmd(cmd, False) 153 cmd = 'ip link set $DEV0 up' 154 exec_cmd(cmd, False) 155 cmd = 'ip -s $NS link set $DEV1 up' 156 exec_cmd(cmd, False) 157 cmd = 'ip link set $DEV2 netns $NS' 158 exec_cmd(cmd, False) 159 cmd = 'ip -s $NS link set $DEV2 up' 160 exec_cmd(cmd, False) 161 162 163def ns_destroy(): 164 """ 165 Destroy the network namespace for testing (and any associated network 166 devices as well) 167 """ 168 if (USE_NS): 169 cmd = 'ip netns delete $NS' 170 exec_cmd(cmd, False) 171 172 173def has_blank_ids(idlist): 174 """ 175 Search the list for empty ID fields and return true/false accordingly. 176 """ 177 return not(all(k for k in idlist)) 178 179 180def load_from_file(filename): 181 """ 182 Open the JSON file containing the test cases and return them 183 as list of ordered dictionary objects. 184 """ 185 try: 186 with open(filename) as test_data: 187 testlist = json.load(test_data, object_pairs_hook=OrderedDict) 188 except json.JSONDecodeError as jde: 189 print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde)) 190 testlist = list() 191 else: 192 idlist = get_id_list(testlist) 193 if (has_blank_ids(idlist)): 194 for k in testlist: 195 k['filename'] = filename 196 return testlist 197 198 199def args_parse(): 200 """ 201 Create the argument parser. 202 """ 203 parser = argparse.ArgumentParser(description='Linux TC unit tests') 204 return parser 205 206 207def set_args(parser): 208 """ 209 Set the command line arguments for tdc. 210 """ 211 parser.add_argument('-p', '--path', type=str, 212 help='The full path to the tc executable to use') 213 parser.add_argument('-c', '--category', type=str, nargs='?', const='+c', 214 help='Run tests only from the specified category, or if no category is specified, list known categories.') 215 parser.add_argument('-f', '--file', type=str, 216 help='Run tests from the specified file') 217 parser.add_argument('-l', '--list', type=str, nargs='?', const="++", metavar='CATEGORY', 218 help='List all test cases, or those only within the specified category') 219 parser.add_argument('-s', '--show', type=str, nargs=1, metavar='ID', dest='showID', 220 help='Display the test case with specified id') 221 parser.add_argument('-e', '--execute', type=str, nargs=1, metavar='ID', 222 help='Execute the single test case with specified ID') 223 parser.add_argument('-i', '--id', action='store_true', dest='gen_id', 224 help='Generate ID numbers for new test cases') 225 parser.add_argument('-d', '--device', 226 help='Execute the test case in flower category') 227 return parser 228 229 230def check_default_settings(args): 231 """ 232 Process any arguments overriding the default settings, and ensure the 233 settings are correct. 234 """ 235 # Allow for overriding specific settings 236 global NAMES 237 238 if args.path != None: 239 NAMES['TC'] = args.path 240 if args.device != None: 241 NAMES['DEV2'] = args.device 242 if not os.path.isfile(NAMES['TC']): 243 print("The specified tc path " + NAMES['TC'] + " does not exist.") 244 exit(1) 245 246 247def get_id_list(alltests): 248 """ 249 Generate a list of all IDs in the test cases. 250 """ 251 return [x["id"] for x in alltests] 252 253 254def check_case_id(alltests): 255 """ 256 Check for duplicate test case IDs. 257 """ 258 idl = get_id_list(alltests) 259 return [x for x in idl if idl.count(x) > 1] 260 261 262def does_id_exist(alltests, newid): 263 """ 264 Check if a given ID already exists in the list of test cases. 265 """ 266 idl = get_id_list(alltests) 267 return (any(newid == x for x in idl)) 268 269 270def generate_case_ids(alltests): 271 """ 272 If a test case has a blank ID field, generate a random hex ID for it 273 and then write the test cases back to disk. 274 """ 275 import random 276 for c in alltests: 277 if (c["id"] == ""): 278 while True: 279 newid = str('%04x' % random.randrange(16**4)) 280 if (does_id_exist(alltests, newid)): 281 continue 282 else: 283 c['id'] = newid 284 break 285 286 ufilename = [] 287 for c in alltests: 288 if ('filename' in c): 289 ufilename.append(c['filename']) 290 ufilename = get_unique_item(ufilename) 291 for f in ufilename: 292 testlist = [] 293 for t in alltests: 294 if 'filename' in t: 295 if t['filename'] == f: 296 del t['filename'] 297 testlist.append(t) 298 outfile = open(f, "w") 299 json.dump(testlist, outfile, indent=4) 300 outfile.close() 301 302 303def get_test_cases(args): 304 """ 305 If a test case file is specified, retrieve tests from that file. 306 Otherwise, glob for all json files in subdirectories and load from 307 each one. 308 """ 309 import fnmatch 310 if args.file != None: 311 if not os.path.isfile(args.file): 312 print("The specified test case file " + args.file + " does not exist.") 313 exit(1) 314 flist = [args.file] 315 else: 316 flist = [] 317 for root, dirnames, filenames in os.walk('tc-tests'): 318 for filename in fnmatch.filter(filenames, '*.json'): 319 flist.append(os.path.join(root, filename)) 320 alltests = list() 321 for casefile in flist: 322 alltests = alltests + (load_from_file(casefile)) 323 return alltests 324 325 326def set_operation_mode(args): 327 """ 328 Load the test case data and process remaining arguments to determine 329 what the script should do for this run, and call the appropriate 330 function. 331 """ 332 alltests = get_test_cases(args) 333 334 if args.gen_id: 335 idlist = get_id_list(alltests) 336 if (has_blank_ids(idlist)): 337 alltests = generate_case_ids(alltests) 338 else: 339 print("No empty ID fields found in test files.") 340 exit(0) 341 342 duplicate_ids = check_case_id(alltests) 343 if (len(duplicate_ids) > 0): 344 print("The following test case IDs are not unique:") 345 print(str(set(duplicate_ids))) 346 print("Please correct them before continuing.") 347 exit(1) 348 349 ucat = get_test_categories(alltests) 350 351 if args.showID: 352 show_test_case_by_id(alltests, args.showID[0]) 353 exit(0) 354 355 if args.execute: 356 target_id = args.execute[0] 357 else: 358 target_id = "" 359 360 if args.category: 361 if (args.category == '+c'): 362 print("Available categories:") 363 print_sll(ucat) 364 exit(0) 365 else: 366 target_category = args.category 367 else: 368 target_category = "" 369 370 371 testcases = get_categorized_testlist(alltests, ucat) 372 373 if args.list: 374 if (args.list == "++"): 375 list_test_cases(alltests) 376 exit(0) 377 elif(len(args.list) > 0): 378 if (args.list not in ucat): 379 print("Unknown category " + args.list) 380 print("Available categories:") 381 print_sll(ucat) 382 exit(1) 383 list_test_cases(testcases[args.list]) 384 exit(0) 385 386 if (os.geteuid() != 0): 387 print("This script must be run with root privileges.\n") 388 exit(1) 389 390 ns_create() 391 392 if (len(target_category) == 0): 393 if (len(target_id) > 0): 394 alltests = list(filter(lambda x: target_id in x['id'], alltests)) 395 if (len(alltests) == 0): 396 print("Cannot find a test case with ID matching " + target_id) 397 exit(1) 398 catresults = test_runner(alltests, args) 399 print("All test results: " + "\n\n" + catresults) 400 elif (len(target_category) > 0): 401 if (target_category == "flower") and args.device == None: 402 print("Please specify a NIC device (-d) to run category flower") 403 exit(1) 404 if (target_category not in ucat): 405 print("Specified category is not present in this file.") 406 exit(1) 407 else: 408 catresults = test_runner(testcases[target_category], args) 409 print("Category " + target_category + "\n\n" + catresults) 410 411 ns_destroy() 412 413 414def main(): 415 """ 416 Start of execution; set up argument parser and get the arguments, 417 and start operations. 418 """ 419 parser = args_parse() 420 parser = set_args(parser) 421 (args, remaining) = parser.parse_known_args() 422 check_default_settings(args) 423 424 set_operation_mode(args) 425 426 exit(0) 427 428 429if __name__ == "__main__": 430 main() 431