xref: /linux/tools/testing/kunit/kunit_parser.py (revision 30bbcb44707a97fcb62246bebc8b413b5ab293f8)
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import textwrap
16
17from enum import Enum, auto
18from typing import Iterable, Iterator, List, Optional, Tuple
19
20from kunit_printer import Printer, stdout
21
class Test:
	"""
	A class to represent a test parsed from KTAP results. All KTAP
	results within a test log are stored in a main Test object as
	subtests.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Creates Test object with default attributes."""
		# Default to crashed; the parser overwrites this once a result
		# line for the test has been parsed.
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count: Optional[int] = 0
		self.subtests: List[Test] = []
		self.log: List[str] = []
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns string representation of a Test class object."""
		return (f'Test({self.status}, {self.name}, {self.expected_count}, '
			f'{self.subtests}, {self.log}, {self.counts})')

	def __repr__(self) -> str:
		"""Returns string representation of a Test class object."""
		return str(self)

	def add_error(self, printer: Printer, error_message: str) -> None:
		"""
		Records an error that occurred while parsing this test.

		Increments the error count of the test and prints the error
		message, prefixed with a red '[ERROR]' tag.

		Parameters:
		printer - Printer object used both to color and to output the
			error line
		error_message - description of the parsing error
		"""
		self.counts.errors += 1
		# Take the color from the printer that emits the line, so that
		# color codes follow that printer's output settings.
		printer.print_with_timestamp(printer.red('[ERROR]') + f' Test: {self.name}: {error_message}')

	def ok_status(self) -> bool:
		"""Returns true if the status was ok, i.e. passed or skipped."""
		return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
65
class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
	# Result line reported 'ok'.
	SUCCESS = auto()
	# Result line reported 'not ok'.
	FAILURE = auto()
	# Result line carried a '# SKIP' directive.
	SKIPPED = auto()
	# Default status of a newly created Test; also the aggregated status
	# when any counted test case crashed.
	TEST_CRASHED = auto()
	# No test cases were counted, or a parent test had no subtests.
	NO_TESTS = auto()
	# No KTAP output could be found in the kernel output.
	FAILURE_TO_PARSE_TESTS = auto()
74
@dataclass
class TestCounts:
	"""
	Holds how many test cases ended in each status, together with the
	number of errors, for a single Test.
	"""
	passed: int = 0
	failed: int = 0
	crashed: int = 0
	skipped: int = 0
	errors: int = 0

	def __str__(self) -> str:
		"""Returns a summary that lists only the non-zero counts."""
		nonzero = []
		for label in ('passed', 'failed', 'crashed', 'skipped', 'errors'):
			value = getattr(self, label)
			if value > 0:
				nonzero.append(f'{label}: {value}')
		return f'Ran {self.total()} tests: ' + ', '.join(nonzero)

	def total(self) -> int:
		"""Returns the number of test cases counted, where a test
		case is a test with no subtests. Errors are not included.
		"""
		return sum((self.passed, self.failed, self.crashed, self.skipped))

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Accumulates the counts of a subtest into this (parent)
		TestCounts object.

		Parameters:
		counts - TestCounts object of the subtest whose values are
			added to this object
		"""
		for field in ('passed', 'failed', 'crashed', 'skipped', 'errors'):
			setattr(self, field, getattr(self, field) + getattr(counts, field))

	def get_status(self) -> TestStatus:
		"""Returns the aggregated status of a Test derived from its
		counts.
		"""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		# Ordered by priority: crashes first, then failures, then passes.
		by_priority = (
			(self.crashed, TestStatus.TEST_CRASHED),
			(self.failed, TestStatus.FAILURE),
			(self.passed, TestStatus.SUCCESS),
		)
		for count, status in by_priority:
			if count:
				return status
		# Only skipped tests remain.
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the count for `status`."""
		if status == TestStatus.NO_TESTS:
			# Nothing ran, so there is nothing to count.
			return
		counters = {
			TestStatus.SUCCESS: 'passed',
			TestStatus.FAILURE: 'failed',
			TestStatus.SKIPPED: 'skipped',
		}
		# Every remaining status is counted as a crash.
		field = counters.get(status, 'crashed')
		setattr(self, field, getattr(self, field) + 1)
145
class LineStream:
	"""
	Wraps an iterator of (line number, text) pairs taken from kernel
	output and exposes it through a lazy peek()/pop() interface.
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Creates a new LineStream that wraps the given iterator."""
		self._lines = lines
		self._next = (0, '')
		self._need_next = True
		self._done = False

	def _get_next(self) -> None:
		"""Fetches the next line from the iterator if one is pending."""
		if self._need_next:
			try:
				self._next = next(self._lines)
			except StopIteration:
				# _next keeps the last line read; only the end of
				# the stream is flagged.
				self._done = True
			finally:
				self._need_next = False

	def peek(self) -> str:
		"""Returns the text of the current line and leaves the
		LineStream where it is.
		"""
		self._get_next()
		return self._next[1]

	def pop(self) -> str:
		"""Returns the text of the current line and moves the
		LineStream on to the following line.

		Raises ValueError when the stream is already exhausted.
		"""
		current = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {current}')
		self._need_next = True
		return current

	def __bool__(self) -> bool:
		"""Returns True if stream has more lines."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Yields every remaining line, consuming the LineStream."""
		while self:
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number of the current line."""
		self._get_next()
		return self._next[0]
208
209# Parsing helper methods:
210
# Matches a 'KTAP version N' header line; group 1 is the version number.
KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
# Matches a 'TAP version N' header line; group 1 is the version number.
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
# Kernel messages after which KTAP lines are no longer extracted.
KTAP_END = re.compile(r'\s*(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
# Matches a 'kunit executor: ...' message; group 1 is the message text.
EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""
	Extracts KTAP lines from the kernel output.

	Extraction starts at the first KTAP/TAP version line and stops at the
	first line matching KTAP_END. Whatever precedes the version header on
	its line is treated as a prefix, and that many characters are cut
	from every following line. Before the version line is seen, only
	'kunit executor:' lines are passed through (unmodified).

	Parameters:
	kernel_output - iterable of lines of kernel output

	Return:
	LineStream of (line number, text) for the extracted lines
	"""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		# KTAP must be tried first: the TAP pattern also finds a match
		# inside a 'KTAP version' line.
		start_markers = ((KTAP_START, 'KTAP version'),
				 (TAP_START, 'TAP version'))
		prefix_len = 0
		started = False
		for line_num, raw_line in enumerate(kernel_output, start=1):
			line = raw_line.rstrip()  # remove trailing \n
			if started:
				if KTAP_END.search(line):
					# stop extracting KTAP lines
					return
				# remove the prefix, if any.
				yield line_num, line[prefix_len:]
				continue
			for pattern, marker in start_markers:
				if pattern.search(line):
					# start extracting KTAP lines and set prefix
					# to number of characters before version line
					prefix_len = len(line.split(marker)[0])
					started = True
					yield line_num, line[prefix_len:]
					break
			else:
				# Not started yet and no version line found.
				if EXECUTOR_ERROR.search(line):
					yield line_num, line
	return LineStream(lines=isolate_ktap_output(kernel_output))
