1# SPDX-License-Identifier: GPL-2.0 2# 3# Parses KTAP test results from a kernel dmesg log and incrementally prints 4# results with reader-friendly format. Stores and returns test results in a 5# Test object. 6# 7# Copyright (C) 2019, Google LLC. 8# Author: Felix Guo <felixguoxiuping@gmail.com> 9# Author: Brendan Higgins <brendanhiggins@google.com> 10# Author: Rae Moar <rmoar@google.com> 11 12from __future__ import annotations 13from dataclasses import dataclass 14import re 15import textwrap 16 17from enum import Enum, auto 18from typing import Iterable, Iterator, List, Optional, Tuple 19 20from kunit_printer import Printer 21 22class Test: 23 """ 24 A class to represent a test parsed from KTAP results. All KTAP 25 results within a test log are stored in a main Test object as 26 subtests. 27 28 Attributes: 29 status : TestStatus - status of the test 30 name : str - name of the test 31 expected_count : int - expected number of subtests (0 if single 32 test case and None if unknown expected number of subtests) 33 subtests : List[Test] - list of subtests 34 log : List[str] - log of KTAP lines that correspond to the test 35 counts : TestCounts - counts of the test statuses and errors of 36 subtests or of the test itself if the test is a single 37 test case. 38 """ 39 def __init__(self) -> None: 40 """Creates Test object with default attributes.""" 41 self.status = TestStatus.TEST_CRASHED 42 self.name = '' 43 self.expected_count = 0 # type: Optional[int] 44 self.subtests = [] # type: List[Test] 45 self.log = [] # type: List[str] 46 self.counts = TestCounts() 47 self.skip_reason = '' 48 49 def __str__(self) -> str: 50 """Returns string representation of a Test class object.""" 51 return (f'Test({self.status}, {self.name}, {self.expected_count}, ' 52 f'{self.subtests}, {self.log}, {self.counts}, {self.skip_reason})') 53 54 def __repr__(self) -> str: 55 """Returns string representation of a Test class object.""" 56 return str(self) 57 58 def add_error(self, printer: Printer, error_message: str) -> None: 59 """Records an error that occurred while parsing this test.""" 60 self.counts.errors += 1 61 printer.print_with_timestamp(printer.red('[ERROR]') + f' Test: {self.name}: {error_message}') 62 63 def ok_status(self) -> bool: 64 """Returns true if the status was ok, i.e. passed or skipped.""" 65 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED) 66 67class TestStatus(Enum): 68 """An enumeration class to represent the status of a test.""" 69 SUCCESS = auto() 70 FAILURE = auto() 71 SKIPPED = auto() 72 TEST_CRASHED = auto() 73 NO_TESTS = auto() 74 FAILURE_TO_PARSE_TESTS = auto() 75 76@dataclass 77class TestCounts: 78 """ 79 Tracks the counts of statuses of all test cases and any errors within 80 a Test. 81 """ 82 passed: int = 0 83 failed: int = 0 84 crashed: int = 0 85 skipped: int = 0 86 errors: int = 0 87 88 def __str__(self) -> str: 89 """Returns the string representation of a TestCounts object.""" 90 statuses = [('passed', self.passed), ('failed', self.failed), 91 ('crashed', self.crashed), ('skipped', self.skipped), 92 ('errors', self.errors)] 93 return f'Ran {self.total()} tests: ' + \ 94 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0) 95 96 def total(self) -> int: 97 """Returns the total number of test cases within a test 98 object, where a test case is a test with no subtests. 99 """ 100 return (self.passed + self.failed + self.crashed + 101 self.skipped) 102 103 def add_subtest_counts(self, counts: TestCounts) -> None: 104 """ 105 Adds the counts of another TestCounts object to the current 106 TestCounts object. Used to add the counts of a subtest to the 107 parent test. 108 109 Parameters: 110 counts - a different TestCounts object whose counts 111 will be added to the counts of the TestCounts object 112 """ 113 self.passed += counts.passed 114 self.failed += counts.failed 115 self.crashed += counts.crashed 116 self.skipped += counts.skipped 117 self.errors += counts.errors 118 119 def get_status(self) -> TestStatus: 120 """Returns the aggregated status of a Test using test 121 counts. 122 """ 123 if self.total() == 0: 124 return TestStatus.NO_TESTS 125 if self.crashed: 126 # Crashes should take priority. 127 return TestStatus.TEST_CRASHED 128 if self.failed: 129 return TestStatus.FAILURE 130 if self.passed: 131 # No failures or crashes, looks good! 132 return TestStatus.SUCCESS 133 # We have only skipped tests. 134 return TestStatus.SKIPPED 135 136 def add_status(self, status: TestStatus) -> None: 137 """Increments the count for `status`.""" 138 if status == TestStatus.SUCCESS: 139 self.passed += 1 140 elif status == TestStatus.FAILURE: 141 self.failed += 1 142 elif status == TestStatus.SKIPPED: 143 self.skipped += 1 144 elif status != TestStatus.NO_TESTS: 145 self.crashed += 1 146 147class LineStream: 148 """ 149 A class to represent the lines of kernel output. 150 Provides a lazy peek()/pop() interface over an iterator of 151 (line#, text). 152 """ 153 _lines: Iterator[Tuple[int, str]] 154 _next: Tuple[int, str] 155 _need_next: bool 156 _done: bool 157 158 def __init__(self, lines: Iterator[Tuple[int, str]]): 159 """Creates a new LineStream that wraps the given iterator.""" 160 self._lines = lines 161 self._done = False 162 self._need_next = True 163 self._next = (0, '') 164 165 def _get_next(self) -> None: 166 """Advances the LineSteam to the next line, if necessary.""" 167 if not self._need_next: 168 return 169 try: 170 self._next = next(self._lines) 171 except StopIteration: 172 self._done = True 173 finally: 174 self._need_next = False 175 176 def peek(self) -> str: 177 """Returns the current line, without advancing the LineStream. 178 """ 179 self._get_next() 180 return self._next[1] 181 182 def pop(self) -> str: 183 """Returns the current line and advances the LineStream to 184 the next line. 185 """ 186 s = self.peek() 187 if self._done: 188 raise ValueError(f'LineStream: going past EOF, last line was {s}') 189 self._need_next = True 190 return s 191 192 def __bool__(self) -> bool: 193 """Returns True if stream has more lines.""" 194 self._get_next() 195 return not self._done 196 197 # Only used by kunit_tool_test.py. 198 def __iter__(self) -> Iterator[str]: 199 """Empties all lines stored in LineStream object into 200 Iterator object and returns the Iterator object. 201 """ 202 while bool(self): 203 yield self.pop() 204 205 def line_number(self) -> int: 206 """Returns the line number of the current line.""" 207 self._get_next() 208 return self._next[0] 209 210# Parsing helper methods: 211 212KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$') 213TAP_START = re.compile(r'\s*TAP version ([0-9]+)$') 214KTAP_END = re.compile(r'\s*(List of all partitions:|' 215 'Kernel panic - not syncing: VFS:|reboot: System halted)') 216EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$') 217 218def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: 219 """Extracts KTAP lines from the kernel output.""" 220 def isolate_ktap_output(kernel_output: Iterable[str]) \ 221 -> Iterator[Tuple[int, str]]: 222 line_num = 0 223 started = False 224 for line in kernel_output: 225 line_num += 1 226 line = line.rstrip() # remove trailing \n 227 if not started and KTAP_START.search(line): 228 # start extracting KTAP lines and set prefix 229 # to number of characters before version line 230 prefix_len = len( 231 line.split('KTAP version')[0]) 232 started = True 233 yield line_num, line[prefix_len:] 234 elif not started and TAP_START.search(line): 235 # start extracting KTAP lines and set prefix 236 # to number of characters before version line 237 prefix_len = len(line.split('TAP version')[0]) 238 started = True 239 yield line_num, line[prefix_len:] 240 elif started and KTAP_END.search(line): 241 # stop extracting KTAP lines 242 break 243 elif started: 244 # remove the prefix, if any. 245 line = line[prefix_len:] 246 yield line_num, line 247 elif EXECUTOR_ERROR.search(line): 248 yield line_num, line 249 return LineStream(lines=isolate_ktap_output(kernel_output)) 250 251KTAP_VERSIONS = [1] 252TAP_VERSIONS = [13, 14] 253 254def check_version(version_num: int, accepted_versions: List[int], 255 version_type: str, test: Test, printer: Printer) -> None: 256 """ 257 Adds error to test object if version number is too high or too 258 low. 259 260 Parameters: 261 version_num - The inputted version number from the parsed KTAP or TAP 262 header line 263 accepted_version - List of accepted KTAP or TAP versions 264 version_type - 'KTAP' or 'TAP' depending on the type of 265 version line. 266 test - Test object for current test being parsed 267 printer - Printer object to output error 268 """ 269 if version_num < min(accepted_versions): 270 test.add_error(printer, f'{version_type} version lower than expected!') 271 elif version_num > max(accepted_versions): 272 test.add_error(printer, f'{version_type} version higher than expected!') 273 274def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool: 275 """ 276 Parses KTAP/TAP header line and checks version number. 277 Returns False if fails to parse KTAP/TAP header line. 278 279 Accepted formats: 280 - 'KTAP version [version number]' 281 - 'TAP version [version number]' 282 283 Parameters: 284 lines - LineStream of KTAP output to parse 285 test - Test object for current test being parsed 286 printer - Printer object to output results 287 288 Return: 289 True if successfully parsed KTAP/TAP header line 290 """ 291 ktap_match = KTAP_START.match(lines.peek()) 292 tap_match = TAP_START.match(lines.peek()) 293 if ktap_match: 294 version_num = int(ktap_match.group(1)) 295 check_version(version_num, KTAP_VERSIONS, 'KTAP', test, printer) 296 elif tap_match: 297 version_num = int(tap_match.group(1)) 298 check_version(version_num, TAP_VERSIONS, 'TAP', test, printer) 299 else: 300 return False 301 lines.pop() 302 return True 303 304TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$') 305 306def parse_test_header(lines: LineStream, test: Test) -> bool: 307 """ 308 Parses test header and stores test name in test object. 309 Returns False if fails to parse test header line. 310 311 Accepted format: 312 - '# Subtest: [test name]' 313 314 Parameters: 315 lines - LineStream of KTAP output to parse 316 test - Test object for current test being parsed 317 318 Return: 319 True if successfully parsed test header line 320 """ 321 match = TEST_HEADER.match(lines.peek()) 322 if not match: 323 return False 324 test.name = match.group(1) 325 lines.pop() 326 return True 327 328TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)') 329 330def parse_test_plan(lines: LineStream, test: Test) -> bool: 331 """ 332 Parses test plan line and stores the expected number of subtests in 333 test object. Reports an error if expected count is 0. 334 Returns False and sets expected_count to None if there is no valid test 335 plan. 336 337 Accepted format: 338 - '1..[number of subtests]' 339 340 Parameters: 341 lines - LineStream of KTAP output to parse 342 test - Test object for current test being parsed 343 344 Return: 345 True if successfully parsed test plan line 346 """ 347 match = TEST_PLAN.match(lines.peek()) 348 if not match: 349 test.expected_count = None 350 return False 351 expected_count = int(match.group(1)) 352 test.expected_count = expected_count 353 lines.pop() 354 return True 355 356TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(:?- )?([^#]*)( # .*)?$') 357 358TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(:?- )?(.*) # SKIP ?(.*)$') 359 360def peek_test_name_match(lines: LineStream, test: Test) -> bool: 361 """ 362 Matches current line with the format of a test result line and checks 363 if the name matches the name of the current test. 364 Returns False if fails to match format or name. 365 366 Accepted format: 367 - '[ok|not ok] [test number] [-] [test name] [optional skip 368 directive]' 369 370 Parameters: 371 lines - LineStream of KTAP output to parse 372 test - Test object for current test being parsed 373 374 Return: 375 True if matched a test result line and the name matching the 376 expected test name 377 """ 378 line = lines.peek() 379 match = TEST_RESULT.match(line) 380 if not match: 381 return False 382 name = match.group(4) 383 if not name: 384 return False 385 return name == test.name 386 387def parse_test_result(lines: LineStream, test: Test, 388 expected_num: int, printer: Printer) -> bool: 389 """ 390 Parses test result line and stores the status and name in the test 391 object. Reports an error if the test number does not match expected 392 test number. 393 Returns False if fails to parse test result line. 394 395 Note that the SKIP directive is the only direction that causes a 396 change in status. 397 398 Accepted format: 399 - '[ok|not ok] [test number] [-] [test name] [optional skip 400 directive]' 401 402 Parameters: 403 lines - LineStream of KTAP output to parse 404 test - Test object for current test being parsed 405 expected_num - expected test number for current test 406 printer - Printer object to output results 407 408 Return: 409 True if successfully parsed a test result line. 410 """ 411 line = lines.peek() 412 match = TEST_RESULT.match(line) 413 skip_match = TEST_RESULT_SKIP.match(line) 414 415 # Check if line matches test result line format 416 if not match: 417 return False 418 lines.pop() 419 420 # Set name of test object 421 if skip_match: 422 test.name = skip_match.group(4) 423 else: 424 test.name = match.group(4) 425 426 # Check test num 427 num = int(match.group(2)) 428 if num != expected_num: 429 test.add_error(printer, f'Expected test number {expected_num} but found {num}') 430 431 # Set status of test object 432 status = match.group(1) 433 if skip_match: 434 test.status = TestStatus.SKIPPED 435 test.skip_reason = skip_match.group(5) or '' 436 elif status == 'ok': 437 test.status = TestStatus.SUCCESS 438 else: 439 test.status = TestStatus.FAILURE 440 return True 441 442def parse_diagnostic(lines: LineStream) -> List[str]: 443 """ 444 Parse lines that do not match the format of a test result line or 445 test header line and returns them in list. 446 447 Line formats that are not parsed: 448 - '# Subtest: [test name]' 449 - '[ok|not ok] [test number] [-] [test name] [optional skip 450 directive]' 451 - 'KTAP version [version number]' 452 453 Parameters: 454 lines - LineStream of KTAP output to parse 455 456 Return: 457 Log of diagnostic lines 458 """ 459 log = [] # type: List[str] 460 non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN] 461 while lines and not any(re.match(lines.peek()) 462 for re in non_diagnostic_lines): 463 log.append(lines.pop()) 464 return log 465 466 467# Printing helper methods: 468 469DIVIDER = '=' * 60 470 471def format_test_divider(message: str, len_message: int) -> str: 472 """ 473 Returns string with message centered in fixed width divider. 474 475 Example: 476 '===================== message example =====================' 477 478 Parameters: 479 message - message to be centered in divider line 480 len_message - length of the message to be printed such that 481 any characters of the color codes are not counted 482 483 Return: 484 String containing message centered in fixed width divider 485 """ 486 default_count = 3 # default number of dashes 487 len_1 = default_count 488 len_2 = default_count 489 difference = len(DIVIDER) - len_message - 2 # 2 spaces added 490 if difference > 0: 491 # calculate number of dashes for each side of the divider 492 len_1 = int(difference / 2) 493 len_2 = difference - len_1 494 return ('=' * len_1) + f' {message} ' + ('=' * len_2) 495 496def print_test_header(test: Test, printer: Printer) -> None: 497 """ 498 Prints test header with test name and optionally the expected number 499 of subtests. 500 501 Example: 502 '=================== example (2 subtests) ===================' 503 504 Parameters: 505 test - Test object representing current test being printed 506 printer - Printer object to output results 507 """ 508 message = test.name 509 if message != "": 510 # Add a leading space before the subtest counts only if a test name 511 # is provided using a "# Subtest" header line. 512 message += " " 513 if test.expected_count: 514 if test.expected_count == 1: 515 message += '(1 subtest)' 516 else: 517 message += f'({test.expected_count} subtests)' 518 printer.print_with_timestamp(format_test_divider(message, len(message))) 519 520def print_log(log: Iterable[str], printer: Printer) -> None: 521 """Prints all strings in saved log for test in yellow.""" 522 formatted = textwrap.dedent('\n'.join(log)) 523 for line in formatted.splitlines(): 524 printer.print_with_timestamp(printer.yellow(line)) 525 526def format_test_result(test: Test, printer: Printer) -> str: 527 """ 528 Returns string with formatted test result with colored status and test 529 name. 530 531 Example: 532 '[PASSED] example' 533 534 Parameters: 535 test - Test object representing current test being printed 536 printer - Printer object to output results 537 538 Return: 539 String containing formatted test result 540 """ 541 if test.status == TestStatus.SUCCESS: 542 return printer.green('[PASSED] ') + test.name 543 if test.status == TestStatus.SKIPPED: 544 skip_message = printer.yellow('[SKIPPED] ') + test.name 545 if test.skip_reason != '': 546 skip_message += printer.yellow(' (' + test.skip_reason + ')') 547 return skip_message 548 if test.status == TestStatus.NO_TESTS: 549 return printer.yellow('[NO TESTS RUN] ') + test.name 550 if test.status == TestStatus.TEST_CRASHED: 551 print_log(test.log, printer) 552 return printer.red('[CRASHED] ') + test.name 553 print_log(test.log, printer) 554 return printer.red('[FAILED] ') + test.name 555 556def print_test_result(test: Test, printer: Printer) -> None: 557 """ 558 Prints result line with status of test. 559 560 Example: 561 '[PASSED] example' 562 563 Parameters: 564 test - Test object representing current test being printed 565 printer - Printer object 566 """ 567 printer.print_with_timestamp(format_test_result(test, printer)) 568 569def print_test_footer(test: Test, printer: Printer) -> None: 570 """ 571 Prints test footer with status of test. 572 573 Example: 574 '===================== [PASSED] example =====================' 575 576 Parameters: 577 test - Test object representing current test being printed 578 printer - Printer object to output results 579 """ 580 message = format_test_result(test, printer) 581 printer.print_with_timestamp(format_test_divider(message, 582 len(message) - printer.color_len())) 583 584def print_test(test: Test, failed_only: bool, printer: Printer) -> None: 585 """ 586 Prints Test object to given printer. For a child test, the result line is 587 printed. For a parent test, the test header, all child test results, and 588 the test footer are all printed. If failed_only is true, only failed/crashed 589 tests will be printed. 590 591 Parameters: 592 test - Test object to print 593 failed_only - True if only failed/crashed tests should be printed. 594 printer - Printer object to output results 595 """ 596 if test.name == "main": 597 printer.print_with_timestamp(DIVIDER) 598 for subtest in test.subtests: 599 print_test(subtest, failed_only, printer) 600 printer.print_with_timestamp(DIVIDER) 601 elif test.subtests != []: 602 if not failed_only or not test.ok_status(): 603 print_test_header(test, printer) 604 for subtest in test.subtests: 605 print_test(subtest, failed_only, printer) 606 print_test_footer(test, printer) 607 else: 608 if not failed_only or not test.ok_status(): 609 print_test_result(test, printer) 610 611def _summarize_failed_tests(test: Test) -> str: 612 """Tries to summarize all the failing subtests in `test`.""" 613 614 def failed_names(test: Test, parent_name: str) -> List[str]: 615 # Note: we use 'main' internally for the top-level test. 616 if not parent_name or parent_name == 'main': 617 full_name = test.name 618 else: 619 full_name = parent_name + '.' + test.name 620 621 if not test.subtests: # this is a leaf node 622 return [full_name] 623 624 # If all the children failed, just say this subtest failed. 625 # Don't summarize it down "the top-level test failed", though. 626 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()] 627 if parent_name and len(failed_subtests) == len(test.subtests): 628 return [full_name] 629 630 all_failures = [] # type: List[str] 631 for t in failed_subtests: 632 all_failures.extend(failed_names(t, full_name)) 633 return all_failures 634 635 failures = failed_names(test, '') 636 # If there are too many failures, printing them out will just be noisy. 637 if len(failures) > 10: # this is an arbitrary limit 638 return '' 639 640 return 'Failures: ' + ', '.join(failures) 641 642 643def print_summary_line(test: Test, printer: Printer) -> None: 644 """ 645 Prints summary line of test object. Color of line is dependent on 646 status of test. Color is green if test passes, yellow if test is 647 skipped, and red if the test fails or crashes. Summary line contains 648 counts of the statuses of the tests subtests or the test itself if it 649 has no subtests. 650 651 Example: 652 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0, 653 Errors: 0" 654 655 test - Test object representing current test being printed 656 printer - Printer object to output results 657 """ 658 if test.status == TestStatus.SUCCESS: 659 color = printer.green 660 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS): 661 color = printer.yellow 662 else: 663 color = printer.red 664 printer.print_with_timestamp(color(f'Testing complete. {test.counts}')) 665 666 # Summarize failures that might have gone off-screen since we had a lot 667 # of tests (arbitrarily defined as >=100 for now). 668 if test.ok_status() or test.counts.total() < 100: 669 return 670 summarized = _summarize_failed_tests(test) 671 if not summarized: 672 return 673 printer.print_with_timestamp(color(summarized)) 674 675# Other methods: 676 677def bubble_up_test_results(test: Test) -> None: 678 """ 679 If the test has subtests, add the test counts of the subtests to the 680 test and check if any of the tests crashed and if so set the test 681 status to crashed. Otherwise if the test has no subtests add the 682 status of the test to the test counts. 683 684 Parameters: 685 test - Test object for current test being parsed 686 """ 687 subtests = test.subtests 688 counts = test.counts 689 status = test.status 690 for t in subtests: 691 counts.add_subtest_counts(t.counts) 692 if counts.total() == 0: 693 counts.add_status(status) 694 elif test.counts.get_status() == TestStatus.TEST_CRASHED: 695 test.status = TestStatus.TEST_CRASHED 696 697 if status == TestStatus.FAILURE and test.counts.get_status() == TestStatus.SUCCESS: 698 counts.add_status(status) 699 700def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test: 701 """ 702 Finds next test to parse in LineStream, creates new Test object, 703 parses any subtests of the test, populates Test object with all 704 information (status, name) about the test and the Test objects for 705 any subtests, and then returns the Test object. The method accepts 706 three formats of tests: 707 708 Accepted test formats: 709 710 - Main KTAP/TAP header 711 712 Example: 713 714 KTAP version 1 715 1..4 716 [subtests] 717 718 - Subtest header (must include either the KTAP version line or 719 "# Subtest" header line) 720 721 Example (preferred format with both KTAP version line and 722 "# Subtest" line): 723 724 KTAP version 1 725 # Subtest: name 726 1..3 727 [subtests] 728 ok 1 name 729 730 Example (only "# Subtest" line): 731 732 # Subtest: name 733 1..3 734 [subtests] 735 ok 1 name 736 737 Example (only KTAP version line, compliant with KTAP v1 spec): 738 739 KTAP version 1 740 1..3 741 [subtests] 742 ok 1 name 743 744 - Test result line 745 746 Example: 747 748 ok 1 - test 749 750 Parameters: 751 lines - LineStream of KTAP output to parse 752 expected_num - expected test number for test to be parsed 753 log - list of strings containing any preceding diagnostic lines 754 corresponding to the current test 755 is_subtest - boolean indicating whether test is a subtest 756 printer - Printer object to output results 757 758 Return: 759 Test object populated with characteristics and any subtests 760 """ 761 test = Test() 762 test.log.extend(log) 763 764 # Parse any errors prior to parsing tests 765 err_log = parse_diagnostic(lines) 766 test.log.extend(err_log) 767 768 if not is_subtest: 769 # If parsing the main/top-level test, parse KTAP version line and 770 # test plan 771 test.name = "main" 772 parse_ktap_header(lines, test, printer) 773 test.log.extend(parse_diagnostic(lines)) 774 parse_test_plan(lines, test) 775 parent_test = True 776 else: 777 # If not the main test, attempt to parse a test header containing 778 # the KTAP version line and/or subtest header line 779 ktap_line = parse_ktap_header(lines, test, printer) 780 subtest_line = parse_test_header(lines, test) 781 test.log.extend(parse_diagnostic(lines)) 782 parse_test_plan(lines, test) 783 parent_test = (ktap_line or subtest_line) 784 if parent_test: 785 print_test_header(test, printer) 786 787 expected_count = test.expected_count 788 subtests = [] 789 test_num = 1 790 while parent_test and (expected_count is None or test_num <= expected_count): 791 # Loop to parse any subtests. 792 # Break after parsing expected number of tests or 793 # if expected number of tests is unknown break when test 794 # result line with matching name to subtest header is found 795 # or no more lines in stream. 796 sub_log = parse_diagnostic(lines) 797 sub_test = Test() 798 if not lines or (peek_test_name_match(lines, test) and 799 is_subtest): 800 if expected_count and test_num <= expected_count: 801 # If parser reaches end of test before 802 # parsing expected number of subtests, print 803 # crashed subtest and record error 804 test.add_error(printer, 'missing expected subtest!') 805 sub_test.log.extend(sub_log) 806 test.counts.add_status( 807 TestStatus.TEST_CRASHED) 808 print_test_result(sub_test, printer) 809 else: 810 test.log.extend(sub_log) 811 break 812 else: 813 sub_test = parse_test(lines, test_num, sub_log, True, printer) 814 subtests.append(sub_test) 815 test_num += 1 816 test.subtests = subtests 817 if is_subtest: 818 # If not main test, look for test result line 819 test.log.extend(parse_diagnostic(lines)) 820 if test.name != "" and not peek_test_name_match(lines, test): 821 test.add_error(printer, 'missing subtest result line!') 822 elif not lines: 823 print_log(test.log, printer) 824 test.status = TestStatus.NO_TESTS 825 test.add_error(printer, 'No more test results!') 826 else: 827 parse_test_result(lines, test, expected_num, printer) 828 829 # Check for there being no subtests within parent test 830 if parent_test and len(subtests) == 0: 831 # Don't override a bad status if this test had one reported. 832 # Assumption: no subtests means CRASHED is from Test.__init__() 833 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS): 834 print_log(test.log, printer) 835 test.status = TestStatus.NO_TESTS 836 test.add_error(printer, '0 tests run!') 837 838 # Add statuses to TestCounts attribute in Test object 839 bubble_up_test_results(test) 840 if parent_test and is_subtest: 841 # If test has subtests and is not the main test object, print 842 # footer. 843 print_test_footer(test, printer) 844 elif is_subtest: 845 print_test_result(test, printer) 846 return test 847 848def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test: 849 """ 850 Using kernel output, extract KTAP lines, parse the lines for test 851 results and print condensed test results and summary line. 852 853 Parameters: 854 kernel_output - Iterable object contains lines of kernel output 855 printer - Printer object to output results 856 857 Return: 858 Test - the main test object with all subtests. 859 """ 860 printer.print_with_timestamp(DIVIDER) 861 lines = extract_tap_lines(kernel_output) 862 test = Test() 863 if not lines: 864 test.name = '<missing>' 865 test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?\n' + 866 'Try running with the --raw_output=all option to see any log messages.') 867 test.status = TestStatus.FAILURE_TO_PARSE_TESTS 868 else: 869 test = parse_test(lines, 0, [], False, printer) 870 if test.status != TestStatus.NO_TESTS: 871 test.status = test.counts.get_status() 872 printer.print_with_timestamp(DIVIDER) 873 return test 874