#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Run a perf script command multiple times in parallel, using perf script
# options --cpu and --time so that each job processes a different chunk
# of the data.
#
# Copyright (c) 2024, Intel Corporation.

import subprocess
import argparse
import pathlib
import shlex
import time
import copy
import sys
import os
import re

glb_prog_name = "parallel-perf.py"
glb_min_interval = 10.0
glb_min_samples = 64


class Verbosity():
    """Output-level flags derived from the --quiet, --verbose and --debug options.

    debug implies verbose, and verbose overrides quiet.
    """

    def __init__(self, quiet=False, verbose=False, debug=False):
        self.normal = True        # print normal progress messages
        self.verbose = verbose    # print additional messages (e.g. each job started)
        self.debug = debug        # print debugging messages
        self.self_test = True     # enable internal consistency checks
        if self.debug:
            self.verbose = True
        if self.verbose:
            quiet = False
        if quiet:
            self.normal = False


# Manage work (Start/Wait/Kill), as represented by a subprocess.Popen command
class Work():
    """One perf script job, optionally piped through a --pipe-to consumer.

    The command is written to <output_dir>/cmd.txt, final standard output to
    <output_dir>/out.txt and standard error to <output_dir>/err.txt.
    """

    def __init__(self, cmd, pipe_to, output_dir="."):
        self.popen = None          # the perf script process, once started
        self.consumer = None       # the pipe_to process, if any, once started
        self.cmd = cmd             # argument list of the perf script command
        self.pipe_to = pipe_to     # shell-style command string, or None
        self.output_dir = output_dir
        self.cmdout_name = f"{output_dir}/cmd.txt"
        self.stdout_name = f"{output_dir}/out.txt"
        self.stderr_name = f"{output_dir}/err.txt"

    def Command(self):
        """Return the command as a single string, shell-quoted so it can be re-run."""
        return " ".join(shlex.quote(x) for x in self.cmd)

    def Stdout(self):
        """Open (truncating) the job's standard output file."""
        return open(self.stdout_name, "w")

    def Stderr(self):
        """Open (truncating) the job's standard error file."""
        return open(self.stderr_name, "w")

    def CreateOutputDir(self):
        pathlib.Path(self.output_dir).mkdir(parents=True, exist_ok=True)

    def Start(self):
        """Start the job; does nothing if it has already been started."""
        if self.popen:
            return
        self.CreateOutputDir()
        with open(self.cmdout_name, "w") as f:
            f.write(self.Command())
            f.write("\n")
        # The children receive their own copies of these descriptors, so the
        # parent's copies are closed as soon as the processes have been created.
        with self.Stdout() as stdout, self.Stderr() as stderr:
            if self.pipe_to:
                self.popen = subprocess.Popen(self.cmd, stdout=subprocess.PIPE, stderr=stderr)
                args = shlex.split(self.pipe_to)
                self.consumer = subprocess.Popen(args, stdin=self.popen.stdout, stdout=stdout, stderr=stderr)
                # Drop the parent's copy of the pipe's read end so the producer
                # gets a broken pipe if the consumer exits early, instead of
                # blocking on a full pipe.
                self.popen.stdout.close()
            else:
                self.popen = subprocess.Popen(self.cmd, stdout=stdout, stderr=stderr)

    def RemoveEmptyErrFile(self):
        if os.path.exists(self.stderr_name):
            if os.path.getsize(self.stderr_name) == 0:
                os.unlink(self.stderr_name)

    def Errors(self):
        """Return a list of error messages: one if err.txt exists and is non-empty."""
        if os.path.exists(self.stderr_name):
            if os.path.getsize(self.stderr_name) != 0:
                return [f"Non-empty error file {self.stderr_name}"]
        return []

    def TidyUp(self):
        self.RemoveEmptyErrFile()

    def RawPollWait(self, p, wait):
        if wait:
            return p.wait()
        return p.poll()

    def Poll(self, wait=False):
        """Return the job's exit status, or None if not started / still running.

        With a consumer: if the consumer has exited while the producer is still
        running, the producer is killed and None is returned for this call; a
        zero consumer status is replaced by a non-zero producer status.
        """
        if not self.popen:
            return None
        result = self.RawPollWait(self.popen, wait)
        if self.consumer:
            res = result
            result = self.RawPollWait(self.consumer, wait)
            if result is not None and res is None:
                self.popen.kill()
                result = None
            elif result == 0 and res is not None and res != 0:
                result = res
        if result is not None:
            self.TidyUp()
        return result

    def Wait(self):
        return self.Poll(wait=True)

    def Kill(self):
        if not self.popen:
            return
        self.popen.kill()
        if self.consumer:
            self.consumer.kill()


def KillWork(worklist, verbosity):
    """Kill every job in worklist, then wait for each of them.

    verbosity is currently unused.
    """
    for w in worklist:
        w.Kill()
    for w in worklist:
        w.Wait()


def NumberOfCPUs():
    """Return the number of online CPUs."""
    return os.sysconf("SC_NPROCESSORS_ONLN")


def NanoSecsToSecsStr(x):
    """Format integer nanoseconds x as seconds with 9 decimals; None gives ""."""
    if x is None:
        return ""
    s = str(x).zfill(10)
    return s[:-9] + "." + s[-9:]


