# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>

from __future__ import annotations
from dataclasses import dataclass
import re
import textwrap

from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

# NOTE(review): `stdout` is no longer referenced in this module (all colouring
# goes through the Printer passed in by the caller). The import is kept in
# case other modules import it from here - confirm before removing.
from kunit_printer import Printer, stdout


class Test:
    """
    A class to represent a test parsed from KTAP results. All KTAP
    results within a test log are stored in a main Test object as
    subtests.

    Attributes:
    status : TestStatus - status of the test
    name : str - name of the test
    expected_count : int - expected number of subtests (0 if single
        test case and None if unknown expected number of subtests)
    subtests : List[Test] - list of subtests
    log : List[str] - log of KTAP lines that correspond to the test
    counts : TestCounts - counts of the test statuses and errors of
        subtests or of the test itself if the test is a single
        test case.
    """

    def __init__(self) -> None:
        """Creates Test object with default attributes."""
        # A test is considered crashed until a result line says otherwise.
        self.status = TestStatus.TEST_CRASHED
        self.name = ''
        self.expected_count: Optional[int] = 0
        self.subtests: List[Test] = []
        self.log: List[str] = []
        self.counts = TestCounts()

    def __str__(self) -> str:
        """Returns string representation of a Test class object."""
        return (f'Test({self.status}, {self.name}, {self.expected_count}, '
                f'{self.subtests}, {self.log}, {self.counts})')

    def __repr__(self) -> str:
        """Returns string representation of a Test class object."""
        return str(self)

    def add_error(self, printer: Printer, error_message: str) -> None:
        """
        Records an error that occurred while parsing this test.

        Increments the error count of this test and prints the error.

        Parameters:
        printer - Printer object to output the error
        error_message - description of the parsing error
        """
        self.counts.errors += 1
        # Colour the tag with the printer that emits it rather than with
        # the global stdout printer, so colouring follows the destination.
        printer.print_with_timestamp(
            printer.red('[ERROR]') + f' Test: {self.name}: {error_message}')

    def ok_status(self) -> bool:
        """Returns true if the status was ok, i.e. passed or skipped."""
        return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)


class TestStatus(Enum):
    """An enumeration class to represent the status of a test."""
    SUCCESS = auto()
    FAILURE = auto()
    SKIPPED = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()
    FAILURE_TO_PARSE_TESTS = auto()


@dataclass
class TestCounts:
    """
    Tracks the counts of statuses of all test cases and any errors within
    a Test.
    """
    passed: int = 0
    failed: int = 0
    crashed: int = 0
    skipped: int = 0
    errors: int = 0

    def __str__(self) -> str:
        """
        Returns the string representation of a TestCounts object.

        Only non-zero counts are listed, e.g. 'Ran 3 tests: passed: 2,
        failed: 1'.
        """
        statuses = [('passed', self.passed), ('failed', self.failed),
                    ('crashed', self.crashed), ('skipped', self.skipped),
                    ('errors', self.errors)]
        return f'Ran {self.total()} tests: ' + \
            ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

    def total(self) -> int:
        """Returns the total number of test cases within a test
        object, where a test case is a test with no subtests.

        Errors are not test cases and are not included in the total.
        """
        return (self.passed + self.failed + self.crashed +
                self.skipped)

    def add_subtest_counts(self, counts: TestCounts) -> None:
        """
        Adds the counts of another TestCounts object to the current
        TestCounts object. Used to add the counts of a subtest to the
        parent test.

        Parameters:
        counts - a different TestCounts object whose counts
            will be added to the counts of the TestCounts object
        """
        self.passed += counts.passed
        self.failed += counts.failed
        self.crashed += counts.crashed
        self.skipped += counts.skipped
        self.errors += counts.errors

    def get_status(self) -> TestStatus:
        """Returns the aggregated status of a Test using test
        counts.
        """
        if self.total() == 0:
            return TestStatus.NO_TESTS
        if self.crashed:
            # Crashes should take priority.
            return TestStatus.TEST_CRASHED
        if self.failed:
            return TestStatus.FAILURE
        if self.passed:
            # No failures or crashes, looks good!
            return TestStatus.SUCCESS
        # We have only skipped tests.
        return TestStatus.SKIPPED

    def add_status(self, status: TestStatus) -> None:
        """
        Increments the count for `status`.

        NO_TESTS is not counted; any status other than SUCCESS, FAILURE,
        SKIPPED and NO_TESTS is counted as a crash.
        """
        if status == TestStatus.SUCCESS:
            self.passed += 1
        elif status == TestStatus.FAILURE:
            self.failed += 1
        elif status == TestStatus.SKIPPED:
            self.skipped += 1
        elif status != TestStatus.NO_TESTS:
            self.crashed += 1


