xref: /linux/tools/perf/scripts/python/parallel-perf.py (revision 001821b0e79716c4e17c71d8e053a23599a7a508)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3#
4# Run a perf script command multiple times in parallel, using perf script
5# options --cpu and --time so that each job processes a different chunk
6# of the data.
7#
8# Copyright (c) 2024, Intel Corporation.
9
10import subprocess
11import argparse
12import pathlib
13import shlex
14import time
15import copy
16import sys
17import os
18import re
19
# Program name: passed to argparse as 'prog' and printed by ParallelPerf.Run()
# in verbose mode
glb_prog_name = "parallel-perf.py"
# Default value of the --min-interval option
# NOTE(review): the help text says seconds, but the value is compared against
# nanosecond durations in SplitTimeRangesIntoN() - confirm the intended unit
glb_min_interval = 10.0
# Default value of the --min_size option (minimum number of samples per time range)
glb_min_samples = 64
23
class Verbosity():
	"""Message levels derived from the quiet / verbose / debug flags.

	debug implies verbose, and verbose overrides quiet.
	"""

	def __init__(self, quiet=False, verbose=False, debug=False):
		self.debug     = debug
		# Debugging output always comes with verbose output
		self.verbose   = True if debug else verbose
		# Normal messages are suppressed only by quiet, and only if not verbose
		self.normal    = not (quiet and not self.verbose)
		# Self test is always enabled
		self.self_test = True
37
38# Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command
class Work():
	"""A single job: a command run via subprocess.Popen, optionally piped to a consumer.

	Standard output goes to out.txt and standard error to err.txt in
	output_dir. The command itself is recorded in cmd.txt.
	"""

	def __init__(self, cmd, pipe_to, output_dir="."):
		"""
		cmd        - command as a list of arguments
		pipe_to    - optional command string to pipe standard output through
		output_dir - directory for cmd.txt, out.txt and err.txt
		"""
		self.popen = None
		self.consumer = None
		self.cmd = cmd
		self.pipe_to = pipe_to
		self.output_dir = output_dir
		self.cmdout_name = f"{output_dir}/cmd.txt"
		self.stdout_name = f"{output_dir}/out.txt"
		self.stderr_name = f"{output_dir}/err.txt"

	def Command(self):
		"""Return the command as one string, with each argument shell-quoted."""
		# Quote each argument so that the string can be pasted into a shell
		# even if an argument contains spaces or other special characters
		return " ".join(shlex.quote(x) for x in self.cmd)

	def Stdout(self):
		"""Open the standard output file for writing."""
		return open(self.stdout_name, "w")

	def Stderr(self):
		"""Open the standard error file for writing."""
		return open(self.stderr_name, "w")

	def CreateOutputDir(self):
		"""Create the output directory (and parents) if it does not exist."""
		pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True)

	def Start(self):
		"""Start the command (and the consumer, if any). Does nothing if already started."""
		if self.popen:
			return
		self.CreateOutputDir()
		with open(self.cmdout_name, "w") as f:
			f.write(self.Command())
			f.write("\n")
		# The child processes get their own copies of the file descriptors,
		# so the parent's copies are closed once the children are started
		with self.Stdout() as stdout, self.Stderr() as stderr:
			if self.pipe_to:
				self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr)
				try:
					args = shlex.split(self.pipe_to)
					self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr)
				finally:
					# Close the parent's copy of the pipe read end so the
					# producer gets SIGPIPE if the consumer exits first
					self.popen.stdout.close()
			else:
				self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr)

	def RemoveEmptyErrFile(self):
		"""Delete the standard error file if it exists and is empty."""
		if os.path.exists(self.stderr_name):
			if os.path.getsize(self.stderr_name) == 0:
				os.unlink(self.stderr_name)

	def Errors(self):
		"""Return a list of error messages: one if the standard error file is non-empty."""
		if os.path.exists(self.stderr_name):
			if os.path.getsize(self.stderr_name) != 0:
				return [ f"Non-empty error file {self.stderr_name}" ]
		return []

	def TidyUp(self):
		"""Clean up output files after the job has finished."""
		self.RemoveEmptyErrFile()

	def RawPollWait(self, p, wait):
		"""Return the exit code of process p, blocking if wait is set, else None if still running."""
		if wait:
			return p.wait()
		return p.poll()

	def Poll(self, wait=False):
		"""Return the job's exit code, or None if it is not started or not finished.

		With a consumer, the job is finished only when both processes are.
		A non-zero producer exit code takes precedence over a zero consumer
		exit code.
		"""
		if not self.popen:
			return None
		result = self.RawPollWait(self.popen, wait)
		if self.consumer:
			res = result
			result = self.RawPollWait(self.consumer, wait)
			if result is not None and res is None:
				# Consumer finished first: stop the producer and report
				# "not finished" so that a later poll collects its exit code
				self.popen.kill()
				result = None
			elif result == 0 and res is not None and res != 0:
				result = res
		if result is not None:
			self.TidyUp()
		return result

	def Wait(self):
		"""Block until the job finishes and return its exit code."""
		return self.Poll(wait=True)

	def Kill(self):
		"""Kill the command (and the consumer, if any). Does nothing if not started."""
		if not self.popen:
			return
		self.popen.kill()
		if self.consumer:
			self.consumer.kill()
124
def KillWork(worklist, verbosity):
	"""Kill every job in worklist, then wait for each one to exit.

	All jobs are killed before any is waited for. verbosity is not used.
	"""
	for job in worklist:
		job.Kill()
	for job in worklist:
		job.Wait()
130
def NumberOfCPUs():
	"""Return the number of processors currently online, as reported by sysconf."""
	nr_online = os.sysconf("SC_NPROCESSORS_ONLN")
	return nr_online
133
def NanoSecsToSecsStr(x):
	"""Format a nanosecond count as "seconds.nanoseconds" with 9 decimal places.

	Returns an empty string for None.
	"""
	if x is None:
		return ""
	# Left-pad with zeros so there is at least one digit before the point
	digits = str(x).rjust(10, "0")
	return digits[:-9] + "." + digits[-9:]
