xref: /linux/tools/testing/kunit/kunit_parser.py (revision 6b3f7af57881f6d6250c6dcc4d910fe8e855a607)
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import textwrap
16
17from enum import Enum, auto
18from typing import Iterable, Iterator, List, Optional, Tuple
19
20from kunit_printer import Printer
21
class Test:
	"""
	Represents one test parsed from KTAP results. The results of a whole
	test log hang off a single top-level Test object as (nested) subtests.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - number of subtests announced by the test plan
		(0 for a single test case, None when the number is unknown)
	subtests : List[Test] - the parsed subtests
	log : List[str] - KTAP lines that belong to this test
	counts : TestCounts - tally of the statuses and errors of the
		subtests, or of the test itself when it is a single test case
	skip_reason : str - text given with a SKIP directive ('' if none)
	"""
	def __init__(self) -> None:
		"""Sets every attribute to its default value."""
		self.name = ''
		# A test is considered crashed until a result line says otherwise.
		self.status = TestStatus.TEST_CRASHED
		self.skip_reason = ''
		self.expected_count: Optional[int] = 0
		self.subtests: List[Test] = []
		self.log: List[str] = []
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns all attributes of the Test rendered as one string."""
		shown = (self.status, self.name, self.expected_count,
			self.subtests, self.log, self.counts, self.skip_reason)
		details = ', '.join(f'{value}' for value in shown)
		return f'Test({details})'

	def __repr__(self) -> str:
		"""Returns the same text as __str__()."""
		return self.__str__()

	def add_error(self, printer: Printer, error_message: str) -> None:
		"""Counts a parsing error for this test and prints it."""
		self.counts.errors += 1
		tag = printer.red('[ERROR]')
		printer.print_with_timestamp(tag + f' Test: {self.name}: {error_message}')

	def ok_status(self) -> bool:
		"""Returns True when the test either passed or was skipped."""
		return (self.status == TestStatus.SUCCESS or
			self.status == TestStatus.SKIPPED)
66
class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
	# Set by parse_test_result() for an 'ok' line without a SKIP directive.
	SUCCESS = auto()
	# Set by parse_test_result() for a 'not ok' line without a SKIP directive.
	FAILURE = auto()
	# Set by parse_test_result() for a result line with a '# SKIP' directive.
	SKIPPED = auto()
	# Default status of a freshly created Test (see Test.__init__()).
	TEST_CRASHED = auto()
	# Set by parse_test() when a test has no subtests or the input ran out.
	NO_TESTS = auto()
	# Set by parse_run_tests() when no KTAP output was found at all.
	FAILURE_TO_PARSE_TESTS = auto()
75
@dataclass
class TestCounts:
	"""
	Tally of how many test cases ended in each status, plus the number
	of parsing errors, for one Test.
	"""
	passed: int = 0
	failed: int = 0
	crashed: int = 0
	skipped: int = 0
	errors: int = 0

	def __str__(self) -> str:
		"""Returns a summary listing only the non-zero counters."""
		labelled = (
			('passed', self.passed),
			('failed', self.failed),
			('crashed', self.crashed),
			('skipped', self.skipped),
			('errors', self.errors),
		)
		shown = [f'{label}: {count}' for label, count in labelled if count > 0]
		return f'Ran {self.total()} tests: ' + ', '.join(shown)

	def total(self) -> int:
		"""Returns the number of test cases counted, where a test case
		is a test without subtests. Errors are not included.
		"""
		return sum((self.passed, self.failed, self.crashed, self.skipped))

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Folds the counters of another TestCounts object into this one.
		Used to accumulate the counts of a subtest in its parent test.

		Parameters:
		counts - the TestCounts object whose counters are added to
			this object
		"""
		for field in ('passed', 'failed', 'crashed', 'skipped', 'errors'):
			setattr(self, field, getattr(self, field) + getattr(counts, field))

	def get_status(self) -> TestStatus:
		"""Returns the overall status implied by the counters."""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		# Severity order: a crash outranks a failure, which outranks a pass.
		if self.crashed:
			return TestStatus.TEST_CRASHED
		if self.failed:
			return TestStatus.FAILURE
		# Without any pass, everything that ran was skipped.
		return TestStatus.SUCCESS if self.passed else TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the counter that corresponds to `status`."""
		if status == TestStatus.NO_TESTS:
			# NO_TESTS is deliberately not counted anywhere.
			return
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		else:
			# Every remaining status is counted as a crash.
			self.crashed += 1
146
class LineStream:
	"""
	Wraps the lines of kernel output.
	Offers a lazy peek()/pop() interface on top of an iterator of
	(line#, text) tuples.
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Creates a LineStream reading from the given iterator."""
		self._lines = lines
		self._next = (0, '')
		self._need_next = True
		self._done = False

	def _get_next(self) -> None:
		"""Fetches the next line from the iterator, if one is due."""
		if self._need_next:
			# Cleared up front so it is reset however the fetch ends.
			self._need_next = False
			try:
				self._next = next(self._lines)
			except StopIteration:
				self._done = True

	def peek(self) -> str:
		"""Returns the text of the current line and leaves the
		LineStream where it is.
		"""
		self._get_next()
		_, text = self._next
		return text

	def pop(self) -> str:
		"""Returns the text of the current line and moves the
		LineStream on to the next line. Raises ValueError at EOF.
		"""
		current = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {current}')
		self._need_next = True
		return current

	def __bool__(self) -> bool:
		"""Returns True while there are lines left in the stream."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Yields the remaining lines one by one, consuming the
		LineStream in the process.
		"""
		while self:
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number that goes with the current line."""
		self._get_next()
		number, _ = self._next
		return number
209
210# Parsing helper methods:
211
# Version line that opens KTAP output, e.g. 'KTAP version 1'.
# Group 1 is the version number.
KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
# Version line that opens TAP output, e.g. 'TAP version 14'.
# Group 1 is the version number.
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
# Kernel messages that make extract_tap_lines() stop collecting lines.
KTAP_END = re.compile(r'\s*(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
# 'kunit executor: ...' message; extract_tap_lines() passes these through
# even before a version line has been seen. Group 1 is the message text.
EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
217
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""
	Picks the KTAP lines out of the kernel output.

	Lines are emitted from the first KTAP/TAP version line up to (but not
	including) the first line matching KTAP_END. Whatever precedes the
	version text on the version line is treated as a prefix, and that many
	characters are cut off every emitted line. Before the version line only
	kunit executor error lines are emitted, unmodified.

	Parameters:
	kernel_output - Iterable of raw kernel output lines

	Return:
	LineStream over (line number, text) of the extracted lines
	"""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		prefix_len = 0
		started = False
		for line_num, raw in enumerate(kernel_output, start=1):
			line = raw.rstrip()  # drops the trailing \n
			if started:
				if KTAP_END.search(line):
					# end of the KTAP section
					break
				# strip the prefix, if any.
				yield line_num, line[prefix_len:]
				continue

			# Still looking for the version line; KTAP is tried first.
			if KTAP_START.search(line):
				marker = 'KTAP version'
			elif TAP_START.search(line):
				marker = 'TAP version'
			else:
				if EXECUTOR_ERROR.search(line):
					yield line_num, line
				continue
			# The prefix is everything in front of the version text.
			prefix_len = len(line.split(marker)[0])
			started = True
			yield line_num, line[prefix_len:]
	return LineStream(lines=isolate_ktap_output(kernel_output))
250
# Version numbers accepted on a 'KTAP version N' / 'TAP version N' header
# line. check_version() reports an error for a version below the smallest
# or above the largest entry of the matching list.
KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]
253
def check_version(version_num: int, accepted_versions: List[int],
			version_type: str, test: Test, printer: Printer) -> None:
	"""
	Records an error on the test object when the version number lies
	below the lowest or above the highest accepted version.

	Parameters:
	version_num - version number taken from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	printer - Printer object to output error
	"""
	lowest = min(accepted_versions)
	highest = max(accepted_versions)
	if lowest <= version_num <= highest:
		return
	if version_num < lowest:
		test.add_error(printer, f'{version_type} version lower than expected!')
	else:
		test.add_error(printer, f'{version_type} version higher than expected!')
273
def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
	"""
	Consumes a KTAP/TAP header line, if the current line is one, and
	validates its version number. The line is left in place when it is
	not a header line.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	printer - Printer object to output results

	Return:
	True if a KTAP/TAP header line was parsed, False otherwise
	"""
	current = lines.peek()
	# KTAP is tried before TAP.
	known_headers = (
		(KTAP_START, KTAP_VERSIONS, 'KTAP'),
		(TAP_START, TAP_VERSIONS, 'TAP'),
	)
	for pattern, accepted, version_type in known_headers:
		header = pattern.match(current)
		if header:
			check_version(int(header.group(1)), accepted,
				version_type, test, printer)
			lines.pop()
			return True
	return False
303
# Subtest header line, e.g. '# Subtest: example'. Group 1 is the test name.
TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
305
def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Consumes a subtest header line, if the current line is one, and
	records the test name it carries in the test object. The line is
	left in place when it is not a subtest header.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if a test header line was parsed, False otherwise
	"""
	header = TEST_HEADER.match(lines.peek())
	if header is None:
		return False
	lines.pop()
	test.name = header.group(1)
	return True
327
# Test plan line, e.g. '1..4'. Group 1 is the expected number of subtests.
# The pattern is not anchored at the end, so trailing text is tolerated.
TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
329
def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Consumes a test plan line, if the current line is one, and stores
	the announced number of subtests in test.expected_count. Without a
	test plan line, expected_count is set to None and the line is left
	in place.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if a test plan line was parsed, False otherwise
	"""
	plan = TEST_PLAN.match(lines.peek())
	if plan is None:
		# Unknown number of subtests.
		test.expected_count = None
		return False
	lines.pop()
	test.expected_count = int(plan.group(1))
	return True
355
# Test result line, e.g. 'ok 1 - name' or 'not ok 2 name # directive'.
# Groups: 1 = 'ok' or 'not ok', 2 = test number, 3 = optional separator,
# 4 = test name (everything up to the first '#'), 5 = optional ' # ...' part.
# NOTE(review): '(:?- )' is a capturing group that also accepts ':- '; it
# looks like a typo for the non-capturing '(?:- )'. The users of this
# pattern read groups 4 and 5, so fixing it would renumber those groups.
TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(:?- )?([^#]*)( # .*)?$')

# Test result line that carries a SKIP directive. Groups 1-3 are as in
# TEST_RESULT, 4 = test name, 5 = skip reason (may be empty).
TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(:?- )?(.*) # SKIP ?(.*)$')
359
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Checks, without consuming anything, whether the current line is a
	test result line whose test name equals the name of the current
	test. A result line with an empty name never matches.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the line is a test result line and its name is the
		expected test name
	"""
	result = TEST_RESULT.match(lines.peek())
	if result is None:
		return False
	found_name = result.group(4)
	return bool(found_name) and found_name == test.name
386
def parse_test_result(lines: LineStream, test: Test,
			expected_num: int, printer: Printer) -> bool:
	"""
	Consumes a test result line, if the current line is one, and stores
	the name and status it carries in the test object. An error is
	recorded when the test number differs from the expected one. The
	line is left in place when it is not a test result line.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test
	printer - Printer object to output results

	Return:
	True if a test result line was parsed, False otherwise
	"""
	line = lines.peek()
	result = TEST_RESULT.match(line)
	if result is None:
		return False
	skipped = TEST_RESULT_SKIP.match(line)
	lines.pop()

	# The name comes from the SKIP pattern when that one matched. It is
	# stored before the number check so an error message can show it.
	test.name = (skipped or result).group(4)

	found_num = int(result.group(2))
	if found_num != expected_num:
		test.add_error(printer, f'Expected test number {expected_num} but found {found_num}')

	# A SKIP directive wins over 'ok' / 'not ok'.
	if skipped:
		test.status = TestStatus.SKIPPED
		test.skip_reason = skipped.group(5) or ''
	elif result.group(1) == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True
441
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Consumes lines up to the next line that has a structural meaning
	(or the end of the stream) and returns the consumed lines in a list.

	Line formats that stop the collection and are not consumed:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'
	- 'KTAP version [version number]'
	- 'TAP version [version number]'
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	structural = (TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN)
	diagnostics = []  # type: List[str]
	while lines:
		current = lines.peek()
		if any(pattern.match(current) for pattern in structural):
			break
		diagnostics.append(lines.pop())
	return diagnostics
465
466
467# Printing helper methods:
468
DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Builds a divider line of the width of DIVIDER with the message in
	the middle. When the message leaves no room for padding, three '='
	characters are put on either side instead.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message as it appears on screen, i.e.
		not counting any characters of the color codes

	Return:
	String containing message centered in fixed width divider
	"""
	# Fallback padding used when the message does not fit.
	fill_left = fill_right = 3
	spare = len(DIVIDER) - len_message - 2  # 2 spaces around the message
	if spare > 0:
		# Split the spare room; the right side takes the odd character.
		fill_left = spare // 2
		fill_right = spare - fill_left
	return f"{'=' * fill_left} {message} {'=' * fill_right}"
495
def print_test_header(test: Test, printer: Printer) -> None:
	"""
	Prints the header divider of a test, showing the test name and,
	when it is known and non-zero, the expected number of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	title = test.name
	if title != "":
		# Only a named test (one with a "# Subtest" header line) needs
		# a space between the name and the subtest count.
		title += " "
	count = test.expected_count
	if count:
		title += '(1 subtest)' if count == 1 else f'({count} subtests)'
	printer.print_with_timestamp(format_test_divider(title, len(title)))
519
def print_log(log: Iterable[str], printer: Printer) -> None:
	"""Prints each line of a test's saved log in yellow, after removing
	the indentation that all log lines have in common.
	"""
	dedented = textwrap.dedent('\n'.join(log)).splitlines()
	for text in dedented:
		printer.print_with_timestamp(printer.yellow(text))
525
def format_test_result(test: Test, printer: Printer) -> str:
	"""
	Builds the result text of a test: a colored status tag followed by
	the test name. For a crashed or failed test, the saved log of the
	test is printed as a side effect before the text is returned.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results

	Return:
	String containing formatted test result
	"""
	status = test.status
	if status == TestStatus.SUCCESS:
		return printer.green('[PASSED] ') + test.name
	if status == TestStatus.SKIPPED:
		text = printer.yellow('[SKIPPED] ') + test.name
		if test.skip_reason != '':
			text += printer.yellow(' (' + test.skip_reason + ')')
		return text
	if status == TestStatus.NO_TESTS:
		return printer.yellow('[NO TESTS RUN] ') + test.name

	# Everything else is a crash or a failure: show the log first.
	print_log(test.log, printer)
	if status == TestStatus.TEST_CRASHED:
		return printer.red('[CRASHED] ') + test.name
	return printer.red('[FAILED] ') + test.name
555
def print_test_result(test: Test, printer: Printer) -> None:
	"""
	Prints the result line of a test, i.e. its status and name.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object
	"""
	result_line = format_test_result(test, printer)
	printer.print_with_timestamp(result_line)
568
def print_test_footer(test: Test, printer: Printer) -> None:
	"""
	Prints the footer divider of a test, showing its status and name.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	result = format_test_result(test, printer)
	# Color codes take no room on screen, so leave them out of the width.
	visible_len = len(result) - printer.color_len()
	footer = format_test_divider(result, visible_len)
	printer.print_with_timestamp(footer)
583
def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
	"""
	Prints a Test object with the given printer. A test without subtests
	is printed as a result line. A test with subtests is printed as a
	header, the results of all its subtests and a footer. The top-level
	test named "main" is printed as its subtests between two plain
	dividers. With failed_only set, tests that passed or were skipped
	are left out.

	Parameters:
	test - Test object to print
	failed_only - True if only failed/crashed tests should be printed.
	printer - Printer object to output results
	"""
	if test.name == "main":
		printer.print_with_timestamp(DIVIDER)
		for child in test.subtests:
			print_test(child, failed_only, printer)
		printer.print_with_timestamp(DIVIDER)
		return

	if failed_only and test.ok_status():
		# Nothing to report for this test or anything below it.
		return

	if not test.subtests:
		print_test_result(test, printer)
		return

	print_test_header(test, printer)
	for child in test.subtests:
		print_test(child, failed_only, printer)
	print_test_footer(test, printer)
610
def _summarize_failed_tests(test: Test) -> str:
	"""Builds a one-line summary naming the failing subtests of `test`.

	Returns '' when more than 10 names would have to be listed.
	"""

	def collect(node: Test, parent_name: str) -> List[str]:
		# Note: 'main' is the internal name of the top-level test and is
		# kept out of the dotted names.
		if parent_name and parent_name != 'main':
			qualified = parent_name + '.' + node.name
		else:
			qualified = node.name

		if not node.subtests:  # leaf node
			return [qualified]

		failing = [child for child in node.subtests if not child.ok_status()]
		# When every child failed, naming the parent is enough. The
		# top-level test is never collapsed like that, though.
		if parent_name and len(failing) == len(node.subtests):
			return [qualified]

		return [name for child in failing
			for name in collect(child, qualified)]

	names = collect(test, '')
	# Listing a large number of failures would only add noise.
	if len(names) > 10:  # this is an arbitrary limit
		return ''

	return 'Failures: ' + ', '.join(names)
641
642
def print_summary_line(test: Test, printer: Printer) -> None:
	"""
	Prints the summary line of a test object. The line is green if the
	test passed, yellow if it was skipped or ran no tests, and red
	otherwise. It contains the counts of the statuses of the test's
	subtests, or of the test itself if it has no subtests. For a test
	that is not ok and counts at least 100 test cases, a line naming
	the failures is printed as well, unless that summary is empty.

	Example:
	"Testing complete. Ran 2 tests: passed: 2"

	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	status = test.status
	if status == TestStatus.SUCCESS:
		color = printer.green
	elif status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = printer.yellow
	else:
		color = printer.red
	printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))

	# With a lot of tests (arbitrarily defined as >=100 for now) the
	# failures may have scrolled off-screen, so repeat them here.
	if not test.ok_status() and test.counts.total() >= 100:
		failures = _summarize_failed_tests(test)
		if failures:
			printer.print_with_timestamp(color(failures))