class LineStream:
    """
    A class to represent the lines of kernel output.
    Provides a lazy peek()/pop() interface over an iterator of
    (line#, text).
    """
    _lines: Iterator[Tuple[int, str]]
    _next: Tuple[int, str]
    _need_next: bool
    _done: bool

    def __init__(self, lines: Iterator[Tuple[int, str]]):
        """Creates a new LineStream that wraps the given iterator."""
        self._lines = lines
        self._done = False
        self._need_next = True
        self._next = (0, '')

    def _get_next(self) -> None:
        """Advances the LineStream to the next line, if necessary."""
        if not self._need_next:
            return
        try:
            self._next = next(self._lines)
        except StopIteration:
            # Keep the last line in self._next; only mark the end.
            self._done = True
        finally:
            self._need_next = False

    def peek(self) -> str:
        """Returns the current line, without advancing the LineStream.

        Once the stream is exhausted this keeps returning the last line
        that was read (or '' if no line was ever read).
        """
        self._get_next()
        return self._next[1]

    def pop(self) -> str:
        """Returns the current line and advances the LineStream to
        the next line.

        Raises:
        ValueError - if the stream has no more lines
        """
        s = self.peek()
        if self._done:
            raise ValueError(f'LineStream: going past EOF, last line was {s}')
        self._need_next = True
        return s

    def __bool__(self) -> bool:
        """Returns True if stream has more lines."""
        self._get_next()
        return not self._done

    # Only used by kunit_tool_test.py.
    def __iter__(self) -> Iterator[str]:
        """Empties all lines stored in LineStream object into
        Iterator object and returns the Iterator object.
        """
        while bool(self):
            yield self.pop()

    def line_number(self) -> int:
        """Returns the line number of the current line."""
        self._get_next()
        return self._next[0]


# Parsing helper methods:

KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
KTAP_END = re.compile(r'\s*(List of all partitions:|'
                      'Kernel panic - not syncing: VFS:|reboot: System halted)')
EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')


def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
    """
    Extracts KTAP lines from the kernel output.

    Lines are yielded lazily as (line number, text) pairs, starting at the
    first KTAP/TAP version line and stopping at the first line matching
    KTAP_END. Whatever precedes the version header on its line is treated
    as a prefix, and that many characters are removed from every
    extracted line. 'kunit executor:' lines seen before the version line
    are passed through unmodified.

    Parameters:
    kernel_output - iterable of lines of kernel output

    Return:
    LineStream over the extracted lines
    """
    def isolate_ktap_output(kernel_output: Iterable[str]) \
            -> Iterator[Tuple[int, str]]:
        prefix_len = 0
        started = False
        for line_num, line in enumerate(kernel_output, start=1):
            line = line.rstrip()  # remove trailing \n
            if started:
                if KTAP_END.search(line):
                    # stop extracting KTAP lines
                    break
                # remove the prefix, if any.
                yield line_num, line[prefix_len:]
                continue

            # Not started yet. KTAP must be tested first: 'TAP version' is
            # a substring of 'KTAP version', so TAP_START.search() would
            # also hit a KTAP header.
            if KTAP_START.search(line):
                header = 'KTAP version'
            elif TAP_START.search(line):
                header = 'TAP version'
            else:
                if EXECUTOR_ERROR.search(line):
                    yield line_num, line
                continue

            # start extracting KTAP lines and set prefix
            # to number of characters before version line
            prefix_len = line.index(header)
            started = True
            yield line_num, line[prefix_len:]
    return LineStream(lines=isolate_ktap_output(kernel_output))


KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]


def check_version(version_num: int, accepted_versions: List[int],
                  version_type: str, test: Test, printer: Printer) -> None:
    """
    Adds error to test object if version number is too high or too
    low.

    Parameters:
    version_num - The inputted version number from the parsed KTAP or TAP
        header line
    accepted_versions - List of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
        version line.
    test - Test object for current test being parsed
    printer - Printer object to output error
    """
    if version_num < min(accepted_versions):
        test.add_error(printer, f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        test.add_error(printer, f'{version_type} version higher than expected!')


def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
    """
    Parses KTAP/TAP header line and checks version number.
    Returns False if fails to parse KTAP/TAP header line.

    Accepted formats:
    - 'KTAP version [version number]'
    - 'TAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    printer - Printer object to output results

    Return:
    True if successfully parsed KTAP/TAP header line
    """
    line = lines.peek()
    ktap_match = KTAP_START.match(line)
    tap_match = TAP_START.match(line)
    if ktap_match:
        version_num = int(ktap_match.group(1))
        check_version(version_num, KTAP_VERSIONS, 'KTAP', test, printer)
    elif tap_match:
        version_num = int(tap_match.group(1))
        check_version(version_num, TAP_VERSIONS, 'TAP', test, printer)
    else:
        return False
    # Only consume the line once it is known to be a header.
    lines.pop()
    return True


TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')


def parse_test_header(lines: LineStream, test: Test) -> bool:
    """
    Parses test header and stores test name in test object.
    Returns False if fails to parse test header line.

    Accepted format:
    - '# Subtest: [test name]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test header line
    """
    match = TEST_HEADER.match(lines.peek())
    if not match:
        return False
    test.name = match.group(1)
    lines.pop()
    return True


TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')


def parse_test_plan(lines: LineStream, test: Test) -> bool:
    """
    Parses test plan line and stores the expected number of subtests in
    test object.
    Returns False and sets expected_count to None if there is no valid test
    plan.

    Accepted format:
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test plan line
    """
    match = TEST_PLAN.match(lines.peek())
    if not match:
        test.expected_count = None
        return False
    test.expected_count = int(match.group(1))
    lines.pop()
    return True


TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(- )?(.*) # SKIP ?(.*)$')


def peek_test_name_match(lines: LineStream, test: Test) -> bool:
    """
    Matches current line with the format of a test result line and checks
    if the name matches the name of the current test.
    Returns False if fails to match format or name.

    The line is only inspected, never consumed.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if matched a test result line and the name matching the
        expected test name
    """
    match = TEST_RESULT.match(lines.peek())
    if not match:
        return False
    name = match.group(4)
    if not name:
        return False
    return name == test.name


def parse_test_result(lines: LineStream, test: Test,
                      expected_num: int, printer: Printer) -> bool:
    """
    Parses test result line and stores the status and name in the test
    object. Reports an error if the test number does not match expected
    test number.
    Returns False if fails to parse test result line.

    Note that the SKIP directive is the only directive that causes a
    change in status.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    expected_num - expected test number for current test
    printer - Printer object to output results

    Return:
    True if successfully parsed a test result line.
    """
    line = lines.peek()
    match = TEST_RESULT.match(line)
    skip_match = TEST_RESULT_SKIP.match(line)

    # Check if line matches test result line format
    if not match:
        return False
    lines.pop()

    # Set name of test object. For a skipped test with no name, fall back
    # to the text following the SKIP directive.
    if skip_match:
        test.name = skip_match.group(4) or skip_match.group(5)
    else:
        test.name = match.group(4)

    # Check test num
    num = int(match.group(2))
    if num != expected_num:
        test.add_error(printer, f'Expected test number {expected_num} but found {num}')

    # Set status of test object
    status = match.group(1)
    if skip_match:
        test.status = TestStatus.SKIPPED
    elif status == 'ok':
        test.status = TestStatus.SUCCESS
    else:
        test.status = TestStatus.FAILURE
    return True


def parse_diagnostic(lines: LineStream) -> List[str]:
    """
    Parse lines that do not match the format of a test result line or
    test header line and returns them in list.

    Line formats that are not parsed:
    - '# Subtest: [test name]'
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'
    - 'KTAP version [version number]'
    - 'TAP version [version number]'
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse

    Return:
    Log of diagnostic lines
    """
    log: List[str] = []
    non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
    # Consume lines until the stream ends or a structural line is reached.
    while lines and not any(pattern.match(lines.peek())
                            for pattern in non_diagnostic_lines):
        log.append(lines.pop())
    return log


# Printing helper methods:

DIVIDER = '=' * 60


def format_test_divider(message: str, len_message: int) -> str:
    """
    Returns string with message centered in fixed width divider.

    Example:
    '===================== message example ====================='

    Parameters:
    message - message to be centered in divider line
    len_message - length of the message to be printed such that
        any characters of the color codes are not counted

    Return:
    String containing message centered in fixed width divider
    """
    default_count = 3  # default number of '=' characters on each side
    len_1 = default_count
    len_2 = default_count
    difference = len(DIVIDER) - len_message - 2  # 2 spaces added
    if difference > 0:
        # calculate number of '=' characters for each side of the divider
        len_1 = difference // 2
        len_2 = difference - len_1
    return ('=' * len_1) + f' {message} ' + ('=' * len_2)


def print_test_header(test: Test, printer: Printer) -> None:
    """
    Prints test header with test name and optionally the expected number
    of subtests.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results
    """
    message = test.name
    if message != "":
        # Add a leading space before the subtest counts only if a test name
        # is provided using a "# Subtest" header line.
        message += " "
    # Neither None (unknown) nor 0 expected subtests adds a count.
    if test.expected_count:
        if test.expected_count == 1:
            message += '(1 subtest)'
        else:
            message += f'({test.expected_count} subtests)'
    printer.print_with_timestamp(format_test_divider(message, len(message)))


def print_log(log: Iterable[str], printer: Printer) -> None:
    """Prints all strings in saved log for test in yellow."""
    # Dedent the log as a whole so relative indentation is preserved.
    formatted = textwrap.dedent('\n'.join(log))
    for line in formatted.splitlines():
        printer.print_with_timestamp(printer.yellow(line))


def format_test_result(test: Test, printer: Printer) -> str:
    """
    Returns string with formatted test result with colored status and test
    name. For crashed and failed tests the saved log of the test is printed
    first.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results

    Return:
    String containing formatted test result
    """
    if test.status == TestStatus.SUCCESS:
        return printer.green('[PASSED] ') + test.name
    if test.status == TestStatus.SKIPPED:
        return printer.yellow('[SKIPPED] ') + test.name
    if test.status == TestStatus.NO_TESTS:
        return printer.yellow('[NO TESTS RUN] ') + test.name
    if test.status == TestStatus.TEST_CRASHED:
        print_log(test.log, printer)
        # The colour must come from `printer`: print_test_footer() subtracts
        # printer.color_len() from the length of this string.
        return printer.red('[CRASHED] ') + test.name
    print_log(test.log, printer)
    return printer.red('[FAILED] ') + test.name


def print_test_result(test: Test, printer: Printer) -> None:
    """
    Prints result line with status of test.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object
    """
    printer.print_with_timestamp(format_test_result(test, printer))


def print_test_footer(test: Test, printer: Printer) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results
    """
    message = format_test_result(test, printer)
    # Exclude the colour codes from the visible length of the message.
    printer.print_with_timestamp(format_test_divider(
        message, len(message) - printer.color_len()))


def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
    """
    Prints Test object to given printer. For a child test, the result line is
    printed. For a parent test, the test header, all child test results, and
    the test footer are all printed. If failed_only is true, only failed/crashed
    tests will be printed.

    Parameters:
    test - Test object to print
    failed_only - True if only failed/crashed tests should be printed.
    printer - Printer object to output results
    """
    if test.name == "main":
        # The top-level test only gets plain dividers around its subtests.
        printer.print_with_timestamp(DIVIDER)
        for subtest in test.subtests:
            print_test(subtest, failed_only, printer)
        printer.print_with_timestamp(DIVIDER)
    elif test.subtests:
        if not failed_only or not test.ok_status():
            print_test_header(test, printer)
            for subtest in test.subtests:
                print_test(subtest, failed_only, printer)
            print_test_footer(test, printer)
    else:
        if not failed_only or not test.ok_status():
            print_test_result(test, printer)


def _summarize_failed_tests(test: Test) -> str:
    """
    Tries to summarize all the failing subtests in `test`.

    Returns '' if there are more than 10 failures to list, otherwise a
    'Failures: ...' line with the dotted names of the failing tests.
    """

    def failed_names(test: Test, parent_name: str) -> List[str]:
        # Note: we use 'main' internally for the top-level test.
        if not parent_name or parent_name == 'main':
            full_name = test.name
        else:
            full_name = parent_name + '.' + test.name

        if not test.subtests:  # this is a leaf node
            return [full_name]

        # If all the children failed, just say this subtest failed.
        # Don't summarize it down "the top-level test failed", though.
        failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
        if parent_name and len(failed_subtests) == len(test.subtests):
            return [full_name]

        all_failures: List[str] = []
        for t in failed_subtests:
            all_failures.extend(failed_names(t, full_name))
        return all_failures

    failures = failed_names(test, '')
    # If there are too many failures, printing them out will just be noisy.
    if len(failures) > 10:  # this is an arbitrary limit
        return ''

    return 'Failures: ' + ', '.join(failures)


def print_summary_line(test: Test, printer: Printer) -> None:
    """
    Prints summary line of test object. Color of line is dependent on
    status of test. Color is green if test passes, yellow if test is
    skipped or no tests were run, and red if the test fails or crashes.
    Summary line contains counts of the statuses of the tests subtests or
    the test itself if it has no subtests. Only non-zero counts are shown.

    Example:
    "Testing complete. Ran 3 tests: passed: 2, failed: 1"

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results
    """
    # Take the colour function from the printer that emits the line.
    if test.status == TestStatus.SUCCESS:
        color = printer.green
    elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
        color = printer.yellow
    else:
        color = printer.red
    printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))

    # Summarize failures that might have gone off-screen since we had a lot
    # of tests (arbitrarily defined as >=100 for now).
    if test.ok_status() or test.counts.total() < 100:
        return
    summarized = _summarize_failed_tests(test)
    if not summarized:
        return
    printer.print_with_timestamp(color(summarized))


# Other methods:

def bubble_up_test_results(test: Test) -> None:
    """
    If the test has subtests, add the test counts of the subtests to the
    test and check if any of the tests crashed and if so set the test
    status to crashed. Otherwise if the test has no subtests add the
    status of the test to the test counts.

    Additionally, if the test itself reported a failure although its
    counts aggregate to success, the failure of the test itself is
    added to the counts.

    Parameters:
    test - Test object for current test being parsed
    """
    subtests = test.subtests
    counts = test.counts
    # Status before any adjustment below; the final check relies on it.
    status = test.status
    for t in subtests:
        counts.add_subtest_counts(t.counts)
    if counts.total() == 0:
        counts.add_status(status)
    elif test.counts.get_status() == TestStatus.TEST_CRASHED:
        test.status = TestStatus.TEST_CRASHED

    if status == TestStatus.FAILURE and test.counts.get_status() == TestStatus.SUCCESS:
        counts.add_status(status)


def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
    """
    Finds next test to parse in LineStream, creates new Test object,
    parses any subtests of the test, populates Test object with all
    information (status, name) about the test and the Test objects for
    any subtests, and then returns the Test object. The method accepts
    three formats of tests:

    Accepted test formats:

    - Main KTAP/TAP header

    Example:

    KTAP version 1
    1..4
    [subtests]

    - Subtest header (must include either the KTAP version line or
      "# Subtest" header line)

    Example (preferred format with both KTAP version line and
    "# Subtest" line):

    KTAP version 1
    # Subtest: name
    1..3
    [subtests]
    ok 1 name

    Example (only "# Subtest" line):

    # Subtest: name
    1..3
    [subtests]
    ok 1 name

    Example (only KTAP version line, compliant with KTAP v1 spec):

    KTAP version 1
    1..3
    [subtests]
    ok 1 name

    - Test result line

    Example:

    ok 1 - test

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
        corresponding to the current test
    is_subtest - boolean indicating whether test is a subtest
    printer - Printer object to output results

    Return:
    Test object populated with characteristics and any subtests
    """
    test = Test()
    test.log.extend(log)

    # Parse any errors prior to parsing tests
    err_log = parse_diagnostic(lines)
    test.log.extend(err_log)

    if not is_subtest:
        # If parsing the main/top-level test, parse KTAP version line and
        # test plan
        test.name = "main"
        parse_ktap_header(lines, test, printer)
        test.log.extend(parse_diagnostic(lines))
        parse_test_plan(lines, test)
        parent_test = True
    else:
        # If not the main test, attempt to parse a test header containing
        # the KTAP version line and/or subtest header line
        ktap_line = parse_ktap_header(lines, test, printer)
        subtest_line = parse_test_header(lines, test)
        test.log.extend(parse_diagnostic(lines))
        parse_test_plan(lines, test)
        parent_test = (ktap_line or subtest_line)
        if parent_test:
            print_test_header(test, printer)

    expected_count = test.expected_count
    subtests: List[Test] = []
    test_num = 1
    while parent_test and (expected_count is None or test_num <= expected_count):
        # Loop to parse any subtests.
        # Break after parsing expected number of tests or
        # if expected number of tests is unknown break when test
        # result line with matching name to subtest header is found
        # or no more lines in stream.
        sub_log = parse_diagnostic(lines)
        sub_test = Test()
        if not lines or (peek_test_name_match(lines, test) and
                         is_subtest):
            if expected_count and test_num <= expected_count:
                # If parser reaches end of test before
                # parsing expected number of subtests, print
                # crashed subtest and record error
                test.add_error(printer, 'missing expected subtest!')
                sub_test.log.extend(sub_log)
                test.counts.add_status(
                    TestStatus.TEST_CRASHED)
                print_test_result(sub_test, printer)
            else:
                test.log.extend(sub_log)
                break
        else:
            sub_test = parse_test(lines, test_num, sub_log, True, printer)
        subtests.append(sub_test)
        test_num += 1
    test.subtests = subtests
    if is_subtest:
        # If not main test, look for test result line
        test.log.extend(parse_diagnostic(lines))
        if test.name != "" and not peek_test_name_match(lines, test):
            test.add_error(printer, 'missing subtest result line!')
        elif not lines:
            print_log(test.log, printer)
            test.status = TestStatus.NO_TESTS
            test.add_error(printer, 'No more test results!')
        else:
            parse_test_result(lines, test, expected_num, printer)

    # Check for there being no subtests within parent test
    if parent_test and len(subtests) == 0:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            print_log(test.log, printer)
            test.status = TestStatus.NO_TESTS
            test.add_error(printer, '0 tests run!')

    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if parent_test and is_subtest:
        # If test has subtests and is not the main test object, print
        # footer.
        print_test_footer(test, printer)
    elif is_subtest:
        print_test_result(test, printer)
    return test


def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
    """
    Using kernel output, extract KTAP lines, parse the lines for test
    results and print condensed test results. The output is framed by
    divider lines.

    Parameters:
    kernel_output - Iterable object contains lines of kernel output
    printer - Printer object to output results

    Return:
    Test - the main test object with all subtests.
    """
    printer.print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    test = Test()
    if not lines:
        test.name = '<missing>'
        test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    else:
        test = parse_test(lines, 0, [], False, printer)
        # Keep an explicit NO_TESTS; otherwise derive the overall status
        # from the aggregated counts.
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    printer.print_with_timestamp(DIVIDER)
    return test