141
def InsertOptionAfter(cmd, option, after):
	"""Insert option into list cmd immediately after the first element equal to after.

	If after is not in cmd, option is appended instead. cmd is modified in place.
	"""
	try:
		pos = cmd.index(after)
	except ValueError:
		# 'after' not present
		cmd.append(option)
	else:
		cmd.insert(pos + 1, option)
148
def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu):
	"""Create a Work object for every combination of CPU and time range.

	cmd                - perf script command (list of arguments)
	pipe_to            - optional command to pipe output to
	output_dir         - top-level output directory
	cpus               - list of CPU numbers; a negative number means no --cpu option
	time_ranges_by_cpu - list of time range lists: either one entry used for
	                     all CPUs, or one entry per element of cpus

	Returns the list of Work objects.
	"""
	cpu_width = len(str(cpus[-1]))
	cpu_dir_fmt = f"cpu-%.{cpu_width}u"
	# More than one entry means time ranges differ from CPU to CPU
	ranges_differ = len(time_ranges_by_cpu) > 1
	worklist = []
	for idx, cpu in enumerate(cpus):
		if cpu < 0:
			cpu_dir = output_dir
			cpu_option = None
		else:
			cpu_dir = os.path.join(output_dir, cpu_dir_fmt % cpu)
			cpu_option = f"--cpu={cpu}"

		if ranges_differ:
			time_ranges = time_ranges_by_cpu[idx]
			tr_prefix = f"time-range-{idx}"
		else:
			time_ranges = time_ranges_by_cpu[0]
			tr_prefix = "time-range"

		tr_width = len(str(len(time_ranges)))
		tr_dir_fmt = tr_prefix + f"-%.{tr_width}u"

		# Only ranges that get a --time option are numbered
		tr_nr = 0
		for r in time_ranges:
			work_cmd = list(cmd)
			if r == [None, None]:
				# Completely open-ended: no --time option, no subdirectory
				work_output_dir = cpu_dir
			else:
				time_option = "--time=" + NanoSecsToSecsStr(r[0]) + "," + NanoSecsToSecsStr(r[1])
				work_output_dir = os.path.join(cpu_dir, tr_dir_fmt % tr_nr)
				tr_nr += 1
				InsertOptionAfter(work_cmd, time_option, "script")
			# Inserted last so that --cpu comes before --time
			if cpu_option is not None:
				InsertOptionAfter(work_cmd, cpu_option, "script")
			worklist.append(Work(work_cmd, pipe_to, work_output_dir))
	return worklist
191
def DoRunWork(worklist, nr_jobs, verbosity):
	"""Run all jobs in worklist, at most nr_jobs at a time.

	Progress is printed according to verbosity. If any job exits with a
	non-zero code, the failure is reported, outstanding jobs are killed and
	False is returned. Otherwise, non-empty error files are reported and
	True is returned.
	"""
	total = len(worklist)
	pending = list(worklist)
	active = []
	completed = []
	changed = False
	while True:
		nr_completed = len(completed)
		if changed and verbosity.normal:
			nr_active = len(active)
			print(f"\rThere are {total} jobs: {nr_completed} completed, {nr_active} running", flush=True, end=" ")
			if verbosity.verbose:
				print()
			changed = False
		if nr_completed == total:
			break
		# Start jobs until the limit is reached or none are left
		while len(active) < nr_jobs and pending:
			job = pending.pop(0)
			active.append(job)
			if verbosity.verbose:
				print("Starting:", job.Command())
			job.Start()
			changed = True
		if active:
			time.sleep(0.1)
		# Poll every active job, sorting them into still running or finished
		still_active = []
		newly_done = []
		while active:
			job = active.pop(0)
			status = job.Poll()
			if status is None:
				still_active.append(job)
			elif status == 0:
				if verbosity.verbose:
					print("Finished:", job.Command())
				newly_done.append(job)
				changed = True
			else:
				if verbosity.normal and not verbosity.verbose:
					print()
				print("Job failed!\n    return code:", status, "\n    command:    ", job.Command())
				if job.pipe_to:
					print("    piped to:   ", job.pipe_to)
				print("Killing outstanding jobs")
				# Jobs already polled as running, then those not yet polled
				KillWork(still_active, verbosity)
				KillWork(active, verbosity)
				return False
		active = still_active
		completed += newly_done
	errorlist = []
	for job in worklist:
		errorlist += job.Errors()
	if errorlist:
		print("Errors:")
		for msg in errorlist:
			print(msg)
	elif verbosity.normal:
		print("\r"," "*50, "\rAll jobs finished successfully", flush=True)
	return True
252
def RunWork(worklist, nr_jobs=NumberOfCPUs(), verbosity=Verbosity()):
	"""Run all jobs in worklist, killing them all if anything goes wrong.

	Returns the result of DoRunWork(). Any exception is re-raised after the
	jobs have been killed.
	"""
	try:
		return DoRunWork(worklist, nr_jobs, verbosity)
	except BaseException:
		# Deliberately catches everything, including KeyboardInterrupt, so
		# that no jobs are left running. The exception is always re-raised.
		for w in worklist:
			w.Kill()
		raise
261
def ReadHeader(perf, file_name):
	"""Return the output of 'perf script --header-only' for file_name as a string.

	perf is the perf executable to run. The exit code is not checked.
	"""
	cmd = [perf, "script", "--header-only", "--input", file_name]
	# subprocess.run() waits for the process and closes the pipe
	completed = subprocess.run(cmd, stdout=subprocess.PIPE)
	return completed.stdout.decode("utf-8")