def InsertOptionAfter(cmd, option, after):
    """Insert option into cmd (in place) right after the first element equal to
    after, or append it when there is no such element."""
    try:
        pos = cmd.index(after)
    except ValueError:
        cmd.append(option)
    else:
        cmd.insert(pos + 1, option)


def CreateWorkList(cmd, pipe_to, output_dir, cpus, time_ranges_by_cpu):
    """Return a list of Work, one per (cpu, time range) combination.

    cpus is a sorted list of CPU numbers, or [-1] for "not per-cpu".
    time_ranges_by_cpu has either one list of time ranges shared by all CPUs,
    or one list per entry of cpus. A time range of [None, None] means "no
    --time option"; None at either end means an open-ended range.
    """
    cpu_len = len(str(cpus[-1]))
    cpu_dir_fmt = f"cpu-%.{cpu_len}u"
    worklist = []
    pos = 0
    for cpu in cpus:
        if cpu >= 0:
            cpu_dir = os.path.join(output_dir, cpu_dir_fmt % cpu)
            cpu_option = f"--cpu={cpu}"
        else:
            cpu_dir = output_dir
            cpu_option = None

        tr_dir_fmt = "time-range"

        if len(time_ranges_by_cpu) > 1:
            time_ranges = time_ranges_by_cpu[pos]
            tr_dir_fmt += f"-{pos}"
            pos += 1
        else:
            time_ranges = time_ranges_by_cpu[0]

        tr_len = len(str(len(time_ranges)))
        tr_dir_fmt += f"-%.{tr_len}u"

        for i, r in enumerate(time_ranges):
            if r == [None, None]:
                # Whole time range: no --time option and no time-range subdirectory
                time_option = None
                work_output_dir = cpu_dir
            else:
                time_option = "--time=" + NanoSecsToSecsStr(r[0]) + "," + NanoSecsToSecsStr(r[1])
                work_output_dir = os.path.join(cpu_dir, tr_dir_fmt % i)
            work_cmd = list(cmd)
            if time_option is not None:
                InsertOptionAfter(work_cmd, time_option, "script")
            if cpu_option is not None:
                InsertOptionAfter(work_cmd, cpu_option, "script")
            worklist.append(Work(work_cmd, pipe_to, work_output_dir))
    return worklist


def DoRunWork(worklist, nr_jobs, verbosity):
    """Run worklist, at most nr_jobs at a time, polling every 0.1s.

    Returns False (after killing running jobs) as soon as any job fails,
    otherwise True. Non-empty err.txt files are reported but not fatal.
    """
    nr_to_do = len(worklist)
    not_started = list(worklist)
    running = []
    done = []
    chg = False
    while True:
        nr_done = len(done)
        if chg and verbosity.normal:
            nr_run = len(running)
            print(f"\rThere are {nr_to_do} jobs: {nr_done} completed, {nr_run} running", flush=True, end=" ")
            if verbosity.verbose:
                print()
            chg = False
        if nr_done == nr_to_do:
            break
        while len(running) < nr_jobs and not_started:
            w = not_started.pop(0)
            running.append(w)
            if verbosity.verbose:
                print("Starting:", w.Command())
            w.Start()
            chg = True
        if running:
            time.sleep(0.1)
        finished = []
        not_finished = []
        while running:
            w = running.pop(0)
            r = w.Poll()
            if r is None:
                not_finished.append(w)
                continue
            if r == 0:
                if verbosity.verbose:
                    print("Finished:", w.Command())
                finished.append(w)
                chg = True
                continue
            if verbosity.normal and not verbosity.verbose:
                print()
            print("Job failed!\n return code:", r, "\n command: ", w.Command())
            if w.pipe_to:
                print(" piped to: ", w.pipe_to)
            print("Killing outstanding jobs")
            KillWork(not_finished, verbosity)
            KillWork(running, verbosity)
            return False
        running = not_finished
        done += finished
    errorlist = []
    for w in worklist:
        errorlist += w.Errors()
    if errorlist:
        print("Errors:")
        for e in errorlist:
            print(e)
    elif verbosity.normal:
        print("\r", " " * 50, "\rAll jobs finished successfully", flush=True)
    return True


def RunWork(worklist, nr_jobs=None, verbosity=None):
    """Run worklist with at most nr_jobs jobs at a time; return True on success.

    nr_jobs defaults to the number of online CPUs (as do values < 1, which
    would otherwise never start a job); verbosity defaults to Verbosity().
    On any exception, including KeyboardInterrupt, every job is killed before
    the exception propagates.
    """
    if nr_jobs is None or nr_jobs < 1:
        nr_jobs = NumberOfCPUs()
    if verbosity is None:
        verbosity = Verbosity()
    try:
        return DoRunWork(worklist, nr_jobs, verbosity)
    except BaseException:
        for w in worklist:
            w.Kill()
        raise


def ReadHeader(perf, file_name):
    """Return the text output of 'perf script --header-only' for file_name."""
    proc = subprocess.run([perf, "script", "--header-only", "--input", file_name], stdout=subprocess.PIPE)
    return proc.stdout.decode("utf-8")