674
675# Other methods:
676
def bubble_up_test_results(test: Test) -> None:
	"""
	Accumulates the counts of all subtests in the counts of the test.
	If nothing was counted that way, the status of the test itself is
	counted instead; otherwise the test is marked as crashed when the
	counts say that something crashed. A test that reported failure
	although its counts only show success gets that failure counted too.

	Parameters:
	test - Test object for current test being parsed
	"""
	reported_status = test.status  # status before any adjustment below
	totals = test.counts
	for child in test.subtests:
		totals.add_subtest_counts(child.counts)

	if totals.total() == 0:
		totals.add_status(reported_status)
	elif totals.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED

	reported_failure = reported_status == TestStatus.FAILURE
	if reported_failure and totals.get_status() == TestStatus.SUCCESS:
		totals.add_status(reported_status)
699
def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. Results are printed
	with the given printer while parsing. The method accepts
	three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header (must include either the KTAP version line or
	  "# Subtest" header line)

	Example (preferred format with both KTAP version line and
	"# Subtest" line):

	KTAP version 1
	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only "# Subtest" line):

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only KTAP version line, compliant with KTAP v1 spec):

	KTAP version 1
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test
	is_subtest - boolean indicating whether test is a subtest
	printer - Printer object to output results

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)

	# Parse any errors prior to parsing tests
	err_log = parse_diagnostic(lines)
	test.log.extend(err_log)

	if not is_subtest:
		# If parsing the main/top-level test, parse KTAP version line and
		# test plan
		test.name = "main"
		parse_ktap_header(lines, test, printer)
		test.log.extend(parse_diagnostic(lines))
		parse_test_plan(lines, test)
		# The top-level test is always treated as a parent of subtests.
		parent_test = True
	else:
		# If not the main test, attempt to parse a test header containing
		# the KTAP version line and/or subtest header line
		ktap_line = parse_ktap_header(lines, test, printer)
		subtest_line = parse_test_header(lines, test)
		test.log.extend(parse_diagnostic(lines))
		parse_test_plan(lines, test)
		# Without either header line this is a plain test case (a single
		# result line) and the subtest loop below is skipped.
		parent_test = (ktap_line or subtest_line)
		if parent_test:
			print_test_header(test, printer)

	expected_count = test.expected_count
	subtests = []
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing expected number of tests or
		# if expected number of tests is unknown break when test
		# result line with matching name to subtest header is found
		# or no more lines in stream.
		sub_log = parse_diagnostic(lines)
		# Placeholder; only kept if the subtest turns out to be missing.
		# Its status is the TEST_CRASHED default from Test.__init__().
		sub_test = Test()
		# The subtest list ends at the end of the input or, for a
		# subtest, at the result line that names this test itself.
		if not lines or (peek_test_name_match(lines, test) and
				is_subtest):
			if expected_count and test_num <= expected_count:
				# If parser reaches end of test before
				# parsing expected number of subtests, print
				# crashed subtest and record error
				test.add_error(printer, 'missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test, printer)
			else:
				# No (non-zero) plan to satisfy: the diagnostics read
				# so far belong to this test, and the loop is done.
				test.log.extend(sub_log)
				break
		else:
			# Recurse; sub_log holds the diagnostics that preceded it.
			sub_test = parse_test(lines, test_num, sub_log, True, printer)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if is_subtest:
		# If not main test, look for test result line
		test.log.extend(parse_diagnostic(lines))
		if test.name != "" and not peek_test_name_match(lines, test):
			# A named test must be closed by a result line of that name.
			test.add_error(printer, 'missing subtest result line!')
		elif not lines:
			print_log(test.log, printer)
			test.status = TestStatus.NO_TESTS
			test.add_error(printer, 'No more test results!')
		else:
			parse_test_result(lines, test, expected_num, printer)

	# Check for there being no subtests within parent test
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			print_log(test.log, printer)
			test.status = TestStatus.NO_TESTS
			test.add_error(printer, '0 tests run!')

	# Add statuses to TestCounts attribute in Test object
	bubble_up_test_results(test)
	if parent_test and is_subtest:
		# If test has subtests and is not the main test object, print
		# footer.
		print_test_footer(test, printer)
	elif is_subtest:
		# A plain test case is reported with a single result line.
		print_test_result(test, printer)
	return test
847
def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
	"""
	Extracts the KTAP lines from the kernel output, parses them into
	test results and prints the condensed results between two dividers.
	When no KTAP output is found, an error is reported on an otherwise
	empty test named '<missing>'.

	Parameters:
	kernel_output - Iterable object contains lines of kernel output
	printer - Printer object to output results

	Return:
	Test - the main test object with all subtests.
	"""
	printer.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	if lines:
		test = parse_test(lines, 0, [], False, printer)
		# The overall status follows from the counts, except that a
		# NO_TESTS verdict of the parser is kept as it is.
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	else:
		test = Test()
		test.name = '<missing>'
		test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?\n' +
			'Try running with the --raw_output=all option to see any log messages.')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	printer.print_with_timestamp(DIVIDER)
	return test
874