264
def ParseHeader(hdr):
	"""Parse header text into a dictionary of field name -> value.

	Only lines that start with "#" and contain ":" are used. The name is
	the text between the "#" and the first ":", the value is the text after
	it, both stripped of surrounding whitespace. A repeated name gets a
	numeric suffix, e.g. "event", "event 2", "event 3".
	"""
	result = {}
	for line in hdr.split("\n"):
		if not line.startswith("#") or ":" not in line:
			continue
		# Split at the first colon. Surrounding whitespace is stripped, so
		# there need not be a space before the colon.
		name, _, value = line[1:].partition(":")
		name = name.strip()
		value = value.strip()
		if name in result:
			orig_name = name
			nr = 2
			while True:
				name = f"{orig_name} {nr}"
				if name not in result:
					break
				nr += 1
		result[name] = value
	return result
283
def HeaderField(hdr_dict, hdr_fld):
	"""Return the value of header field hdr_fld, raising an Exception if it is absent."""
	if hdr_fld in hdr_dict:
		return hdr_dict[hdr_fld]
	raise Exception(f"'{hdr_fld}' missing from header information")
288
289# Represent the position of an option within a command string
290# and provide the option value and/or remove the option
class OptPos():
	"""Locate an option, in short or long form, within a list of arguments.

	Only the first match is used. The value may be attached to the option
	(-iFILE, --input=FILE) or be the next argument (-i FILE, --input FILE).
	A short option may also be the last of a group of short options
	(e.g. -vi FILE).
	"""

	def Init(self, opt_element=-1, value_element=-1, opt_pos=-1, value_pos=-1, error=None):
		"""Record where the option was found. The defaults mean "not found"."""
		self.opt_element = opt_element		# list element that contains option
		self.value_element = value_element	# list element that contains option value
		self.opt_pos = opt_pos			# string position of option
		self.value_pos = value_pos		# string position of value
		self.error = error			# error message string

	def _ValueInNextArg(self, idx, opt_pos, nr_args, flag):
		# The value is the argument following element idx, if there is one
		if idx + 1 < nr_args:
			self.Init(idx, idx + 1, opt_pos, 0)
		else:
			self.Init(error = f"{flag} option missing value")

	def __init__(self, args, short_name, long_name, default=None):
		"""
		args       - list of arguments to search (a copy is kept)
		short_name - short option letter, or "" if there is none
		long_name  - long option name without the leading "--"
		default    - value returned by Value() if the option is not found
		"""
		self.args = list(args)
		self.default = default
		short_flag = f"-{short_name}"
		long_flag = f"--{long_name}"
		long_len = len(long_flag)
		nr_args = len(args)
		for idx, arg in enumerate(args):
			# Short option at the start of the argument
			if short_name and arg[:2] == short_flag:
				if len(arg) != 2:
					self.Init(idx, idx, 0, 2)
				else:
					self._ValueInNextArg(idx, 0, nr_args, short_flag)
				return
			# Long option
			if arg.startswith(long_flag):
				if len(arg) == long_len:
					self._ValueInNextArg(idx, 0, nr_args, long_flag)
				elif arg[long_len] == "=":
					self.Init(idx, idx, 0, long_len + 1)
				else:
					self.Init(error = f"{long_flag} option expected '='")
				return
			# Short option within a group of short options
			if short_name and arg[:1] == "-" and arg[:2] != "--" and short_name in arg:
				ipos = arg.index(short_name)
				# Skip the argument if a hyphen comes before the option letter
				hyphen = arg.find("-", 1)
				if hyphen != -1 and hyphen - 1 < ipos:
					continue
				if ipos + 1 != len(arg):
					self.Init(idx, idx, ipos, ipos + 1)
				else:
					self._ValueInNextArg(idx, ipos, nr_args, short_flag)
				return
		self.Init()

	def Value(self):
		"""Return the option value, or the default if the option was not found."""
		if self.opt_element < 0:
			return self.default
		if self.opt_element == self.value_element:
			# Value is attached to the option
			return self.args[self.value_element][self.value_pos:]
		return self.args[self.value_element]

	def Remove(self, args):
		"""Remove the option and its value from list args, in place.

		args is expected to have the same layout as the list that was searched.
		"""
		if self.opt_element == -1:
			return
		# A separate value element follows the option, so deleting it first
		# does not change the position of the option
		if self.opt_element != self.value_element:
			del args[self.value_element]
		if not self.opt_pos:
			del args[self.opt_element]
		else:
			# Keep the short options that precede this one
			args[self.opt_element] = args[self.opt_element][:self.opt_pos]
361
def DetermineInputFileName(cmd):
	"""Return the input file name from the -i / --input option of cmd.

	Defaults to "perf.data". Raises an Exception if the option is malformed
	or the file does not exist.
	"""
	opt = OptPos(cmd, "i", "input", "perf.data")
	if opt.error:
		raise Exception(f"perf command {opt.error}")
	file_name = opt.Value()
	if os.path.exists(file_name):
		return file_name
	raise Exception(f"perf command input file '{file_name}' not found")
370
def ReadOption(args, short_name, long_name, err_prefix, remove=False):
	"""Return the value of an option in args, or None if it is not present.

	If remove is set, the option is also removed from args. Raises an
	Exception, with the message prefixed by err_prefix, if the option is
	malformed.
	"""
	opt = OptPos(args, short_name, long_name)
	if opt.error:
		raise Exception(f"{err_prefix}{opt.error}")
	found = opt.Value()
	if not remove:
		return found
	opt.Remove(args)
	return found
379
def ExtractOption(args, short_name, long_name, err_prefix):
	"""Return the value of an option in args, removing the option from args."""
	return ReadOption(args, short_name, long_name, err_prefix, remove=True)
382
def ReadPerfOption(args, short_name, long_name):
	"""Return the value of a perf command option in args, leaving args unchanged."""
	err_prefix = "perf command "
	return ReadOption(args, short_name, long_name, err_prefix)
385
def ExtractPerfOption(args, short_name, long_name):
	"""Return the value of a perf command option in args, removing the option from args."""
	err_prefix = "perf command "
	return ExtractOption(args, short_name, long_name, err_prefix)