def ParseHeader(hdr):
    """Parse '# name : value' header lines into a dict of stripped strings.

    Repeated names get a numeric suffix: "name", "name 2", "name 3", ...
    """
    result = {}
    for line in hdr.split("\n"):
        if not line.startswith("#") or ":" not in line:
            continue
        # Split on the first ':' only; values may contain further colons.
        name, _, value = line[1:].partition(":")
        name = name.strip()
        value = value.strip()
        if name in result:
            orig_name = name
            nr = 2
            while True:
                name = f"{orig_name} {nr}"
                if name not in result:
                    break
                nr += 1
        result[name] = value
    return result


def HeaderField(hdr_dict, hdr_fld):
    """Return hdr_dict[hdr_fld], raising a descriptive Exception if missing."""
    if hdr_fld not in hdr_dict:
        raise Exception(f"'{hdr_fld}' missing from header information")
    return hdr_dict[hdr_fld]
# Represent the position of an option within a command string
# and provide the option value and/or remove the option
class OptPos():

    def Init(self, opt_element=-1, value_element=-1, opt_pos=-1, value_pos=-1, error=None):
        self.opt_element = opt_element      # list element that contains option
        self.value_element = value_element  # list element that contains option value
        self.opt_pos = opt_pos              # string position of option
        self.value_pos = value_pos          # string position of value
        self.error = error                  # error message string

    def __init__(self, args, short_name, long_name, default=None):
        """Locate the first -<short_name> / --<long_name> option in args.

        Recognized forms: "-X value", "-Xvalue", "--long value", "--long=value",
        and a short option grouped after other letters ("-aX value", "-aXvalue").
        short_name may be "" to match only the long form. If the option is
        malformed, self.error is set to a message; if absent, all positions
        are -1 and Value() returns default.
        """
        self.args = list(args)
        self.default = default
        n = 2 + len(long_name)
        m = len(short_name)
        for pos, opt in enumerate(args):
            if m and opt[:2] == f"-{short_name}":
                if len(opt) == 2:
                    if pos + 1 < len(args):
                        self.Init(pos, pos + 1, 0, 0)
                    else:
                        self.Init(error=f"-{short_name} option missing value")
                else:
                    self.Init(pos, pos, 0, 2)
                return
            if opt[:n] == f"--{long_name}":
                if len(opt) == n:
                    if pos + 1 < len(args):
                        self.Init(pos, pos + 1, 0, 0)
                    else:
                        self.Init(error=f"--{long_name} option missing value")
                elif opt[n] == "=":
                    self.Init(pos, pos, 0, n + 1)
                else:
                    self.Init(error=f"--{long_name} option expected '='")
                return
            if m and opt[:1] == "-" and opt[:2] != "--" and short_name in opt:
                ipos = opt.index(short_name)
                # A further '-' before the letter means it is not part of a
                # group of short options, so keep looking.
                if "-" in opt[1:]:
                    hpos = opt[1:].index("-")
                    if hpos < ipos:
                        continue
                if ipos + 1 == len(opt):
                    if pos + 1 < len(args):
                        self.Init(pos, pos + 1, ipos, 0)
                    else:
                        self.Init(error=f"-{short_name} option missing value")
                else:
                    self.Init(pos, pos, ipos, ipos + 1)
                return
        self.Init()

    def Value(self):
        """Return the option's value, or the default if the option was not found."""
        if self.opt_element >= 0:
            if self.opt_element != self.value_element:
                return self.args[self.value_element]
            return self.args[self.value_element][self.value_pos:]
        return self.default

    def Remove(self, args):
        """Remove the option (and its value) from args in place.

        args is expected to be the same list that was passed to the constructor.
        """
        if self.opt_element == -1:
            return
        # Delete the (later) value element first so opt_element stays valid.
        if self.opt_element != self.value_element:
            del args[self.value_element]
        if self.opt_pos:
            # Grouped short options: keep the letters before this option.
            args[self.opt_element] = args[self.opt_element][:self.opt_pos]
        else:
            del args[self.opt_element]


def DetermineInputFileName(cmd):
    """Return the file named by -i/--input in cmd (default "perf.data").

    Raises Exception if the option is malformed or the file does not exist.
    """
    p = OptPos(cmd, "i", "input", "perf.data")
    if p.error:
        raise Exception(f"perf command {p.error}")
    file_name = p.Value()
    if not os.path.exists(file_name):
        raise Exception(f"perf command input file '{file_name}' not found")
    return file_name


def ReadOption(args, short_name, long_name, err_prefix, remove=False):
    """Return the option's value (None if absent); if remove, delete it from args."""
    p = OptPos(args, short_name, long_name)
    if p.error:
        raise Exception(f"{err_prefix}{p.error}")
    value = p.Value()
    if remove:
        p.Remove(args)
    return value


def ExtractOption(args, short_name, long_name, err_prefix):
    return ReadOption(args, short_name, long_name, err_prefix, True)


def ReadPerfOption(args, short_name, long_name):
    return ReadOption(args, short_name, long_name, "perf command ")


def ExtractPerfOption(args, short_name, long_name):
    return ExtractOption(args, short_name, long_name, "perf command ")