249
# Accepted version numbers for 'KTAP version N' header lines.
KTAP_VERSIONS = [1]
# Accepted version numbers for 'TAP version N' header lines.
TAP_VERSIONS = [13, 14]
252
def check_version(version_num: int, accepted_versions: List[int],
			version_type: str, test: Test, printer: Printer) -> None:
	"""
	Adds error to test object if version number is too high or too
	low.

	Only the bounds of accepted_versions are checked: a version that lies
	between the lowest and highest accepted version is not reported.

	Parameters:
	version_num - The inputted version number from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	printer - Printer object to output error
	"""
	if version_num < min(accepted_versions):
		test.add_error(printer, f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		test.add_error(printer, f'{version_type} version higher than expected!')
272
def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
	"""
	Consumes a KTAP/TAP version header line, if the current line is one,
	and validates its version number.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed; receives an error
		when the version is outside the accepted range
	printer - Printer object to output results

	Return:
	True if a KTAP/TAP header line was parsed (and popped), False if the
	current line is not a header line (nothing is consumed)
	"""
	header = lines.peek()
	candidates = (
		(KTAP_START, KTAP_VERSIONS, 'KTAP'),
		(TAP_START, TAP_VERSIONS, 'TAP'),
	)
	for pattern, accepted, label in candidates:
		found = pattern.match(header)
		if found:
			check_version(int(found.group(1)), accepted, label, test, printer)
			lines.pop()
			return True
	return False
302
# Matches a '# Subtest: [test name]' line; group 1 is the test name.
TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')

def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Consumes a subtest header line, if the current line is one, and
	records the test name it carries in the test object.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if a test header line was parsed (and popped), False if the
	current line is not a test header (nothing is consumed)
	"""
	header = TEST_HEADER.match(lines.peek())
	if header:
		test.name = header.group(1)
		lines.pop()
		return True
	return False
326
# Matches a '1..N' test plan line; group 1 is the number of subtests.
TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')

def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Consumes a test plan line, if the current line is one, and stores the
	expected number of subtests in the test object. When the current line
	is not a test plan, expected_count is set to None (unknown).

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if a test plan line was parsed (and popped), False otherwise
	"""
	plan = TEST_PLAN.match(lines.peek())
	if plan:
		test.expected_count = int(plan.group(1))
		lines.pop()
		return True
	test.expected_count = None
	return False
354
# Matches a test result line. Groups: 1 - 'ok'/'not ok', 2 - test number,
# 4 - test name, 5 - optional trailing ' # ...' directive.
TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(- )?([^#]*)( # .*)?$')

# Matches a test result line carrying a SKIP directive. Groups: 4 - test
# name, 5 - text following 'SKIP'.
TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(- )?(.*) # SKIP ?(.*)$')

def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Checks, without consuming anything, whether the current line is a
	test result line whose test name equals the name of the given test.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the current line is a test result line with a non-empty
		name equal to the expected test name, False otherwise
	"""
	result = TEST_RESULT.match(lines.peek())
	if not result:
		return False
	name = result.group(4)
	# A result line without a name never counts as a match.
	return bool(name) and name == test.name
385
def parse_test_result(lines: LineStream, test: Test,
			expected_num: int, printer: Printer) -> bool:
	"""
	Consumes a test result line, if the current line is one, and stores
	the name and status it reports in the test object. An error is
	recorded when the test number differs from the expected one.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test
	printer - Printer object to output results

	Return:
	True if a test result line was parsed (and popped), False if the
	current line is not a test result line (nothing is consumed)
	"""
	line = lines.peek()
	result = TEST_RESULT.match(line)
	if not result:
		return False
	skipped = TEST_RESULT_SKIP.match(line)
	lines.pop()

	# The name comes first so that the error below can mention it. For a
	# skipped test without a name, the text after SKIP is used instead.
	if skipped:
		test.name = skipped.group(4) or skipped.group(5)
	else:
		test.name = result.group(4)

	number = int(result.group(2))
	if number != expected_num:
		test.add_error(printer, f'Expected test number {expected_num} but found {number}')

	# SKIP wins over the ok/not ok keyword.
	if skipped:
		outcome = TestStatus.SKIPPED
	else:
		outcome = TestStatus.SUCCESS if result.group(1) == 'ok' else TestStatus.FAILURE
	test.status = outcome
	return True
439
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Consumes lines up to the next line that has a recognised KTAP
	structure, and returns the consumed lines in a list.

	Line formats that are not consumed:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'
	- '# Subtest: [test name]'
	- 'KTAP version [version number]'
	- 'TAP version [version number]'
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	non_diagnostic_lines = (TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN)
	log = []  # type: List[str]
	while lines:
		# Peek once per line instead of once per pattern.
		line = lines.peek()
		if any(pattern.match(line) for pattern in non_diagnostic_lines):
			break
		log.append(lines.pop())
	return log
463
464
465# Printing helper methods:
466
# Full-width divider line; its length is the target width of all dividers.
DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Builds a divider line of the same width as DIVIDER with the message
	centered in it. A message too long to fit is still surrounded by
	three '=' characters on each side.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message to be printed such that
		any characters of the color codes are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	min_fill = 3  # '=' characters per side when the message does not fit
	# Width left for '=' once the message and its two spaces are placed.
	spare = len(DIVIDER) - len_message - 2
	if spare > 0:
		# An odd remainder puts the extra '=' on the right-hand side.
		left = spare // 2
		right = spare - left
	else:
		left = right = min_fill
	return f"{'=' * left} {message} {'=' * right}"
493
def print_test_header(test: Test, printer: Printer) -> None:
	"""
	Prints a divider line containing the test name and, when it is known
	and non-zero, the expected number of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	title = test.name
	if title != "":
		# Separate the name from the subtest count; a name is only
		# present when a "# Subtest" header line provided one.
		title = title + " "
	count = test.expected_count
	if count:
		title += '(1 subtest)' if count == 1 else f'({count} subtests)'
	printer.print_with_timestamp(format_test_divider(title, len(title)))
517
def print_log(log: Iterable[str], printer: Printer) -> None:
	"""
	Prints every line of a saved test log in yellow, after removing the
	indentation common to all of the lines.

	Parameters:
	log - lines of the saved log
	printer - Printer object to color and output the lines
	"""
	dedented = textwrap.dedent('\n'.join(log))
	for entry in dedented.splitlines():
		printer.print_with_timestamp(printer.yellow(entry))
523
def format_test_result(test: Test, printer: Printer) -> str:
	"""
	Returns string with formatted test result with colored status and test
	name.

	Note that for a crashed or failed test the saved log of the test is
	printed as a side effect before the result string is returned.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object used to color the status and to print the
		log of crashed/failed tests

	Return:
	String containing formatted test result
	"""
	if test.status == TestStatus.SUCCESS:
		return printer.green('[PASSED] ') + test.name
	if test.status == TestStatus.SKIPPED:
		return printer.yellow('[SKIPPED] ') + test.name
	if test.status == TestStatus.NO_TESTS:
		return printer.yellow('[NO TESTS RUN] ') + test.name
	if test.status == TestStatus.TEST_CRASHED:
		print_log(test.log, printer)
		# Color with the same printer as every other status so that the
		# length of the color codes agrees with printer.color_len().
		return printer.red('[CRASHED] ') + test.name
	# Any remaining status is reported as a failure.
	print_log(test.log, printer)
	return printer.red('[FAILED] ') + test.name
550
def print_test_result(test: Test, printer: Printer) -> None:
	"""
	Prints the formatted result line of a test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object
	"""
	result_line = format_test_result(test, printer)
	printer.print_with_timestamp(result_line)
563
def print_test_footer(test: Test, printer: Printer) -> None:
	"""
	Prints a divider line containing the formatted result of the test.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	result = format_test_result(test, printer)
	# Color codes take no room on screen, so leave them out of the width.
	visible_len = len(result) - printer.color_len()
	printer.print_with_timestamp(format_test_divider(result, visible_len))
578
def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
	"""
	Prints a Test object, recursing into its subtests. A test without
	subtests is printed as a single result line; a test with subtests is
	printed as a header, the results of its subtests and a footer. The
	top-level test (named "main") is printed as its subtests enclosed by
	two divider lines. If failed_only is true, tests that passed or were
	skipped are left out.

	Parameters:
	test - Test object to print
	failed_only - True if only failed/crashed tests should be printed.
	printer - Printer object to output results
	"""
	if test.name == "main":
		printer.print_with_timestamp(DIVIDER)
		for subtest in test.subtests:
			print_test(subtest, failed_only, printer)
		printer.print_with_timestamp(DIVIDER)
		return

	# Passing and skipped tests are hidden when only failures are wanted.
	if failed_only and test.ok_status():
		return

	if test.subtests != []:
		print_test_header(test, printer)
		for subtest in test.subtests:
			print_test(subtest, failed_only, printer)
		print_test_footer(test, printer)
	else:
		print_test_result(test, printer)
605
606def _summarize_failed_tests(test: Test) -> str:
607	"""Tries to summarize all the failing subtests in `test`."""
608
609	def failed_names(test: Test, parent_name: str) -> List[str]:
610		# Note: we use 'main' internally for the top-level test.
611		if not parent_name or parent_name == 'main':
612			full_name = test.name
613		else:
614			full_name = parent_name + '.' + test.name
615
616		if not test.subtests:  # this is a leaf node
617			return [full_name]
618
619		# If all the children failed, just say this subtest failed.
620		# Don't summarize it down "the top-level test failed", though.
621		failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
622		if parent_name and len(failed_subtests) ==  len(test.subtests):
623			return [full_name]
624
625		all_failures = []  # type: List[str]
626		for t in failed_subtests:
627			all_failures.extend(failed_names(t, full_name))
628		return all_failures
629
630	failures = failed_names(test, '')
631	# If there are too many failures, printing them out will just be noisy.
632	if len(failures) > 10:  # this is an arbitrary limit
633		return ''
634
635	return 'Failures: ' + ', '.join(failures)
636
637
def print_summary_line(test: Test, printer: Printer) -> None:
	"""
	Prints summary line of test object. Color of line is dependent on
	status of test. Color is green if test passes, yellow if test is
	skipped or no tests were run, and red otherwise (e.g. if the test
	fails or crashes). Summary line contains the total number of test
	cases and the non-zero counts of the statuses of the test's subtests
	or of the test itself if it has no subtests.

	When at least 100 test cases were counted and the test did not pass,
	a second line listing the failed tests is printed, unless that list
	would be too long.

	Example:
	"Testing complete. Ran 2 tests: passed: 2"

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to color and output results
	"""
	# Take the colors from the printer that emits the lines, so that
	# color codes follow that printer's output settings.
	if test.status == TestStatus.SUCCESS:
		color = printer.green
	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = printer.yellow
	else:
		color = printer.red
	printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))

	# Summarize failures that might have gone off-screen since we had a lot
	# of tests (arbitrarily defined as >=100 for now).
	if test.ok_status() or test.counts.total() < 100:
		return
	summarized = _summarize_failed_tests(test)
	if not summarized:
		return
	printer.print_with_timestamp(color(summarized))
669
670# Other methods:
671
def bubble_up_test_results(test: Test) -> None:
	"""
	Folds the counts of all subtests into the counts of the test. If that
	leaves the test with no counted test cases, the status of the test
	itself is counted instead. Otherwise, if the counts show a crash, the
	status of the test is set to crashed.

	Parameters:
	test - Test object for current test being parsed
	"""
	for sub in test.subtests:
		test.counts.add_subtest_counts(sub.counts)

	if test.counts.total() == 0:
		# Nothing counted below this test: count the test itself.
		test.counts.add_status(test.status)
		return

	if test.counts.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED
691
def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. Results are printed
	to the given printer while parsing. The method accepts three formats
	of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header (must include either the KTAP version line or
	  "# Subtest" header line)

	Example (preferred format with both KTAP version line and
	"# Subtest" line):

	KTAP version 1
	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only "# Subtest" line):

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only KTAP version line, compliant with KTAP v1 spec):

	KTAP version 1
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test
	is_subtest - boolean indicating whether test is a subtest
	printer - Printer object to output results

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)

	# Parse any errors prior to parsing tests
	err_log = parse_diagnostic(lines)
	test.log.extend(err_log)

	if not is_subtest:
		# If parsing the main/top-level test, parse KTAP version line and
		# test plan
		test.name = "main"
		parse_ktap_header(lines, test, printer)
		test.log.extend(parse_diagnostic(lines))
		parse_test_plan(lines, test)
		# The top-level test is always treated as a parent of subtests.
		parent_test = True
	else:
		# If not the main test, attempt to parse a test header containing
		# the KTAP version line and/or subtest header line
		ktap_line = parse_ktap_header(lines, test, printer)
		subtest_line = parse_test_header(lines, test)
		test.log.extend(parse_diagnostic(lines))
		parse_test_plan(lines, test)
		# Without either header line this is a single test case.
		parent_test = (ktap_line or subtest_line)
		if parent_test:
			print_test_header(test, printer)

	# None here means that no test plan was found, i.e. the number of
	# subtests is unknown.
	expected_count = test.expected_count
	subtests = []  # type: List[Test]
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing expected number of tests or
		# if expected number of tests is unknown break when test
		# result line with matching name to subtest header is found
		# or no more lines in stream.
		sub_log = parse_diagnostic(lines)
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				is_subtest):
			if expected_count and test_num <= expected_count:
				# If parser reaches end of test before
				# parsing expected number of subtests, print
				# crashed subtest and record error
				test.add_error(printer, 'missing expected subtest!')
				sub_test.log.extend(sub_log)
				# The placeholder subtest keeps the default
				# crashed status of a new Test; the crash is
				# counted directly on the parent.
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test, printer)
			else:
				# Unknown number of subtests: the end of this
				# test was reached, keep the diagnostics.
				test.log.extend(sub_log)
				break
		else:
			sub_test = parse_test(lines, test_num, sub_log, True, printer)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if is_subtest:
		# If not main test, look for test result line
		test.log.extend(parse_diagnostic(lines))
		if test.name != "" and not peek_test_name_match(lines, test):
			test.add_error(printer, 'missing subtest result line!')
		elif not lines:
			print_log(test.log, printer)
			test.status = TestStatus.NO_TESTS
			test.add_error(printer, 'No more test results!')
		else:
			parse_test_result(lines, test, expected_num, printer)

	# Check for there being no subtests within parent test
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			print_log(test.log, printer)
			test.status = TestStatus.NO_TESTS
			test.add_error(printer, '0 tests run!')

	# Add statuses to TestCounts attribute in Test object
	bubble_up_test_results(test)
	if parent_test and is_subtest:
		# If test has subtests and is not the main test object, print
		# footer.
		print_test_footer(test, printer)
	elif is_subtest:
		# A single test case is printed as one result line.
		print_test_result(test, printer)
	return test
839
def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
	"""
	Extracts the KTAP lines from the kernel output, parses them into
	test results and prints the condensed results, enclosed by two
	divider lines, while doing so.

	Parameters:
	kernel_output - Iterable object contains lines of kernel output
	printer - Printer object to output results

	Return:
	Test - the main test object with all subtests.
	"""
	printer.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	if lines:
		test = parse_test(lines, 0, [], False, printer)
		# A NO_TESTS status is kept; any other status is replaced by the
		# one aggregated from the counts.
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	else:
		# No KTAP output at all: report a parsing failure.
		test = Test()
		test.name = '<missing>'
		test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	printer.print_with_timestamp(DIVIDER)
	return test
865