388
def PerfDoubleQuickCommands(cmd, file_name):
	"""Create the perf script commands used to measure trace data density.

	cmd       - the user's perf script command (list of arguments). Its
	            --cpu and --time options, if any, are carried over.
	file_name - perf data file name

	Returns (cnts_cmd, times_cmd). Both use --itrace=qqi. cnts_cmd outputs
	the cpu field only and times_cmd outputs the cpu and time fields.
	"""
	cpu_str = ReadPerfOption(cmd, "C", "cpu")
	time_str = ReadPerfOption(cmd, "", "time")
	# Run the same perf executable as the user's command (its first element)
	perf = cmd[0] if cmd else "perf"
	# Use double-quick sampling to determine trace data density
	times_cmd = [perf, "script", "--ns", "--input", file_name, "--itrace=qqi"]
	if cpu_str:
		times_cmd.append(f"--cpu={cpu_str}")
	if time_str:
		times_cmd.append(f"--time={time_str}")
	cnts_cmd = list(times_cmd)
	cnts_cmd.append("-Fcpu")
	times_cmd.append("-Fcpu,time")
	return cnts_cmd, times_cmd
402
class CPUTimeRange():
	"""Per-CPU state used when splitting time ranges by sample count."""

	def __init__(self, cpu):
		self.cpu = cpu			# CPU number
		self.sample_cnt = 0		# number of samples counted
		self.time_ranges = None		# resulting list of [start, end] time ranges
		self.interval = 0		# number of samples per time range
		self.interval_remaining = 0	# samples left in the current time range
		self.remaining = 0		# samples left in total
		self.tr_pos = 0			# index of the current time range
412
def CalcTimeRangesByCPU(line, cpu, cpu_time_ranges, max_time):
	"""Account for one sample, starting a new time range when the interval is used up.

	line            - the output line split into fields; field 1 is the
	                  time, followed by one character that is dropped
	cpu             - index into cpu_time_ranges
	cpu_time_ranges - list of CPUTimeRange
	max_time        - end time given to the last time range
	"""
	ctr = cpu_time_ranges[cpu]
	ctr.remaining -= 1
	ctr.interval_remaining -= 1
	if ctr.remaining == 0:
		# Last sample: the current time range extends to the end
		ctr.time_ranges[ctr.tr_pos][1] = max_time
	elif ctr.interval_remaining == 0:
		# End the current time range just before this sample and start
		# a new one at this sample
		boundary = TimeVal(line[1][:-1], 0)
		ctr.time_ranges[ctr.tr_pos][1] = boundary - 1
		ctr.time_ranges.append([boundary, max_time])
		ctr.tr_pos += 1
		ctr.interval_remaining = ctr.interval
427
def CountSamplesByCPU(line, cpu, cpu_time_ranges):
	"""Count one sample for the given cpu. line is not used.

	On any failure, diagnostics are printed and the exception is re-raised.
	"""
	try:
		ctr = cpu_time_ranges[cpu]
		ctr.sample_cnt += 1
	except BaseException:
		print("exception")
		print("cpu", cpu)
		print("len(cpu_time_ranges)", len(cpu_time_ranges))
		raise
436
def ProcessCommandOutputLines(cmd, per_cpu, fn, *x):
	"""Run cmd and call fn for each output line that begins with a CPU number.

	cmd     - command to run (list of arguments)
	per_cpu - if set, pass the CPU number from the line to fn, otherwise 0
	fn      - called as fn(fields, cpu, *x), where fields is the line split
	          on whitespace
	x       - extra arguments for fn

	Lines that do not start with a CPU number in [] are skipped.
	"""
	# Assume CPU number is at beginning of line and enclosed by []
	pat = re.compile(r"\s*\[[0-9]+\]")
	# As a context manager, Popen closes the pipe and waits for the
	# process on exit, even if fn raises an exception
	with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
		for raw_line in p.stdout:
			line = raw_line.decode("utf-8")
			if not pat.match(line):
				continue
			fields = line.split()
			# Assumes CPU number is enclosed by []
			cpu = int(fields[0][1:-1]) if per_cpu else 0
			fn(fields, cpu, *x)
455
def IntersectTimeRanges(new_time_ranges, time_ranges):
	"""Reduce new_time_ranges, in place, to the parts that overlap time_ranges.

	Both are lists of [start, end] with inclusive ends, in ascending order.
	A new time range that spans more than one old time range is split.
	"""
	old_idx = 0
	new_idx = 0
	# Can assume len(time_ranges) != 0 and len(new_time_ranges) != 0
	# Note also, there *must* be at least one intersection.
	while old_idx < len(time_ranges) and new_idx < len(new_time_ranges):
		new_r = new_time_ranges[new_idx]
		old_start = time_ranges[old_idx][0]
		old_end = time_ranges[old_idx][1]
		# New range ends before old range starts: no intersection, drop it
		if new_r[1] < old_start:
			del new_time_ranges[new_idx]
			continue
		# New range starts after old range ends: try the next old range
		if new_r[0] > old_end:
			old_idx += 1
			if old_idx < len(time_ranges):
				continue
			# No more old ranges, so drop all remaining new ranges
			del new_time_ranges[new_idx:]
			return
		# The ranges intersect. Clip the start to the old range.
		if new_r[0] < old_start:
			new_r[0] = old_start
		# Clip the end to the old range. The part beyond it becomes a
		# separate new range which is checked next.
		if new_r[1] > old_end:
			remainder = [ old_end + 1, new_r[1] ]
			new_r[1] = old_end
			new_idx += 1
			new_time_ranges.insert(new_idx, remainder)
			continue
		# New range lies within the old range
		new_idx += 1