def PerfDoubleQuickCommands(cmd, file_name):
    """Return (cnts_cmd, times_cmd) for "double quick" decoding of file_name.

    Both decode with --itrace=qqi, restricted by any --cpu / --time options in
    cmd. cnts_cmd prints only the CPU of each sample; times_cmd prints CPU and
    time. The perf executable is cmd[0], i.e. the same perf that runs the jobs.
    """
    cpu_str = ReadPerfOption(cmd, "C", "cpu")
    time_str = ReadPerfOption(cmd, "", "time")
    # Use double-quick sampling to determine trace data density
    times_cmd = [cmd[0], "script", "--ns", "--input", file_name, "--itrace=qqi"]
    if cpu_str:
        times_cmd.append(f"--cpu={cpu_str}")
    if time_str:
        times_cmd.append(f"--time={time_str}")
    cnts_cmd = list(times_cmd)
    cnts_cmd.append("-Fcpu")
    times_cmd.append("-Fcpu,time")
    return cnts_cmd, times_cmd


class CPUTimeRange():
    """Per-CPU state used while splitting time ranges by trace data density."""

    def __init__(self, cpu):
        self.cpu = cpu                 # CPU number, or -1 when not per-cpu
        self.sample_cnt = 0            # number of double-quick samples counted
        self.time_ranges = None        # [start, end] ranges being built
        self.interval = 0              # target samples per time range
        self.interval_remaining = 0    # samples left before the next split
        self.remaining = 0             # samples left in total
        self.tr_pos = 0                # index of the time range being filled


def CalcTimeRangesByCPU(line, cpu, cpu_time_ranges, max_time):
    """Advance cpu's split state by one sample, starting a new range when due.

    line is a whitespace-split output line of times_cmd; line[1] is taken to
    be the timestamp with one trailing character (the ':') to strip.
    """
    cpu_time_range = cpu_time_ranges[cpu]
    cpu_time_range.remaining -= 1
    cpu_time_range.interval_remaining -= 1
    if cpu_time_range.remaining == 0:
        cpu_time_range.time_ranges[cpu_time_range.tr_pos][1] = max_time
        return
    if cpu_time_range.interval_remaining == 0:
        ts = TimeVal(line[1][:-1], 0)
        time_ranges = cpu_time_range.time_ranges
        time_ranges[cpu_time_range.tr_pos][1] = ts - 1
        time_ranges.append([ts, max_time])
        cpu_time_range.tr_pos += 1
        cpu_time_range.interval_remaining = cpu_time_range.interval


def CountSamplesByCPU(line, cpu, cpu_time_ranges):
    """Count one double-quick sample for cpu."""
    try:
        cpu_time_ranges[cpu].sample_cnt += 1
    except IndexError:
        print("exception")
        print("cpu", cpu)
        print("len(cpu_time_ranges)", len(cpu_time_ranges))
        raise


def ProcessCommandOutputLines(cmd, per_cpu, fn, *x):
    """Run cmd and call fn(fields, cpu, *x) for each output line starting "[cpu]".

    fields is the whitespace-split line; cpu is the bracketed number when
    per_cpu, otherwise 0. The child's exit status is not checked.
    """
    # Assume CPU number is at beginning of line and enclosed by []
    pat = re.compile(r"\s*\[[0-9]+\]")
    # The context manager closes the pipe and waits for the child, also when
    # fn raises.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
        for raw in p.stdout:
            line = raw.decode("utf-8")
            if not pat.match(line):
                continue
            fields = line.split()
            # Assumes CPU number is enclosed by []
            cpu = int(fields[0][1:-1]) if per_cpu else 0
            fn(fields, cpu, *x)


def IntersectTimeRanges(new_time_ranges, time_ranges):
    """Clip new_time_ranges, in place, to the parts that lie within time_ranges.

    Ranges are inclusive [start, end] pairs; both lists are assumed sorted and
    non-overlapping.
    """
    pos = 0
    new_pos = 0
    # Can assume len(time_ranges) != 0 and len(new_time_ranges) != 0
    # Note also, there *must* be at least one intersection.
    while pos < len(time_ranges) and new_pos < len(new_time_ranges):
        # new end < old start => no intersection, remove new
        if new_time_ranges[new_pos][1] < time_ranges[pos][0]:
            del new_time_ranges[new_pos]
            continue
        # new start > old end => no intersection, check next
        if new_time_ranges[new_pos][0] > time_ranges[pos][1]:
            pos += 1
            if pos < len(time_ranges):
                continue
            # no next, so remove remaining
            del new_time_ranges[new_pos:]
            return
        # Found an intersection
        # new start < old start => adjust new start = old start
        if new_time_ranges[new_pos][0] < time_ranges[pos][0]:
            new_time_ranges[new_pos][0] = time_ranges[pos][0]
        # new end > old end => keep the overlap, insert the remainder
        if new_time_ranges[new_pos][1] > time_ranges[pos][1]:
            r = [time_ranges[pos][1] + 1, new_time_ranges[new_pos][1]]
            new_time_ranges[new_pos][1] = time_ranges[pos][1]
            new_pos += 1
            new_time_ranges.insert(new_pos, r)
            continue
        # new [start, end] is within old [start, end]
        new_pos += 1


