#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Run a perf script command multiple times in parallel, using perf script
# options --cpu and --time so that each job processes a different chunk
# of the data.
#
# Copyright (c) 2024, Intel Corporation.

import subprocess
import argparse
import pathlib
import shlex
import time
import copy
import sys
import os
import re

glb_prog_name = "parallel-perf.py"
glb_min_interval = 10.0
glb_min_samples = 64

class Verbosity():

	def __init__(self, quiet=False, verbose=False, debug=False):
		self.normal    = True
		self.verbose   = verbose
		self.debug     = debug
		self.self_test = True
		if self.debug:
			self.verbose = True
		if self.verbose:
			quiet = False
		if quiet:
			self.normal = False

# Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command
class Work():

	def __init__(self, cmd, pipe_to, output_dir="."):
		self.popen = None
		self.consumer = None
		self.cmd = cmd
		self.pipe_to = pipe_to
		self.output_dir = output_dir
		self.cmdout_name = f"{output_dir}/cmd.txt"
		self.stdout_name = f"{output_dir}/out.txt"
		self.stderr_name = f"{output_dir}/err.txt"

	def Command(self):
		sh_cmd = [ shlex.quote(x) for x in self.cmd ]
		return " ".join(sh_cmd)

	def Stdout(self):
		return open(self.stdout_name, "w")

	def Stderr(self):
		return open(self.stderr_name, "w")

	def CreateOutputDir(self):
		pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True)

	def Start(self):
		if self.popen:
			return
		self.CreateOutputDir()
		with open(self.cmdout_name, "w") as f:
			f.write(self.Command())
			f.write("\n")
		stdout = self.Stdout()
		stderr = self.Stderr()
		if self.pipe_to:
			self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr)
			args = shlex.split(self.pipe_to)
			self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr)
		else:
			self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr)

	def RemoveEmptyErrFile(self):
		if os.path.exists(self.stderr_name):
			if os.path.getsize(self.stderr_name) == 0:
				os.unlink(self.stderr_name)

	def Errors(self):
		if os.path.exists(self.stderr_name):
			if os.path.getsize(self.stderr_name) != 0:
				return [ f"Non-empty error file {self.stderr_name}" ]
		return []

	def TidyUp(self):
		self.RemoveEmptyErrFile()

	def RawPollWait(self, p, wait):
		if wait:
			return p.wait()
		return p.poll()

	def Poll(self, wait=False):
		if not self.popen:
			return None
		result = self.RawPollWait(self.popen, wait)
		if self.consumer:
			res = result
			result = self.RawPollWait(self.consumer, wait)
			if result != None and res == None:
				self.popen.kill()
				result = None
			elif result == 0 and res != None and res != 0:
				result = res
		if result != None:
			self.TidyUp()
		return result

	def Wait(self):
		return self.Poll(wait=True)

	def Kill(self):
		if not self.popen:
			return
		self.popen.kill()
		if self.consumer:
			self.consumer.kill()

def KillWork(worklist, verbosity):
	for w in worklist:
		w.Kill()
	for w in worklist:
		w.Wait()

def NumberOfCPUs():
	return os.sysconf("SC_NPROCESSORS_ONLN")

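# Convert an integer number of nanoseconds into the "seconds.nanoseconds"
# string form used by the perf script --time option,
# e.g. 9466504461499 -> "9466.504461499". None becomes an empty string,
# which leaves that end of the --time range open.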
def NanoSecsToSecsStr(x):
	if x == None:
		return ""
	x = str(x)
	if len(x) < 10:
		x = "0" * (10 - len(x)) + x
	return x[:len(x) - 9] + "." + x[-9:]

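# Insert an option immediately after a given sub-command (e.g. "script") in a
# perf command list, or append it if the sub-command is not found.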
def InsertOptionAfter(cmd, option, after):
	try:
		pos = cmd.index(after)
		cmd.insert(pos + 1, option)
	except:
		cmd.append(option)

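# Create one Work job for each combination of CPU and time range, giving each
# job its own output subdirectory (cpu-n and/or time-range-n as applicable).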
def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu):
	max_len = len(str(cpus[-1]))
	cpu_dir_fmt = f"cpu-%.{max_len}u"
	worklist = []
	pos = 0
	for cpu in cpus:
		if cpu >= 0:
			cpu_dir = os.path.join(output_dir, cpu_dir_fmt % cpu)
			cpu_option = f"--cpu={cpu}"
		else:
			cpu_dir = output_dir
			cpu_option = None

		tr_dir_fmt = "time-range"

		if len(time_ranges_by_cpu) > 1:
			time_ranges = time_ranges_by_cpu[pos]
			tr_dir_fmt += f"-{pos}"
			pos += 1
		else:
			time_ranges = time_ranges_by_cpu[0]

		max_len = len(str(len(time_ranges)))
		tr_dir_fmt += f"-%.{max_len}u"

		i = 0
		for r in time_ranges:
			if r == [None, None]:
				time_option = None
				work_output_dir = cpu_dir
			else:
				time_option = "--time=" + NanoSecsToSecsStr(r[0]) + "," + NanoSecsToSecsStr(r[1])
				work_output_dir = os.path.join(cpu_dir, tr_dir_fmt % i)
				i += 1
			work_cmd = list(cmd)
			if time_option != None:
				InsertOptionAfter(work_cmd, time_option, "script")
			if cpu_option != None:
				InsertOptionAfter(work_cmd, cpu_option, "script")
			w = Work(work_cmd, pipe_to, work_output_dir)
			worklist.append(w)
	return worklist

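# Simple scheduler: keep up to nr_jobs jobs running, polling for completion,
# and kill all outstanding jobs if any job fails.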
def DoRunWork(worklist, nr_jobs, verbosity):
	nr_to_do = len(worklist)
	not_started = list(worklist)
	running = []
	done = []
	chg = False
	while True:
		nr_done = len(done)
		if chg and verbosity.normal:
			nr_run = len(running)
			print(f"\rThere are {nr_to_do} jobs: {nr_done} completed, {nr_run} running", flush=True, end=" ")
			if verbosity.verbose:
				print()
			chg = False
		if nr_done == nr_to_do:
			break
		while len(running) < nr_jobs and len(not_started):
			w = not_started.pop(0)
			running.append(w)
			if verbosity.verbose:
				print("Starting:", w.Command())
			w.Start()
			chg = True
		if len(running):
			time.sleep(0.1)
		finished = []
		not_finished = []
		while len(running):
			w = running.pop(0)
			r = w.Poll()
			if r == None:
				not_finished.append(w)
				continue
			if r == 0:
				if verbosity.verbose:
					print("Finished:", w.Command())
				finished.append(w)
				chg = True
				continue
			if verbosity.normal and not verbosity.verbose:
				print()
			print("Job failed!\n    return code:", r, "\n    command:    ", w.Command())
			if w.pipe_to:
				print("    piped to:   ", w.pipe_to)
			print("Killing outstanding jobs")
			KillWork(not_finished, verbosity)
			KillWork(running, verbosity)
			return False
		running = not_finished
		done += finished
	errorlist = []
	for w in worklist:
		errorlist += w.Errors()
	if len(errorlist):
		print("Errors:")
		for e in errorlist:
			print(e)
	elif verbosity.normal:
		print("\r"," "*50, "\rAll jobs finished successfully", flush=True)
	return True

def RunWork(worklist, nr_jobs=NumberOfCPUs(), verbosity=Verbosity()):
	try:
		return DoRunWork(worklist, nr_jobs, verbosity)
	except:
		for w in worklist:
			w.Kill()
		raise
	return True

def ReadHeader(perf, file_name):
	return subprocess.Popen([perf, "script", "--header-only", "--input", file_name], stdout=subprocess.PIPE).stdout.read().decode("utf-8")

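# Parse "# name : value" lines from perf script --header-only output into a
# dictionary, de-duplicating repeated names by appending " 2", " 3" etc.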
def ParseHeader(hdr):
	result = {}
	lines = hdr.split("\n")
	for line in lines:
		if ":" in line and line[0] == "#":
			pos = line.index(":")
			name = line[1:pos-1].strip()
			value = line[pos+1:].strip()
			if name in result:
				orig_name = name
				nr = 2
				while True:
					name = f"{orig_name} {nr}"
					if name not in result:
						break
					nr += 1
			result[name] = value
	return result

def HeaderField(hdr_dict, hdr_fld):
	if hdr_fld not in hdr_dict:
		raise Exception(f"'{hdr_fld}' missing from header information")
	return hdr_dict[hdr_fld]

# Represent the position of an option within a command string
# and provide the option value and/or remove the option
class OptPos():

	def Init(self, opt_element=-1, value_element=-1, opt_pos=-1, value_pos=-1, error=None):
		self.opt_element = opt_element		# list element that contains option
		self.value_element = value_element	# list element that contains option value
		self.opt_pos = opt_pos			# string position of option
		self.value_pos = value_pos		# string position of value
		self.error = error			# error message string

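	# Scan args for the option, accepting "-C 1", "-C1", "--cpu 1", "--cpu=1"
	# and short options bundled with other flags.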
	def __init__(self, args, short_name, long_name, default=None):
		self.args = list(args)
		self.default = default
		n = 2 + len(long_name)
		m = len(short_name)
		pos = -1
		for opt in args:
			pos += 1
			if m and opt[:2] == f"-{short_name}":
				if len(opt) == 2:
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, 0, 0)
					else:
						self.Init(error = f"-{short_name} option missing value")
				else:
					self.Init(pos, pos, 0, 2)
				return
			if opt[:n] == f"--{long_name}":
				if len(opt) == n:
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, 0, 0)
					else:
						self.Init(error = f"--{long_name} option missing value")
				elif opt[n] == "=":
					self.Init(pos, pos, 0, n + 1)
				else:
					self.Init(error = f"--{long_name} option expected '='")
				return
			if m and opt[:1] == "-" and opt[:2] != "--" and short_name in opt:
				ipos = opt.index(short_name)
				if "-" in opt[1:]:
					hpos = opt[1:].index("-")
					if hpos < ipos:
						continue
				if ipos + 1 == len(opt):
					if pos + 1 < len(args):
						self.Init(pos, pos + 1, ipos, 0)
					else:
						self.Init(error = f"-{short_name} option missing value")
				else:
					self.Init(pos, pos, ipos, ipos + 1)
				return
		self.Init()

	def Value(self):
		if self.opt_element >= 0:
			if self.opt_element != self.value_element:
				return self.args[self.value_element]
			else:
				return self.args[self.value_element][self.value_pos:]
		return self.default

	def Remove(self, args):
		if self.opt_element == -1:
			return
		if self.opt_element != self.value_element:
			del args[self.value_element]
		if self.opt_pos:
			args[self.opt_element] = args[self.opt_element][:self.opt_pos]
		else:
			del args[self.opt_element]

def DetermineInputFileName(cmd):
	p = OptPos(cmd, "i", "input", "perf.data")
	if p.error:
		raise Exception(f"perf command {p.error}")
	file_name = p.Value()
	if not os.path.exists(file_name):
		raise Exception(f"perf command input file '{file_name}' not found")
	return file_name

def ReadOption(args, short_name, long_name, err_prefix, remove=False):
	p = OptPos(args, short_name, long_name)
	if p.error:
		raise Exception(f"{err_prefix}{p.error}")
	value = p.Value()
	if remove:
		p.Remove(args)
	return value

def ExtractOption(args, short_name, long_name, err_prefix):
	return ReadOption(args, short_name, long_name, err_prefix, True)

def ReadPerfOption(args, short_name, long_name):
	return ReadOption(args, short_name, long_name, "perf command ")

def ExtractPerfOption(args, short_name, long_name):
	return ExtractOption(args, short_name, long_name, "perf command ")

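# Build two perf script commands that use "double quick" Intel PT decoding
# (--itrace=qqi): one that just outputs the CPU of each sample (to count
# samples per CPU) and one that also outputs the sample time.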
def PerfDoubleQuickCommands(cmd, file_name):
	cpu_str = ReadPerfOption(cmd, "C", "cpu")
	time_str = ReadPerfOption(cmd, "", "time")
	# Use double-quick sampling to determine trace data density
	times_cmd = ["perf", "script", "--ns", "--input", file_name, "--itrace=qqi"]
	if cpu_str != None and cpu_str != "":
		times_cmd.append(f"--cpu={cpu_str}")
	if time_str != None and time_str != "":
		times_cmd.append(f"--time={time_str}")
	cnts_cmd = list(times_cmd)
	cnts_cmd.append("-Fcpu")
	times_cmd.append("-Fcpu,time")
	return cnts_cmd, times_cmd

class CPUTimeRange():
	def __init__(self, cpu):
		self.cpu = cpu
		self.sample_cnt = 0
		self.time_ranges = None
		self.interval = 0
		self.interval_remaining = 0
		self.remaining = 0
		self.tr_pos = 0

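# Called for each "double quick" sample: when the per-CPU interval count is
# reached, close the current time range just before this sample's time and
# start a new one, so each range holds roughly the same number of samples.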
def CalcTimeRangesByCPU(line, cpu, cpu_time_ranges, max_time):
	cpu_time_range = cpu_time_ranges[cpu]
	cpu_time_range.remaining -= 1
	cpu_time_range.interval_remaining -= 1
	if cpu_time_range.remaining == 0:
		cpu_time_range.time_ranges[cpu_time_range.tr_pos][1] = max_time
		return
	if cpu_time_range.interval_remaining == 0:
		time = TimeVal(line[1][:-1], 0)
		time_ranges = cpu_time_range.time_ranges
		time_ranges[cpu_time_range.tr_pos][1] = time - 1
		time_ranges.append([time, max_time])
		cpu_time_range.tr_pos += 1
		cpu_time_range.interval_remaining = cpu_time_range.interval

def CountSamplesByCPU(line, cpu, cpu_time_ranges):
	try:
		cpu_time_ranges[cpu].sample_cnt += 1
	except:
		print("exception")
		print("cpu", cpu)
		print("len(cpu_time_ranges)", len(cpu_time_ranges))
		raise

def ProcessCommandOutputLines(cmd, per_cpu, fn, *x):
	# Assume CPU number is at beginning of line and enclosed by []
	pat = re.compile(r"\s*\[[0-9]+\]")
	p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
	while True:
		line = p.stdout.readline()
		if line:
			line = line.decode("utf-8")
			if pat.match(line):
				line = line.split()
				if per_cpu:
					# Assumes CPU number is enclosed by []
					cpu = int(line[0][1:-1])
				else:
					cpu = 0
				fn(line, cpu, *x)
		else:
			break
	p.wait()

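# Adjust new_time_ranges in place so that they lie within time_ranges,
# dropping, trimming or splitting ranges as needed.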
def IntersectTimeRanges(new_time_ranges, time_ranges):
	pos = 0
	new_pos = 0
	# Can assume len(time_ranges) != 0 and len(new_time_ranges) != 0
	# Note also, there *must* be at least one intersection.
	while pos < len(time_ranges) and new_pos < len(new_time_ranges):
		# new end < old start => no intersection, remove new
		if new_time_ranges[new_pos][1] < time_ranges[pos][0]:
			del new_time_ranges[new_pos]
			continue
		# new start > old end => no intersection, check next
		if new_time_ranges[new_pos][0] > time_ranges[pos][1]:
			pos += 1
			if pos < len(time_ranges):
				continue
			# no next, so remove remaining
			while new_pos < len(new_time_ranges):
				del new_time_ranges[new_pos]
			return
		# Found an intersection
		# new start < old start => adjust new start = old start
		if new_time_ranges[new_pos][0] < time_ranges[pos][0]:
			new_time_ranges[new_pos][0] = time_ranges[pos][0]
		# new end > old end => keep the overlap, insert the remainder
		if new_time_ranges[new_pos][1] > time_ranges[pos][1]:
			r = [ time_ranges[pos][1] + 1, new_time_ranges[new_pos][1] ]
			new_time_ranges[new_pos][1] = time_ranges[pos][1]
			new_pos += 1
			new_time_ranges.insert(new_pos, r)
			continue
		# new [start, end] is within old [start, end]
		new_pos += 1

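# Split the time ranges so that each subdivision covers approximately the same
# amount of Intel PT trace data, by counting "double quick" samples and then
# choosing time ranges that contain approximately equal numbers of them.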
def SplitTimeRangesByTraceDataDensity(time_ranges, cpus, nr, cmd, file_name, per_cpu, min_size, min_interval, verbosity):
	if verbosity.normal:
		print("\rAnalyzing...", flush=True, end=" ")
		if verbosity.verbose:
			print()
	cnts_cmd, times_cmd = PerfDoubleQuickCommands(cmd, file_name)

	if per_cpu:
		nr_cpus = cpus[-1] + 1
		cpu_time_ranges = [ CPUTimeRange(cpu) for cpu in range(nr_cpus) ]
	else:
		nr_cpus = 1
		cpu_time_ranges = [ CPUTimeRange(-1) ]

	if verbosity.debug:
		print("nr_cpus", nr_cpus)
		print("cnts_cmd", cnts_cmd)
		print("times_cmd", times_cmd)

	# Count the number of "double quick" samples per CPU
	ProcessCommandOutputLines(cnts_cmd, per_cpu, CountSamplesByCPU, cpu_time_ranges)

	tot = 0
	mx = 0
	for cpu_time_range in cpu_time_ranges:
		cnt = cpu_time_range.sample_cnt
		tot += cnt
		if cnt > mx:
			mx = cnt
		if verbosity.debug:
			print("cpu:", cpu_time_range.cpu, "sample_cnt", cnt)

	if min_size < 1:
		min_size = 1

	if mx < min_size:
		# Too little data to be worth splitting
		if verbosity.debug:
			print("Too little data to split by time")
		if nr == 0:
			nr = 1
		return [ SplitTimeRangesIntoN(time_ranges, nr, min_interval) ]

	if nr:
		divisor = nr
		min_size = 1
	else:
		divisor = NumberOfCPUs()

	interval = int(round(tot / divisor, 0))
	if interval < min_size:
		interval = min_size

	if verbosity.debug:
		print("divisor", divisor)
		print("min_size", min_size)
		print("interval", interval)

	min_time = time_ranges[0][0]
	max_time = time_ranges[-1][1]

	for cpu_time_range in cpu_time_ranges:
		cnt = cpu_time_range.sample_cnt
		if cnt == 0:
			cpu_time_range.time_ranges = copy.deepcopy(time_ranges)
			continue
		# Adjust target interval for CPU to give approximately equal interval sizes
		# Determine number of intervals, rounding to nearest integer
		n = int(round(cnt / interval, 0))
		if n < 1:
			n = 1
		# Determine interval size, rounding up
		d, m = divmod(cnt, n)
		if m:
			d += 1
		cpu_time_range.interval = d
		cpu_time_range.interval_remaining = d
		cpu_time_range.remaining = cnt
		# Init. time ranges for each CPU with the start time
		cpu_time_range.time_ranges = [ [min_time, max_time] ]

	# Set time ranges so that the same number of "double quick" samples
	# will fall into each time range.
	ProcessCommandOutputLines(times_cmd, per_cpu, CalcTimeRangesByCPU, cpu_time_ranges, max_time)

	for cpu_time_range in cpu_time_ranges:
		if cpu_time_range.sample_cnt:
			IntersectTimeRanges(cpu_time_range.time_ranges, time_ranges)

	return [cpu_time_ranges[cpu].time_ranges for cpu in cpus]

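# Split one [start, end] range into n equal sub-ranges, with the last
# sub-range absorbing any remainder.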
def SplitSingleTimeRangeIntoN(time_range, n):
	if n <= 1:
		return [time_range]
	start = time_range[0]
	end   = time_range[1]
	duration = int((end - start + 1) / n)
	if duration < 1:
		return [time_range]
	time_ranges = []
	for i in range(n):
		time_ranges.append([start, start + duration - 1])
		start += duration
	time_ranges[-1][1] = end
	return time_ranges

def TimeRangeDuration(r):
	return r[1] - r[0] + 1

def TotalDuration(time_ranges):
	duration = 0
	for r in time_ranges:
		duration += TimeRangeDuration(r)
	return duration

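# Split each time range into sub-ranges of approximately the given interval.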
def SplitTimeRangesByInterval(time_ranges, interval):
	new_ranges = []
	for r in time_ranges:
		duration = TimeRangeDuration(r)
		n = duration / interval
		n = int(round(n, 0))
		new_ranges += SplitSingleTimeRangeIntoN(r, n)
	return new_ranges

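# Split time ranges into approximately n sub-ranges, subject to a minimum
# interval size; ranges are returned unchanged if there are already n or more.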
def SplitTimeRangesIntoN(time_ranges, n, min_interval):
	if n <= len(time_ranges):
		return time_ranges
	duration = TotalDuration(time_ranges)
	interval = duration / n
	if interval < min_interval:
		interval = min_interval
	return SplitTimeRangesByInterval(time_ranges, interval)

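# Merge adjacent time ranges (used by the self test to check that the split
# ranges recombine to the original).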
def RecombineTimeRanges(tr):
	new_tr = copy.deepcopy(tr)
	n = len(new_tr)
	i = 1
	while i < len(new_tr):
		# if prev end + 1 == cur start, combine them
		if new_tr[i - 1][1] + 1 == new_tr[i][0]:
			new_tr[i][0] = new_tr[i - 1][0]
			del new_tr[i - 1]
		else:
			i += 1
	return new_tr

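# Replace the first start / last end with None when they match the overall
# limits, so that the generated --time option is left open-ended.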
def OpenTimeRangeEnds(time_ranges, min_time, max_time):
	if time_ranges[0][0] <= min_time:
		time_ranges[0][0] = None
	if time_ranges[-1][1] >= max_time:
		time_ranges[-1][1] = None

def BadTimeStr(time_str):
	raise Exception(f"perf command bad time option: '{time_str}'\nCheck also 'time of first sample' and 'time of last sample' in perf script --header-only")

def ValidateTimeRanges(time_ranges, time_str):
	n = len(time_ranges)
	for i in range(n):
		start = time_ranges[i][0]
		end   = time_ranges[i][1]
		if i != 0 and start <= time_ranges[i - 1][1]:
			BadTimeStr(time_str)
		if start > end:
			BadTimeStr(time_str)

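# Convert a perf time string like "9466.504461499" into integer nanoseconds,
# returning dflt for an empty string.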
def TimeVal(s, dflt):
	s = s.strip()
	if s == "":
		return dflt
	a = s.split(".")
	if len(a) > 2:
		raise Exception(f"Bad time value '{s}'")
	x = int(a[0])
	if x < 0:
		raise Exception("Negative time not allowed")
	x *= 1000000000
	if len(a) > 1:
		x += int((a[1] + "000000000")[:9])
	return x

def BadCPUStr(cpu_str):
	raise Exception(f"perf command bad cpu option: '{cpu_str}'\nCheck also 'nrcpus avail' in perf script --header-only")

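# Parse a perf --time option value (one or more "start,end" pairs separated by
# spaces) into a list of [start, end] ranges in nanoseconds.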
def ParseTimeStr(time_str, min_time, max_time):
	if time_str == None or time_str == "":
		return [[min_time, max_time]]
	time_ranges = []
	for r in time_str.split():
		a = r.split(",")
		if len(a) != 2:
			BadTimeStr(time_str)
		try:
			start = TimeVal(a[0], min_time)
			end   = TimeVal(a[1], max_time)
		except:
			BadTimeStr(time_str)
		time_ranges.append([start, end])
	ValidateTimeRanges(time_ranges, time_str)
	return time_ranges

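# Parse a perf --cpu option value like "1,4-6" into a sorted list of CPU
# numbers, e.g. [1, 4, 5, 6]. An empty value gives [-1], meaning no CPU
# selection.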
def ParseCPUStr(cpu_str, nr_cpus):
	if cpu_str == None or cpu_str == "":
		return [-1]
	cpus = []
	for r in cpu_str.split(","):
		a = r.split("-")
		if len(a) < 1 or len(a) > 2:
			BadCPUStr(cpu_str)
		try:
			start = int(a[0].strip())
			if len(a) > 1:
				end = int(a[1].strip())
			else:
				end = start
		except:
			BadCPUStr(cpu_str)
		if start < 0 or end < 0 or end < start or end >= nr_cpus:
			BadCPUStr(cpu_str)
		cpus.extend(range(start, end + 1))
	cpus = list(set(cpus)) # Remove duplicates
	cpus.sort()
	return cpus

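# Top-level driver: reads the perf.data header, determines the time ranges and
# CPUs to process, splits the time ranges, and then creates and runs the jobs.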
class ParallelPerf():

	def __init__(self, a):
		for arg_name in vars(a):
			setattr(self, arg_name, getattr(a, arg_name))
		self.orig_nr = self.nr
		self.orig_cmd = list(self.cmd)
		self.perf = self.cmd[0]
		if os.path.exists(self.output_dir):
			raise Exception(f"Output '{self.output_dir}' already exists")
		if self.jobs < 0 or self.nr < 0 or self.interval < 0:
			raise Exception("Bad options (negative values): try -h option for help")
		if self.nr != 0 and self.interval != 0:
			raise Exception("Cannot specify number of time subdivisions and time interval")
		if self.jobs == 0:
			self.jobs = NumberOfCPUs()
		if self.nr == 0 and self.interval == 0:
			if self.per_cpu:
				self.nr = 1
			else:
				self.nr = self.jobs

	def Init(self):
		if self.verbosity.debug:
			print("cmd", self.cmd)
		self.file_name = DetermineInputFileName(self.cmd)
		self.hdr = ReadHeader(self.perf, self.file_name)
		self.hdr_dict = ParseHeader(self.hdr)
		self.cmd_line = HeaderField(self.hdr_dict, "cmdline")

	def ExtractTimeInfo(self):
		self.min_time = TimeVal(HeaderField(self.hdr_dict, "time of first sample"), 0)
		self.max_time = TimeVal(HeaderField(self.hdr_dict, "time of last sample"), 0)
		self.time_str = ExtractPerfOption(self.cmd, "", "time")
		self.time_ranges = ParseTimeStr(self.time_str, self.min_time, self.max_time)
		if self.verbosity.debug:
			print("time_ranges", self.time_ranges)

	def ExtractCPUInfo(self):
		if self.per_cpu:
			nr_cpus = int(HeaderField(self.hdr_dict, "nrcpus avail"))
			self.cpu_str = ExtractPerfOption(self.cmd, "C", "cpu")
			if self.cpu_str == None or self.cpu_str == "":
				self.cpus = [ x for x in range(nr_cpus) ]
			else:
				self.cpus = ParseCPUStr(self.cpu_str, nr_cpus)
		else:
			self.cpu_str = None
			self.cpus = [-1]
		if self.verbosity.debug:
			print("cpus", self.cpus)

	def IsIntelPT(self):
		return self.cmd_line.find("intel_pt") >= 0

	def SplitTimeRanges(self):
		if self.IsIntelPT() and self.interval == 0:
			self.split_time_ranges_for_each_cpu = \
				SplitTimeRangesByTraceDataDensity(self.time_ranges, self.cpus, self.orig_nr,
								  self.orig_cmd, self.file_name, self.per_cpu,
								  self.min_size, self.min_interval, self.verbosity)
		elif self.nr:
			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesIntoN(self.time_ranges, self.nr, self.min_interval) ]
		else:
			self.split_time_ranges_for_each_cpu = [ SplitTimeRangesByInterval(self.time_ranges, self.interval) ]

	def CheckTimeRanges(self):
		for tr in self.split_time_ranges_for_each_cpu:
			# Re-combined time ranges should be the same
			new_tr = RecombineTimeRanges(tr)
			if new_tr != self.time_ranges:
				if self.verbosity.debug:
					print("tr", tr)
					print("new_tr", new_tr)
				raise Exception("Self test failed!")

	def OpenTimeRangeEnds(self):
		for time_ranges in self.split_time_ranges_for_each_cpu:
			OpenTimeRangeEnds(time_ranges, self.min_time, self.max_time)

	def CreateWorkList(self):
		self.worklist = CreateWorkList(self.cmd, self.pipe_to, self.output_dir, self.cpus, self.split_time_ranges_for_each_cpu)

	def PerfDataRecordedPerCPU(self):
		if "--per-thread" in self.cmd_line.split():
			return False
		return True

	def DefaultToPerCPU(self):
		# --no-per-cpu option takes precedence
		if self.no_per_cpu:
			return False
		if not self.PerfDataRecordedPerCPU():
			return False
		# Default to per-cpu for Intel PT data that was recorded per-cpu,
		# because decoding can be done for each CPU separately.
		if self.IsIntelPT():
			return True
		return False

	def Config(self):
		self.Init()
		self.ExtractTimeInfo()
		if not self.per_cpu:
			self.per_cpu = self.DefaultToPerCPU()
		if self.verbosity.debug:
			print("per_cpu", self.per_cpu)
		self.ExtractCPUInfo()
		self.SplitTimeRanges()
		if self.verbosity.self_test:
			self.CheckTimeRanges()
		# Prefer open-ended time range to starting / ending with min_time / max_time resp.
		self.OpenTimeRangeEnds()
		self.CreateWorkList()

	def Run(self):
		if self.dry_run:
			print(len(self.worklist),"jobs:")
			for w in self.worklist:
				print(w.Command())
			return True
		result = RunWork(self.worklist, self.jobs, verbosity=self.verbosity)
		if self.verbosity.verbose:
			print(glb_prog_name, "done")
		return result

def RunParallelPerf(a):
	pp = ParallelPerf(a)
	pp.Config()
	return pp.Run()

def Main(args):
	ap = argparse.ArgumentParser(
		prog=glb_prog_name, formatter_class = argparse.RawDescriptionHelpFormatter,
		description =
"""
Run a perf script command multiple times in parallel, using perf script options
--cpu and --time so that each job processes a different chunk of the data.
""",
		epilog =
"""
Follow the options by '--' and then the perf script command e.g.

	$ perf record -a -- sleep 10
	$ parallel-perf.py --nr=4 -- perf script --ns
	All jobs finished successfully
	$ tree parallel-perf-output/
	parallel-perf-output/
	├── time-range-0
	│   ├── cmd.txt
	│   └── out.txt
	├── time-range-1
	│   ├── cmd.txt
	│   └── out.txt
	├── time-range-2
	│   ├── cmd.txt
	│   └── out.txt
	└── time-range-3
	    ├── cmd.txt
	    └── out.txt
	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
	parallel-perf-output/time-range-0/cmd.txt:perf script --time=,9466.504461499 --ns
	parallel-perf-output/time-range-1/cmd.txt:perf script --time=9466.504461500,9469.005396999 --ns
	parallel-perf-output/time-range-2/cmd.txt:perf script --time=9469.005397000,9471.506332499 --ns
	parallel-perf-output/time-range-3/cmd.txt:perf script --time=9471.506332500, --ns

Any perf script command can be used, including the use of perf script options
--dlfilter and --script, so that the benefit of running parallel jobs
naturally extends to them also.

If option --pipe-to is used, standard output is first piped through that
command. Beware, if the command fails (e.g. grep with no matches), it will be
considered a fatal error.

Final standard output is redirected to files named out.txt in separate
subdirectories under the output directory. Similarly, standard error is
written to files named err.txt. In addition, files named cmd.txt contain the
corresponding perf script command. After processing, err.txt files are removed
if they are empty.

If any job exits with a non-zero exit code, then all jobs are killed and no
more are started. A message is printed if any job results in a non-empty
err.txt file.

There is a separate output subdirectory for each time range. If the --per-cpu
option is used, these are further grouped under cpu-n subdirectories, e.g.

	$ parallel-perf.py --per-cpu --nr=2 -- perf script --ns --cpu=0,1
	All jobs finished successfully
	$ tree parallel-perf-output
	parallel-perf-output/
	├── cpu-0
	│   ├── time-range-0
	│   │   ├── cmd.txt
	│   │   └── out.txt
	│   └── time-range-1
	│       ├── cmd.txt
	│       └── out.txt
	└── cpu-1
	    ├── time-range-0
	    │   ├── cmd.txt
	    │   └── out.txt
	    └── time-range-1
	        ├── cmd.txt
	        └── out.txt
	$ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
	parallel-perf-output/cpu-0/time-range-0/cmd.txt:perf script --cpu=0 --time=,9469.005396999 --ns
	parallel-perf-output/cpu-0/time-range-1/cmd.txt:perf script --cpu=0 --time=9469.005397000, --ns
	parallel-perf-output/cpu-1/time-range-0/cmd.txt:perf script --cpu=1 --time=,9469.005396999 --ns
	parallel-perf-output/cpu-1/time-range-1/cmd.txt:perf script --cpu=1 --time=9469.005397000, --ns

Subdivisions of the time range, and CPUs if the --per-cpu option is used, are
expressed by the --time and --cpu perf script options respectively. If the
supplied perf script command has a --time option, then that time range is
subdivided, otherwise the time range given by 'time of first sample' to
'time of last sample' is used (refer to perf script --header-only). Similarly,
the supplied perf script command may provide a --cpu option, and only those
CPUs will be processed.

To prevent time intervals from becoming too small, the --min-interval option
can be used.

Note there is special handling for processing Intel PT traces. If an interval is
not specified and the perf record command contained the intel_pt event, then the
time range will be subdivided in order to produce subdivisions that contain
approximately the same amount of trace data. That is accomplished by counting
double-quick (--itrace=qqi) samples, and choosing time ranges that encompass
approximately the same number of samples. In that case, time ranges may not be
the same for each CPU processed. For Intel PT, --per-cpu is the default, but
that can be overridden by --no-per-cpu. Note, for Intel PT, double-quick
decoding produces 1 sample for each PSB synchronization packet, which in turn
comes after a certain number of bytes of trace output, determined by psb_period
(refer to the perf Intel PT documentation). The minimum number of double-quick
samples that will define a time range can be set by the --min_size option,
which defaults to 64.
""")
	ap.add_argument("-o", "--output-dir", default="parallel-perf-output", help="output directory (default 'parallel-perf-output')")
	ap.add_argument("-j", "--jobs", type=int, default=0, help="maximum number of jobs to run in parallel at one time (default is the number of CPUs)")
	ap.add_argument("-n", "--nr", type=int, default=0, help="number of time subdivisions (default is the number of jobs)")
	ap.add_argument("-i", "--interval", type=float, default=0, help="subdivide the time range using this time interval (in seconds e.g. 0.1 for a tenth of a second)")
	ap.add_argument("-c", "--per-cpu", action="store_true", help="process data for each CPU in parallel")
	ap.add_argument("-m", "--min-interval", type=float, default=glb_min_interval, help=f"minimum interval (default {glb_min_interval} seconds)")
	ap.add_argument("-p", "--pipe-to", help="command to pipe output to (optional)")
	ap.add_argument("-N", "--no-per-cpu", action="store_true", help="do not process data for each CPU in parallel")
	ap.add_argument("-b", "--min_size", type=int, default=glb_min_samples, help="minimum data size (for Intel PT in PSBs)")
	ap.add_argument("-D", "--dry-run", action="store_true", help="do not run any jobs, just show the perf script commands")
	ap.add_argument("-q", "--quiet", action="store_true", help="do not print any messages except errors")
	ap.add_argument("-v", "--verbose", action="store_true", help="print more messages")
	ap.add_argument("-d", "--debug", action="store_true", help="print debugging messages")
	cmd_line = list(args)
	try:
		split_pos = cmd_line.index("--")
		cmd = cmd_line[split_pos + 1:]
		args = cmd_line[:split_pos]
	except:
		cmd = None
		args = cmd_line
	a = ap.parse_args(args=args[1:])
	a.cmd = cmd
	a.verbosity = Verbosity(a.quiet, a.verbose, a.debug)
	try:
		if a.cmd == None:
			if len(args) <= 1:
				ap.print_help()
				return True
			raise Exception("Command line must contain '--' before perf command")
		return RunParallelPerf(a)
	except Exception as e:
		print("Fatal error: ", str(e))
		if a.debug:
			raise
		return False

if __name__ == "__main__":
	if not Main(sys.argv):
		sys.exit(1)