488
def SplitTimeRangesByTraceDataDensity(time_ranges, cpus, nr, cmd, file_name, per_cpu, min_size, min_interval, verbosity):
	"""Split time ranges so that each part contains a similar number of samples.

	Samples are obtained by running the commands from PerfDoubleQuickCommands().

	time_ranges  - list of [start, end] time ranges to split
	cpus         - list of CPU numbers being processed
	nr           - number of subdivisions wanted, or 0 to base it on NumberOfCPUs()
	cmd          - the user's perf script command
	file_name    - perf data file name
	per_cpu      - whether to count samples and split time ranges per CPU
	min_size     - minimum number of samples per time range
	min_interval - minimum interval, used if there is too little data
	verbosity    - Verbosity object

	Returns a list of time range lists: one for each element of cpus, or
	just one if there is too little data to split by sample count.
	"""
	if verbosity.normal:
		print("\rAnalyzing...", flush=True, end=" ")
		if verbosity.verbose:
			print()
	cnts_cmd, times_cmd = PerfDoubleQuickCommands(cmd, file_name)

	# Per-CPU: one entry for every CPU number up to the highest one wanted.
	# Otherwise a single entry for all samples.
	if per_cpu:
		nr_cpus = cpus[-1] + 1
		cpu_time_ranges = [ CPUTimeRange(cpu_nr) for cpu_nr in range(nr_cpus) ]
	else:
		nr_cpus = 1
		cpu_time_ranges = [ CPUTimeRange(-1) ]

	if verbosity.debug:
		print("nr_cpus", nr_cpus)
		print("cnts_cmd", cnts_cmd)
		print("times_cmd", times_cmd)

	# Count the number of "double quick" samples per CPU
	ProcessCommandOutputLines(cnts_cmd, per_cpu, CountSamplesByCPU, cpu_time_ranges)

	total_samples = 0
	max_samples = 0
	for ctr in cpu_time_ranges:
		total_samples += ctr.sample_cnt
		max_samples = max(max_samples, ctr.sample_cnt)
		if verbosity.debug:
			print("cpu:", ctr.cpu, "sample_cnt", ctr.sample_cnt)

	min_size = max(min_size, 1)

	if max_samples < min_size:
		# Too little data to be worth splitting
		if verbosity.debug:
			print("Too little data to split by time")
		return [ SplitTimeRangesIntoN(time_ranges, nr or 1, min_interval) ]

	if nr:
		# An explicit number of subdivisions overrides the minimum size
		divisor = nr
		min_size = 1
	else:
		divisor = NumberOfCPUs()

	# Target number of samples per time range
	interval = max(int(round(total_samples / divisor, 0)), min_size)

	if verbosity.debug:
		print("divisor", divisor)
		print("min_size", min_size)
		print("interval", interval)

	min_time = time_ranges[0][0]
	max_time = time_ranges[-1][1]

	for ctr in cpu_time_ranges:
		cnt = ctr.sample_cnt
		if cnt == 0:
			# No samples, so nothing to split by
			ctr.time_ranges = copy.deepcopy(time_ranges)
			continue
		# Adjust target interval for CPU to give approximately equal interval sizes
		# Determine number of intervals, rounding to nearest integer
		nr_intervals = max(int(round(cnt / interval, 0)), 1)
		# Determine interval size, rounding up
		per_interval = (cnt + nr_intervals - 1) // nr_intervals
		ctr.interval = per_interval
		ctr.interval_remaining = per_interval
		ctr.remaining = cnt
		# Init. time ranges for each CPU with the start time
		ctr.time_ranges = [ [min_time, max_time] ]

	# Set time ranges so that the same number of "double quick" samples
	# will fall into each time range.
	ProcessCommandOutputLines(times_cmd, per_cpu, CalcTimeRangesByCPU, cpu_time_ranges, max_time)

	# Restrict the result to the time ranges that were asked for
	for ctr in cpu_time_ranges:
		if ctr.sample_cnt:
			IntersectTimeRanges(ctr.time_ranges, time_ranges)

	return [cpu_time_ranges[cpu].time_ranges for cpu in cpus]
580
def SplitSingleTimeRangeIntoN(time_range, n):
	"""Split one [start, end] time range into n consecutive time ranges.

	All parts have the same duration except the last, which extends to the
	original end. If n <= 1 or the parts would be shorter than 1, the
	original time range is returned as the only element.
	"""
	if n <= 1:
		return [time_range]
	start, end = time_range[0], time_range[1]
	duration = int((end - start + 1) / n)
	if duration < 1:
		return [time_range]
	parts = [ [start + i * duration, start + (i + 1) * duration - 1] for i in range(n) ]
	# The last part takes up any remainder
	parts[-1][1] = end
	return parts
595
def TimeRangeDuration(r):
	"""Return the duration of time range r = [start, end], where end is inclusive."""
	start, end = r[0], r[1]
	return end - start + 1
598
def TotalDuration(time_ranges):
	"""Return the sum of the durations of all time ranges (ends are inclusive)."""
	return sum(r[1] - r[0] + 1 for r in time_ranges)
604
def SplitTimeRangesByInterval(time_ranges, interval):
	"""Split each time range into parts of approximately the given interval.

	interval is in the same units as the time ranges. The number of parts
	for each time range is its duration divided by interval, rounded to the
	nearest integer.
	"""
	result = []
	for r in time_ranges:
		nr_parts = int(round(TimeRangeDuration(r) / interval, 0))
		result.extend(SplitSingleTimeRangeIntoN(r, nr_parts))
	return result
613
def SplitTimeRangesIntoN(time_ranges, n, min_interval):
	"""Split time ranges into approximately n time ranges in total.

	The interval used is the total duration divided by n, but not less than
	min_interval. If there are already at least n time ranges, they are
	returned unchanged.
	"""
	if n <= len(time_ranges):
		return time_ranges
	interval = max(TotalDuration(time_ranges) / n, min_interval)
	return SplitTimeRangesByInterval(time_ranges, interval)
622
def RecombineTimeRanges(tr):
	"""Return a copy of time ranges tr with adjoining time ranges combined.

	Two time ranges adjoin if one starts exactly 1 after the other ends.
	tr is not modified.
	"""
	combined = []
	for r in tr:
		# if prev end + 1 == cur start, combine them
		if combined and combined[-1][1] + 1 == r[0]:
			combined[-1][1] = r[1]
		else:
			combined.append(copy.deepcopy(r))
	return combined