def SplitTimeRangesByTraceDataDensity(time_ranges, cpus, nr, cmd, file_name, per_cpu, min_size, min_interval, verbosity):
    """Split time_ranges so each piece holds about the same number of
    double-quick samples.

    Returns one list of time ranges per entry in cpus, or a single shared list
    when there is too little data to be worth splitting by density.
    """
    if verbosity.normal:
        print("\rAnalyzing...", flush=True, end=" ")
        if verbosity.verbose:
            print()
    cnts_cmd, times_cmd = PerfDoubleQuickCommands(cmd, file_name)

    if per_cpu:
        nr_cpus = cpus[-1] + 1
        cpu_time_ranges = [CPUTimeRange(cpu) for cpu in range(nr_cpus)]
    else:
        nr_cpus = 1
        cpu_time_ranges = [CPUTimeRange(-1)]

    if verbosity.debug:
        print("nr_cpus", nr_cpus)
        print("cnts_cmd", cnts_cmd)
        print("times_cmd", times_cmd)

    # Count the number of "double quick" samples per CPU
    ProcessCommandOutputLines(cnts_cmd, per_cpu, CountSamplesByCPU, cpu_time_ranges)

    tot = 0
    mx = 0
    for cpu_time_range in cpu_time_ranges:
        cnt = cpu_time_range.sample_cnt
        tot += cnt
        mx = max(mx, cnt)
        if verbosity.debug:
            print("cpu:", cpu_time_range.cpu, "sample_cnt", cnt)

    min_size = max(min_size, 1)

    if mx < min_size:
        # Too little data to be worth splitting
        if verbosity.debug:
            print("Too little data to split by time")
        if nr == 0:
            nr = 1
        return [SplitTimeRangesIntoN(time_ranges, nr, min_interval)]

    if nr:
        divisor = nr
        min_size = 1
    else:
        divisor = NumberOfCPUs()

    interval = max(int(round(tot / divisor, 0)), min_size)

    if verbosity.debug:
        print("divisor", divisor)
        print("min_size", min_size)
        print("interval", interval)

    min_time = time_ranges[0][0]
    max_time = time_ranges[-1][1]

    for cpu_time_range in cpu_time_ranges:
        cnt = cpu_time_range.sample_cnt
        if cnt == 0:
            cpu_time_range.time_ranges = copy.deepcopy(time_ranges)
            continue
        # Adjust target interval for CPU to give approximately equal interval sizes
        # Determine number of intervals, rounding to nearest integer
        n = max(int(round(cnt / interval, 0)), 1)
        # Determine interval size, rounding up
        d, m = divmod(cnt, n)
        if m:
            d += 1
        cpu_time_range.interval = d
        cpu_time_range.interval_remaining = d
        cpu_time_range.remaining = cnt
        # Init. time ranges for each CPU with the start time
        cpu_time_range.time_ranges = [[min_time, max_time]]

    # Set time ranges so that the same number of "double quick" samples
    # will fall into each time range.
    ProcessCommandOutputLines(times_cmd, per_cpu, CalcTimeRangesByCPU, cpu_time_ranges, max_time)

    for cpu_time_range in cpu_time_ranges:
        if cpu_time_range.sample_cnt:
            IntersectTimeRanges(cpu_time_range.time_ranges, time_ranges)

    # When not per-cpu, cpus == [-1], which selects the single entry.
    return [cpu_time_ranges[cpu].time_ranges for cpu in cpus]


def SplitSingleTimeRangeIntoN(time_range, n):
    """Split inclusive [start, end] into n consecutive ranges of equal length
    (the last one absorbs any remainder); unsplit if n <= 1 or too short."""
    if n <= 1:
        return [time_range]
    start = time_range[0]
    end = time_range[1]
    # Integer division: nanosecond values can exceed float precision.
    duration = (end - start + 1) // n
    if duration < 1:
        return [time_range]
    time_ranges = []
    for _ in range(n):
        time_ranges.append([start, start + duration - 1])
        start += duration
    time_ranges[-1][1] = end
    return time_ranges


def TimeRangeDuration(r):
    return r[1] - r[0] + 1


def TotalDuration(time_ranges):
    return sum(TimeRangeDuration(r) for r in time_ranges)


def SplitTimeRangesByInterval(time_ranges, interval):
    """Split each range into pieces of about interval (same units as the ranges)."""
    new_ranges = []
    for r in time_ranges:
        n = int(round(TimeRangeDuration(r) / interval, 0))
        new_ranges += SplitSingleTimeRangeIntoN(r, n)
    return new_ranges


def SplitTimeRangesIntoN(time_ranges, n, min_interval):
    """Split time_ranges into about n pieces, none shorter than min_interval
    (compared in the same units as the ranges)."""
    if n <= len(time_ranges):
        return time_ranges
    duration = TotalDuration(time_ranges)
    interval = max(duration / n, min_interval)
    return SplitTimeRangesByInterval(time_ranges, interval)


def RecombineTimeRanges(tr):
    """Return a copy of tr with adjacent ranges (prev end + 1 == start) merged."""
    new_tr = copy.deepcopy(tr)
    i = 1
    while i < len(new_tr):
        # if prev end + 1 == cur start, combine them
        if new_tr[i - 1][1] + 1 == new_tr[i][0]:
            new_tr[i][0] = new_tr[i - 1][0]
            del new_tr[i - 1]
        else:
            i += 1
    return new_tr