635
def OpenTimeRangeEnds(time_ranges, min_time, max_time):
	"""Make the outer ends of time_ranges open-ended (None), in place.

	The first start is replaced if it is not after min_time, and the last
	end is replaced if it is not before max_time.
	"""
	first = time_ranges[0]
	if first[0] <= min_time:
		first[0] = None
	last = time_ranges[-1]
	if last[1] >= max_time:
		last[1] = None
641
def BadTimeStr(time_str):
	"""Raise an Exception reporting that time option time_str is bad."""
	msg = f"perf command bad time option: '{time_str}'\nCheck also 'time of first sample' and 'time of last sample' in perf script --header-only"
	raise Exception(msg)
644
def ValidateTimeRanges(time_ranges, time_str):
	"""Check that time ranges are in ascending order and do not overlap.

	Also checks that no time range starts after it ends. time_str is the
	original option string, used in the error message.
	"""
	for idx, r in enumerate(time_ranges):
		start, end = r[0], r[1]
		overlaps_prev = idx != 0 and start <= time_ranges[idx - 1][1]
		if overlaps_prev or start > end:
			BadTimeStr(time_str)
654
def TimeVal(s, dflt):
	"""Convert a time string in seconds (e.g. "9466.504461499") to nanoseconds.

	Returns dflt if s is empty or only whitespace. Digits beyond the 9th
	decimal place are dropped. Raises an Exception if there is more than
	one decimal point or the seconds are negative.
	"""
	s = s.strip()
	if s == "":
		return dflt
	if s.count(".") > 1:
		raise Exception(f"Bad time value'{s}'")
	secs_str, point, frac_str = s.partition(".")
	secs = int(secs_str)
	if secs < 0:
		raise Exception("Negative time not allowed")
	nsecs = secs * 1000000000
	if point:
		# Pad or truncate the fraction to exactly 9 digits
		nsecs += int(frac_str.ljust(9, "0")[:9])
	return nsecs
669
def BadCPUStr(cpu_str):
	"""Raise an Exception reporting that cpu option cpu_str is bad."""
	msg = f"perf command bad cpu option: '{cpu_str}'\nCheck also 'nrcpus avail' in perf script --header-only"
	raise Exception(msg)
672
def ParseTimeStr(time_str, min_time, max_time):
	"""Parse the value of a --time option into a list of [start, end] in nanoseconds.

	time_str is a whitespace-separated list of "start,end" where either
	may be empty, in which case min_time or max_time respectively is used.
	If time_str is None, empty or only whitespace, the result is the single
	time range [min_time, max_time]. Raises an Exception if time_str is bad.
	"""
	if time_str is None or time_str.strip() == "":
		return [[min_time, max_time]]
	time_ranges = []
	for r in time_str.split():
		a = r.split(",")
		if len(a) != 2:
			BadTimeStr(time_str)
		try:
			start = TimeVal(a[0], min_time)
			end   = TimeVal(a[1], max_time)
		except Exception:
			# Report any parsing failure as a bad time option
			BadTimeStr(time_str)
		time_ranges.append([start, end])
	ValidateTimeRanges(time_ranges, time_str)
	return time_ranges
689
def ParseCPUStr(cpu_str, nr_cpus):
	"""Parse the value of a --cpu option into a sorted list of CPU numbers.

	cpu_str is a comma-separated list of CPU numbers or ranges, e.g.
	"0,2-4". Duplicates are removed. Returns [-1] if cpu_str is None or
	empty. Raises an Exception if cpu_str is malformed or a CPU number is
	not less than nr_cpus.
	"""
	if cpu_str is None or cpu_str == "":
		return [-1]
	cpus = set()
	for r in cpu_str.split(","):
		a = r.split("-")
		if len(a) > 2:
			BadCPUStr(cpu_str)
		try:
			start = int(a[0].strip())
			end = int(a[1].strip()) if len(a) > 1 else start
		except ValueError:
			BadCPUStr(cpu_str)
		if start < 0 or end < 0 or end < start or end >= nr_cpus:
			BadCPUStr(cpu_str)
		cpus.update(range(start, end + 1))
	return sorted(cpus)