def OpenTimeRangeEnds(time_ranges, min_time, max_time):
    """In place, replace a first start <= min_time / last end >= max_time with None."""
    if time_ranges[0][0] <= min_time:
        time_ranges[0][0] = None
    if time_ranges[-1][1] >= max_time:
        time_ranges[-1][1] = None
def BadTimeStr(time_str):
    raise Exception(f"perf command bad time option: '{time_str}'\nCheck also 'time of first sample' and 'time of last sample' in perf script --header-only")


def ValidateTimeRanges(time_ranges, time_str):
    """Raise via BadTimeStr unless ranges are ascending, non-overlapping, start <= end."""
    for i, (start, end) in enumerate(time_ranges):
        if i != 0 and start <= time_ranges[i - 1][1]:
            BadTimeStr(time_str)
        if start > end:
            BadTimeStr(time_str)


def TimeVal(s, dflt):
    """Convert a seconds string such as "123.456" to integer nanoseconds.

    An empty (or all-whitespace) string gives dflt. Fraction digits beyond 9
    are truncated. Raises Exception on a malformed or negative value.
    """
    s = s.strip()
    if s == "":
        return dflt
    a = s.split(".")
    if len(a) > 2:
        raise Exception(f"Bad time value '{s}'")
    x = int(a[0])
    if x < 0:
        raise Exception("Negative time not allowed")
    x *= 1000000000
    if len(a) > 1:
        frac = a[1]
        # Reject e.g. "1.-5" or "1. 5", which int() would otherwise accept
        if frac and not frac.isdigit():
            raise Exception(f"Bad time value '{s}'")
        x += int((frac + "000000000")[:9])
    return x


def BadCPUStr(cpu_str):
    raise Exception(f"perf command bad cpu option: '{cpu_str}'\nCheck also 'nrcpus avail' in perf script --header-only")


def ParseTimeStr(time_str, min_time, max_time):
    """Parse a perf --time string ("start,end" ranges separated by whitespace)
    into a list of [start, end] nanosecond ranges.

    An omitted start/end defaults to min_time/max_time; no time_str gives
    [[min_time, max_time]].
    """
    if time_str is None or time_str == "":
        return [[min_time, max_time]]
    time_ranges = []
    for r in time_str.split():
        a = r.split(",")
        if len(a) != 2:
            BadTimeStr(time_str)
        try:
            start = TimeVal(a[0], min_time)
            end = TimeVal(a[1], max_time)
        except Exception:
            BadTimeStr(time_str)
        time_ranges.append([start, end])
    ValidateTimeRanges(time_ranges, time_str)
    return time_ranges


def ParseCPUStr(cpu_str, nr_cpus):
    """Parse a perf --cpu list such as "0-3,7" into a sorted, de-duplicated list.

    Every CPU must be < nr_cpus. No cpu_str gives [-1] ("not per-cpu").
    """
    if cpu_str is None or cpu_str == "":
        return [-1]
    cpus = set()
    for r in cpu_str.split(","):
        a = r.split("-")
        if len(a) > 2:
            BadCPUStr(cpu_str)
        try:
            start = int(a[0].strip())
            end = int(a[1].strip()) if len(a) > 1 else start
        except ValueError:
            BadCPUStr(cpu_str)
        if start < 0 or end < 0 or end < start or end >= nr_cpus:
            BadCPUStr(cpu_str)
        cpus.update(range(start, end + 1))
    return sorted(cpus)


class ParallelPerf():
    """Configure and run a set of parallel perf script jobs from parsed options."""

    def __init__(self, a):
        for arg_name in vars(a):
            setattr(self, arg_name, getattr(a, arg_name))
        if not self.cmd:
            raise Exception("Missing perf command after '--'")
        self.orig_nr = self.nr
        self.orig_cmd = list(self.cmd)
        self.perf = self.cmd[0]
        if os.path.exists(self.output_dir):
            raise Exception(f"Output '{self.output_dir}' already exists")
        if self.jobs < 0 or self.nr < 0 or self.interval < 0:
            raise Exception("Bad options (negative values): try -h option for help")
        if self.nr != 0 and self.interval != 0:
            raise Exception("Cannot specify number of time subdivisions and time interval")
        if self.jobs == 0:
            self.jobs = NumberOfCPUs()
        if self.nr == 0 and self.interval == 0:
            if self.per_cpu:
                self.nr = 1
            else:
                self.nr = self.jobs

    def Init(self):
        if self.verbosity.debug:
            print("cmd", self.cmd)
        self.file_name = DetermineInputFileName(self.cmd)
        self.hdr = ReadHeader(self.perf, self.file_name)
        self.hdr_dict = ParseHeader(self.hdr)
        self.cmd_line = HeaderField(self.hdr_dict, "cmdline")

    def ExtractTimeInfo(self):
        self.min_time = TimeVal(HeaderField(self.hdr_dict, "time of first sample"), 0)
        self.max_time = TimeVal(HeaderField(self.hdr_dict, "time of last sample"), 0)
        self.time_str = ExtractPerfOption(self.cmd, "", "time")
        self.time_ranges = ParseTimeStr(self.time_str, self.min_time, self.max_time)
        if self.verbosity.debug:
            print("time_ranges", self.time_ranges)

    def ExtractCPUInfo(self):
        if self.per_cpu:
            nr_cpus = int(HeaderField(self.hdr_dict, "nrcpus avail"))
            self.cpu_str = ExtractPerfOption(self.cmd, "C", "cpu")
            if self.cpu_str is None or self.cpu_str == "":
                self.cpus = list(range(nr_cpus))
            else:
                self.cpus = ParseCPUStr(self.cpu_str, nr_cpus)
        else:
            self.cpu_str = None
            self.cpus = [-1]
        if self.verbosity.debug:
            print("cpus", self.cpus)

    def IsIntelPT(self):
        return self.cmd_line.find("intel_pt") >= 0

    def SplitTimeRanges(self):
        # NOTE(review): min_interval is passed on unscaled, so it is compared
        # against nanosecond durations although --min-interval help says
        # seconds - confirm the intended units.
        if self.IsIntelPT() and self.interval == 0:
            self.split_time_ranges_for_each_cpu = \
                SplitTimeRangesByTraceDataDensity(self.time_ranges, self.cpus, self.orig_nr,
                                                  self.orig_cmd, self.file_name, self.per_cpu,
                                                  self.min_size, self.min_interval, self.verbosity)
        elif self.nr:
            self.split_time_ranges_for_each_cpu = [SplitTimeRangesIntoN(self.time_ranges, self.nr, self.min_interval)]
        else:
            # --interval is given in seconds, but time ranges are in nanoseconds
            interval_ns = self.interval * 1000000000
            self.split_time_ranges_for_each_cpu = [SplitTimeRangesByInterval(self.time_ranges, interval_ns)]

    def CheckTimeRanges(self):
        for tr in self.split_time_ranges_for_each_cpu:
            # Re-combined time ranges should be the same
            new_tr = RecombineTimeRanges(tr)
            if new_tr != self.time_ranges:
                if self.verbosity.debug:
                    print("tr", tr)
                    print("new_tr", new_tr)
                raise Exception("Self test failed!")

    def OpenTimeRangeEnds(self):
        for time_ranges in self.split_time_ranges_for_each_cpu:
            OpenTimeRangeEnds(time_ranges, self.min_time, self.max_time)

    def CreateWorkList(self):
        self.worklist = CreateWorkList(self.cmd, self.pipe_to, self.output_dir, self.cpus, self.split_time_ranges_for_each_cpu)

    def PerfDataRecordedPerCPU(self):
        return "--per-thread" not in self.cmd_line.split()

    def DefaultToPerCPU(self):
        # --no-per-cpu option takes precedence
        if self.no_per_cpu:
            return False
        if not self.PerfDataRecordedPerCPU():
            return False
        # Default to per-cpu for Intel PT data that was recorded per-cpu,
        # because decoding can be done for each CPU separately.
        return self.IsIntelPT()

    def Config(self):
        self.Init()
        self.ExtractTimeInfo()
        if not self.per_cpu:
            self.per_cpu = self.DefaultToPerCPU()
        if self.verbosity.debug:
            print("per_cpu", self.per_cpu)
        self.ExtractCPUInfo()
        self.SplitTimeRanges()
        if self.verbosity.self_test:
            self.CheckTimeRanges()
        # Prefer open-ended time range to starting / ending with min_time / max_time resp.
        self.OpenTimeRangeEnds()
        self.CreateWorkList()

    def Run(self):
        if self.dry_run:
            print(len(self.worklist), "jobs:")
            for w in self.worklist:
                print(w.Command())
            return True
        result = RunWork(self.worklist, self.jobs, verbosity=self.verbosity)
        if self.verbosity.verbose:
            print(glb_prog_name, "done")
        return result


def RunParallelPerf(a):
    pp = ParallelPerf(a)
    pp.Config()
    return pp.Run()