712
class ParallelPerf():
	"""Split a perf script command into jobs by CPU and time range, and run them.

	Config() determines the jobs and Run() executes them.
	"""

	def __init__(self, a):
		"""Copy the options from a (the parsed arguments) and validate them.

		Raises an Exception if the output directory already exists or the
		options are inconsistent.
		"""
		for arg_name in vars(a):
			setattr(self, arg_name, getattr(a, arg_name))
		# Keep the original values because self.nr and self.cmd get changed
		self.orig_nr = self.nr
		self.orig_cmd = list(self.cmd)
		self.perf = self.cmd[0]
		if os.path.exists(self.output_dir):
			raise Exception(f"Output '{self.output_dir}' already exists")
		if self.jobs < 0 or self.nr < 0 or self.interval < 0:
			raise Exception("Bad options (negative values): try -h option for help")
		if self.nr != 0 and self.interval != 0:
			raise Exception("Cannot specify number of time subdivisions and time interval")
		if self.jobs == 0:
			self.jobs = NumberOfCPUs()
		if self.nr == 0 and self.interval == 0:
			if self.per_cpu:
				self.nr = 1
			else:
				self.nr = self.jobs

	def Init(self):
		"""Determine the input file and read its header."""
		if self.verbosity.debug:
			print("cmd", self.cmd)
		self.file_name = DetermineInputFileName(self.cmd)
		self.hdr = ReadHeader(self.perf, self.file_name)
		self.hdr_dict = ParseHeader(self.hdr)
		self.cmd_line = HeaderField(self.hdr_dict, "cmdline")

	def ExtractTimeInfo(self):
		"""Get the time ranges to process, removing any --time option from the command."""
		self.min_time = TimeVal(HeaderField(self.hdr_dict, "time of first sample"), 0)
		self.max_time = TimeVal(HeaderField(self.hdr_dict, "time of last sample"), 0)
		self.time_str = ExtractPerfOption(self.cmd, "", "time")
		self.time_ranges = ParseTimeStr(self.time_str, self.min_time, self.max_time)
		if self.verbosity.debug:
			print("time_ranges", self.time_ranges)

	def ExtractCPUInfo(self):
		"""Get the CPUs to process, removing any --cpu option from the command if per-cpu."""
		if self.per_cpu:
			nr_cpus = int(HeaderField(self.hdr_dict, "nrcpus avail"))
			self.cpu_str = ExtractPerfOption(self.cmd, "C", "cpu")
			if self.cpu_str is None or self.cpu_str == "":
				self.cpus = list(range(nr_cpus))
			else:
				self.cpus = ParseCPUStr(self.cpu_str, nr_cpus)
		else:
			self.cpu_str = None
			self.cpus = [-1]
		if self.verbosity.debug:
			print("cpus", self.cpus)

	def IsIntelPT(self):
		"""Return whether the recorded command line mentions intel_pt."""
		return self.cmd_line.find("intel_pt") >= 0

	def SplitTimeRanges(self):
		"""Subdivide the time ranges, setting split_time_ranges_for_each_cpu."""
		# NOTE(review): min_interval is documented as seconds but is compared
		# against nanosecond durations - confirm the intended unit
		if self.IsIntelPT() and self.interval == 0:
			self.split_time_ranges_for_each_cpu = \
				SplitTimeRangesByTraceDataDensity(self.time_ranges, self.cpus, self.orig_nr,
								  self.orig_cmd, self.file_name, self.per_cpu,
								  self.min_size, self.min_interval, self.verbosity)
		elif self.nr:
			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesIntoN(self.time_ranges, self.nr, self.min_interval) ]
		else:
			# The --interval option is in seconds but time ranges are in nanoseconds
			interval_ns = self.interval * 1000000000
			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesByInterval(self.time_ranges, interval_ns) ]

	def CheckTimeRanges(self):
		"""Self test: the split time ranges must recombine into the original time ranges."""
		for tr in self.split_time_ranges_for_each_cpu:
			# Re-combined time ranges should be the same
			new_tr = RecombineTimeRanges(tr)
			if new_tr != self.time_ranges:
				if self.verbosity.debug:
					print("tr", tr)
					print("new_tr", new_tr)
				raise Exception("Self test failed!")

	def OpenTimeRangeEnds(self):
		"""Make time ranges open-ended where they reach min_time / max_time."""
		for time_ranges in self.split_time_ranges_for_each_cpu:
			OpenTimeRangeEnds(time_ranges, self.min_time, self.max_time)

	def CreateWorkList(self):
		"""Create the list of jobs."""
		self.worklist = CreateWorkList(self.cmd, self.pipe_to, self.output_dir, self.cpus, self.split_time_ranges_for_each_cpu)

	def PerfDataRecordedPerCPU(self):
		"""Return False if the recorded command line has --per-thread, otherwise True."""
		if "--per-thread" in self.cmd_line.split():
			return False
		return True

	def DefaultToPerCPU(self):
		"""Return whether to process per-cpu when --per-cpu was not given."""
		# --no-per-cpu option takes precedence
		if self.no_per_cpu:
			return False
		if not self.PerfDataRecordedPerCPU():
			return False
		# Default to per-cpu for Intel PT data that was recorded per-cpu,
		# because decoding can be done for each CPU separately.
		if self.IsIntelPT():
			return True
		return False

	def Config(self):
		"""Read the perf data file header and work out the list of jobs."""
		self.Init()
		self.ExtractTimeInfo()
		if not self.per_cpu:
			self.per_cpu = self.DefaultToPerCPU()
		if self.verbosity.debug:
			print("per_cpu", self.per_cpu)
		self.ExtractCPUInfo()
		self.SplitTimeRanges()
		if self.verbosity.self_test:
			self.CheckTimeRanges()
		# Prefer open-ended time range to starting / ending with min_time / max_time resp.
		self.OpenTimeRangeEnds()
		self.CreateWorkList()

	def Run(self):
		"""Run the jobs, or just print their commands if dry_run is set.

		Returns True on success, or False if a job failed.
		"""
		if self.dry_run:
			print(len(self.worklist),"jobs:")
			for w in self.worklist:
				print(w.Command())
			return True
		result = RunWork(self.worklist, self.jobs, verbosity=self.verbosity)
		if self.verbosity.verbose:
			print(glb_prog_name, "done")
		return result
838
def RunParallelPerf(a):
	"""Configure and run the jobs for parsed arguments a, returning the result of Run()."""
	runner = ParallelPerf(a)
	runner.Config()
	result = runner.Run()
	return result
843
def Main(args):
	"""Parse the command line and run the jobs.

	args is the full command line including the program name. Options for
	this script come first, then '--', then the perf script command.

	Returns True on success, or False on failure. Errors are printed, and
	also re-raised if the debug option was given.
	"""
	ap = argparse.ArgumentParser(
		prog=glb_prog_name, formatter_class = argparse.RawDescriptionHelpFormatter,
		description =
"""
Run a perf script command multiple times in parallel, using perf script options
--cpu and --time so that each job processes a different chunk of the data.
""",
		epilog =
"""
Follow the options by '--' and then the perf script command e.g.

	$ perf record -a -- sleep 10
	$ parallel-perf.py --nr=4 -- perf script --ns
	All jobs finished successfully
	$ tree parallel-perf-output/
	parallel-perf-output/
	├── time-range-0
	│   ├── cmd.txt
	│   └── out.txt
	├── time-range-1
	│   ├── cmd.txt
	│   └── out.txt
	├── time-range-2
	│   ├── cmd.txt
	│   └── out.txt
	└── time-range-3
	    ├── cmd.txt
	    └── out.txt
	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
	parallel-perf-output/time-range-0/cmd.txt:perf script --time=,9466.504461499 --ns
	parallel-perf-output/time-range-1/cmd.txt:perf script --time=9466.504461500,9469.005396999 --ns
	parallel-perf-output/time-range-2/cmd.txt:perf script --time=9469.005397000,9471.506332499 --ns
	parallel-perf-output/time-range-3/cmd.txt:perf script --time=9471.506332500, --ns

Any perf script command can be used, including the use of perf script options
--dlfilter and --script, so that the benefit of running parallel jobs
naturally extends to them also.

If option --pipe-to is used, standard output is first piped through that
command. Beware, if the command fails (e.g. grep with no matches), it will be
considered a fatal error.

Final standard output is redirected to files named out.txt in separate
subdirectories under the output directory. Similarly, standard error is
written to files named err.txt. In addition, files named cmd.txt contain the
corresponding perf script command. After processing, err.txt files are removed
if they are empty.

If any job exits with a non-zero exit code, then all jobs are killed and no
more are started. A message is printed if any job results in a non-empty
err.txt file.

There is a separate output subdirectory for each time range. If the --per-cpu
option is used, these are further grouped under cpu-n subdirectories, e.g.

	$ parallel-perf.py --per-cpu --nr=2 -- perf script --ns --cpu=0,1
	All jobs finished successfully
	$ tree parallel-perf-output
	parallel-perf-output/
	├── cpu-0
	│   ├── time-range-0
	│   │   ├── cmd.txt
	│   │   └── out.txt
	│   └── time-range-1
	│       ├── cmd.txt
	│       └── out.txt
	└── cpu-1
	    ├── time-range-0
	    │   ├── cmd.txt
	    │   └── out.txt
	    └── time-range-1
	        ├── cmd.txt
	        └── out.txt
	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
	parallel-perf-output/cpu-0/time-range-0/cmd.txt:perf script --cpu=0 --time=,9469.005396999 --ns
	parallel-perf-output/cpu-0/time-range-1/cmd.txt:perf script --cpu=0 --time=9469.005397000, --ns
	parallel-perf-output/cpu-1/time-range-0/cmd.txt:perf script --cpu=1 --time=,9469.005396999 --ns
	parallel-perf-output/cpu-1/time-range-1/cmd.txt:perf script --cpu=1 --time=9469.005397000, --ns

Subdivisions of time range, and cpus if the --per-cpu option is used, are
expressed by the --time and --cpu perf script options respectively. If the
supplied perf script command has a --time option, then that time range is
subdivided, otherwise the time range given by 'time of first sample' to
'time of last sample' is used (refer perf script --header-only). Similarly, the
supplied perf script command may provide a --cpu option, and only those CPUs
will be processed.

To prevent time intervals becoming too small, the --min-interval option can
be used.

Note there is special handling for processing Intel PT traces. If an interval is
not specified and the perf record command contained the intel_pt event, then the
time range will be subdivided in order to produce subdivisions that contain
approximately the same amount of trace data. That is accomplished by counting
double-quick (--itrace=qqi) samples, and choosing time ranges that encompass
approximately the same number of samples. In that case, time ranges may not be
the same for each CPU processed. For Intel PT, --per-cpu is the default, but
that can be overridden by --no-per-cpu. Note, for Intel PT, double-quick
decoding produces 1 sample for each PSB synchronization packet, which in turn
come after a certain number of bytes output, determined by psb_period (refer
perf Intel PT documentation). The minimum number of double-quick samples that
will define a time range can be set by the --min_size option, which defaults to
64.
""")
	ap.add_argument("-o", "--output-dir", default="parallel-perf-output", help="output directory (default 'parallel-perf-output')")
	ap.add_argument("-j", "--jobs", type=int, default=0, help="maximum number of jobs to run in parallel at one time (default is the number of CPUs)")
	ap.add_argument("-n", "--nr", type=int, default=0, help="number of time subdivisions (default is the number of jobs)")
	ap.add_argument("-i", "--interval", type=float, default=0, help="subdivide the time range using this time interval (in seconds e.g. 0.1 for a tenth of a second)")
	ap.add_argument("-c", "--per-cpu", action="store_true", help="process data for each CPU in parallel")
	ap.add_argument("-m", "--min-interval", type=float, default=glb_min_interval, help=f"minimum interval (default {glb_min_interval} seconds)")
	ap.add_argument("-p", "--pipe-to", help="command to pipe output to (optional)")
	ap.add_argument("-N", "--no-per-cpu", action="store_true", help="do not process data for each CPU in parallel")
	ap.add_argument("-b", "--min_size", type=int, default=glb_min_samples, help="minimum data size (for Intel PT in PSBs)")
	ap.add_argument("-D", "--dry-run", action="store_true", help="do not run any jobs, just show the perf script commands")
	ap.add_argument("-q", "--quiet", action="store_true", help="do not print any messages except errors")
	ap.add_argument("-v", "--verbose", action="store_true", help="print more messages")
	ap.add_argument("-d", "--debug", action="store_true", help="print debugging messages")
	cmd_line = list(args)
	# Everything before '--' is for this script, everything after it is the
	# perf script command
	try:
		split_pos = cmd_line.index("--")
	except ValueError:
		# No '--' on the command line
		cmd = None
		args = cmd_line
	else:
		cmd = cmd_line[split_pos + 1:]
		args = cmd_line[:split_pos]
	# Skip the program name
	a = ap.parse_args(args=args[1:])
	a.cmd = cmd
	a.verbosity = Verbosity(a.quiet, a.verbose, a.debug)
	try:
		if a.cmd is None:
			# With no arguments at all, just show the help
			if len(args) <= 1:
				ap.print_help()
				return True
			raise Exception("Command line must contain '--' before perf command")
		return RunParallelPerf(a)
	except Exception as e:
		print("Fatal error: ", str(e))
		if a.debug:
			raise
		return False
985
# Script entry point: exit with status 1 if Main() reports failure
if __name__ == "__main__":
	succeeded = Main(sys.argv)
	if not succeeded:
		sys.exit(1)
989