def Main(args):
    """Parse args (sys.argv style), run the jobs, and return True on success."""
    ap = argparse.ArgumentParser(
        prog=glb_prog_name, formatter_class=argparse.RawDescriptionHelpFormatter,
        description="""
Run a perf script command multiple times in parallel, using perf script options
--cpu and --time so that each job processes a different chunk of the data.
""",
        epilog="""
Follow the options by '--' and then the perf script command e.g.

    $ perf record -a -- sleep 10
    $ parallel-perf.py --nr=4 -- perf script --ns
    All jobs finished successfully
    $ tree parallel-perf-output/
    parallel-perf-output/
    ├── time-range-0
    │   ├── cmd.txt
    │   └── out.txt
    ├── time-range-1
    │   ├── cmd.txt
    │   └── out.txt
    ├── time-range-2
    │   ├── cmd.txt
    │   └── out.txt
    └── time-range-3
        ├── cmd.txt
        └── out.txt
    $ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
    parallel-perf-output/time-range-0/cmd.txt:perf script --time=,9466.504461499 --ns
    parallel-perf-output/time-range-1/cmd.txt:perf script --time=9466.504461500,9469.005396999 --ns
    parallel-perf-output/time-range-2/cmd.txt:perf script --time=9469.005397000,9471.506332499 --ns
    parallel-perf-output/time-range-3/cmd.txt:perf script --time=9471.506332500, --ns

Any perf script command can be used, including the use of perf script options
--dlfilter and --script, so that the benefit of running parallel jobs
naturally extends to them also.

If option --pipe-to is used, standard output is first piped through that
command. Beware, if the command fails (e.g. grep with no matches), it will be
considered a fatal error.

Final standard output is redirected to files named out.txt in separate
subdirectories under the output directory. Similarly, standard error is
written to files named err.txt. In addition, files named cmd.txt contain the
corresponding perf script command. After processing, err.txt files are removed
if they are empty.

If any job exits with a non-zero exit code, then all jobs are killed and no
more are started. A message is printed if any job results in a non-empty
err.txt file.

There is a separate output subdirectory for each time range. If the --per-cpu
option is used, these are further grouped under cpu-n subdirectories, e.g.

    $ parallel-perf.py --per-cpu --nr=2 -- perf script --ns --cpu=0,1
    All jobs finished successfully
    $ tree parallel-perf-output
    parallel-perf-output/
    ├── cpu-0
    │   ├── time-range-0
    │   │   ├── cmd.txt
    │   │   └── out.txt
    │   └── time-range-1
    │       ├── cmd.txt
    │       └── out.txt
    └── cpu-1
        ├── time-range-0
        │   ├── cmd.txt
        │   └── out.txt
        └── time-range-1
            ├── cmd.txt
            └── out.txt
    $ find parallel-perf-output -name cmd.txt | sort | xargs grep -H .
    parallel-perf-output/cpu-0/time-range-0/cmd.txt:perf script --cpu=0 --time=,9469.005396999 --ns
    parallel-perf-output/cpu-0/time-range-1/cmd.txt:perf script --cpu=0 --time=9469.005397000, --ns
    parallel-perf-output/cpu-1/time-range-0/cmd.txt:perf script --cpu=1 --time=,9469.005396999 --ns
    parallel-perf-output/cpu-1/time-range-1/cmd.txt:perf script --cpu=1 --time=9469.005397000, --ns

Subdivisions of time range, and cpus if the --per-cpu option is used, are
expressed by the --time and --cpu perf script options respectively. If the
supplied perf script command has a --time option, then that time range is
subdivided, otherwise the time range given by 'time of first sample' to
'time of last sample' is used (refer perf script --header-only). Similarly, the
supplied perf script command may provide a --cpu option, and only those CPUs
will be processed.

To prevent time intervals becoming too small, the --min-interval option can
be used.

Note there is special handling for processing Intel PT traces. If an interval is
not specified and the perf record command contained the intel_pt event, then the
time range will be subdivided in order to produce subdivisions that contain
approximately the same amount of trace data. That is accomplished by counting
double-quick (--itrace=qqi) samples, and choosing time ranges that encompass
approximately the same number of samples. In that case, time ranges may not be
the same for each CPU processed. For Intel PT, --per-cpu is the default, but
that can be overridden by --no-per-cpu. Note, for Intel PT, double-quick
decoding produces 1 sample for each PSB synchronization packet, which in turn
come after a certain number of bytes output, determined by psb_period (refer
perf Intel PT documentation). The minimum number of double-quick samples that
will define a time range can be set by the --min_size option, which defaults to
64.
""")
    ap.add_argument("-o", "--output-dir", default="parallel-perf-output", help="output directory (default 'parallel-perf-output')")
    ap.add_argument("-j", "--jobs", type=int, default=0, help="maximum number of jobs to run in parallel at one time (default is the number of CPUs)")
    ap.add_argument("-n", "--nr", type=int, default=0, help="number of time subdivisions (default is the number of jobs)")
    ap.add_argument("-i", "--interval", type=float, default=0, help="subdivide the time range using this time interval (in seconds e.g. 0.1 for a tenth of a second)")
    ap.add_argument("-c", "--per-cpu", action="store_true", help="process data for each CPU in parallel")
    ap.add_argument("-m", "--min-interval", type=float, default=glb_min_interval, help=f"minimum interval (default {glb_min_interval} seconds)")
    ap.add_argument("-p", "--pipe-to", help="command to pipe output to (optional)")
    ap.add_argument("-N", "--no-per-cpu", action="store_true", help="do not process data for each CPU in parallel")
    ap.add_argument("-b", "--min_size", type=int, default=glb_min_samples, help="minimum data size (for Intel PT in PSBs)")
    ap.add_argument("-D", "--dry-run", action="store_true", help="do not run any jobs, just show the perf script commands")
    ap.add_argument("-q", "--quiet", action="store_true", help="do not print any messages except errors")
    ap.add_argument("-v", "--verbose", action="store_true", help="print more messages")
    ap.add_argument("-d", "--debug", action="store_true", help="print debugging messages")
    cmd_line = list(args)
    # Everything after the first '--' is the perf script command
    try:
        split_pos = cmd_line.index("--")
    except ValueError:
        cmd = None
        args = cmd_line
    else:
        cmd = cmd_line[split_pos + 1:]
        args = cmd_line[:split_pos]
    a = ap.parse_args(args=args[1:])
    a.cmd = cmd
    a.verbosity = Verbosity(a.quiet, a.verbose, a.debug)
    try:
        if a.cmd is None:
            if len(args) <= 1:
                ap.print_help()
                return True
            raise Exception("Command line must contain '--' before perf command")
        return RunParallelPerf(a)
    except Exception as e:
        print("Fatal error: ", str(e))
        if a.debug:
            raise
        return False


if __name__ == "__main__":
    if not Main(sys.argv):
        sys.exit(1)