1#!/usr/bin/env python3 2# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) 3"""Interactive perf list.""" 4 5from abc import ABC, abstractmethod 6import argparse 7from dataclasses import dataclass 8import math 9from typing import Any, Dict, Optional, Tuple 10import perf 11from textual import on 12from textual.app import App, ComposeResult 13from textual.binding import Binding 14from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll 15from textual.css.query import NoMatches 16from textual.command import SearchIcon 17from textual.screen import ModalScreen 18from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree 19from textual.widgets.tree import TreeNode 20 21 22def get_info(info: Dict[str, str], key: str): 23 return (info[key] + "\n") if key in info else "" 24 25 26class TreeValue(ABC): 27 """Abstraction for the data of value in the tree.""" 28 29 @abstractmethod 30 def name(self) -> str: 31 pass 32 33 @abstractmethod 34 def description(self) -> str: 35 pass 36 37 @abstractmethod 38 def matches(self, query: str) -> bool: 39 pass 40 41 @abstractmethod 42 def parse(self) -> perf.evlist: 43 pass 44 45 @abstractmethod 46 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 47 pass 48 49 50@dataclass 51class Metric(TreeValue): 52 """A metric in the tree.""" 53 metric_name: str 54 metric_pmu: str 55 56 def name(self) -> str: 57 return self.metric_name 58 59 def description(self) -> str: 60 """Find and format metric description.""" 61 for metric in perf.metrics(): 62 if metric["MetricName"] != self.metric_name: 63 continue 64 if self.metric_pmu and metric["PMU"] != self.metric_pmu: 65 continue 66 desc = get_info(metric, "BriefDescription") 67 desc += get_info(metric, "PublicDescription") 68 desc += get_info(metric, "MetricExpr") 69 desc += get_info(metric, "MetricThreshold") 70 return desc 71 return "description" 72 73 def matches(self, query: str) -> bool: 74 return query in self.metric_name 75 76 def parse(self) -> perf.evlist: 77 return perf.parse_metrics(self.metric_name, self.metric_pmu) 78 79 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 80 val = evlist.compute_metric(self.metric_name, cpu, thread) 81 return 0 if math.isnan(val) else val 82 83 84@dataclass 85class PmuEvent(TreeValue): 86 """A PMU and event within the tree.""" 87 pmu: str 88 event: str 89 90 def name(self) -> str: 91 if self.event.startswith(self.pmu) or ':' in self.event: 92 return self.event 93 else: 94 return f"{self.pmu}/{self.event}/" 95 96 def description(self) -> str: 97 """Find and format event description for {pmu}/{event}/.""" 98 for p in perf.pmus(): 99 if p.name() != self.pmu: 100 continue 101 for info in p.events(): 102 if "name" not in info or info["name"] != self.event: 103 continue 104 105 desc = get_info(info, "topic") 106 desc += get_info(info, "event_type_desc") 107 desc += get_info(info, "desc") 108 desc += get_info(info, "long_desc") 109 desc += get_info(info, "encoding_desc") 110 return desc 111 return "description" 112 113 def matches(self, query: str) -> bool: 114 return query in self.pmu or query in self.event 115 116 def parse(self) -> perf.evlist: 117 return perf.parse_events(self.name()) 118 119 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 120 return evsel.read(cpu, thread).val 121 122 123class ErrorScreen(ModalScreen[bool]): 124 """Pop up dialog for errors.""" 125 126 CSS = """ 127 ErrorScreen { 128 align: center middle; 129 } 130 """ 131 132 def __init__(self, error: str): 133 self.error = error 134 super().__init__() 135 136 def compose(self) -> ComposeResult: 137 yield Button(f"Error: {self.error}", variant="primary", id="error") 138 139 def on_button_pressed(self, event: Button.Pressed) -> None: 140 self.dismiss(True) 141 142 143class SearchScreen(ModalScreen[str]): 144 """Pop up dialog for search.""" 145 146 CSS = """ 147 SearchScreen Horizontal { 148 align: center middle; 149 margin-top: 1; 150 } 151 SearchScreen Input { 152 width: 1fr; 153 } 154 """ 155 156 def compose(self) -> ComposeResult: 157 yield Horizontal(SearchIcon(), Input(placeholder="Event name")) 158 159 def on_input_submitted(self, event: Input.Submitted) -> None: 160 """Handle the user pressing Enter in the input field.""" 161 self.dismiss(event.value) 162 163 164class Counter(HorizontalGroup): 165 """Two labels for a CPU and its counter value.""" 166 167 CSS = """ 168 Label { 169 gutter: 1; 170 } 171 """ 172 173 def __init__(self, cpu: int) -> None: 174 self.cpu = cpu 175 super().__init__() 176 177 def compose(self) -> ComposeResult: 178 label = f"cpu{self.cpu}" if self.cpu >= 0 else "total" 179 yield Label(label + " ") 180 yield Label("0", id=f"counter_{label}") 181 182 183class CounterSparkline(HorizontalGroup): 184 """A Sparkline for a performance counter.""" 185 186 def __init__(self, cpu: int) -> None: 187 self.cpu = cpu 188 super().__init__() 189 190 def compose(self) -> ComposeResult: 191 label = f"cpu{self.cpu}" if self.cpu >= 0 else "total" 192 yield Label(label) 193 yield Sparkline([], summary_function=max, id=f"sparkline_{label}") 194 195 196class IListApp(App): 197 TITLE = "Interactive Perf List" 198 199 BINDINGS = [ 200 Binding(key="s", action="search", description="Search", 201 tooltip="Search events and PMUs"), 202 Binding(key="n", action="next", description="Next", 203 tooltip="Next search result or item"), 204 Binding(key="p", action="prev", description="Previous", 205 tooltip="Previous search result or item"), 206 Binding(key="c", action="collapse", description="Collapse", 207 tooltip="Collapse the current PMU"), 208 Binding(key="^q", action="quit", description="Quit", 209 tooltip="Quit the app"), 210 ] 211 212 CSS = """ 213 /* Make the 'total' sparkline a different color. */ 214 #sparkline_total > .sparkline--min-color { 215 color: $accent; 216 } 217 #sparkline_total > .sparkline--max-color { 218 color: $accent 30%; 219 } 220 /* 221 * Make the active_search initially not displayed with the text in 222 * the middle of the line. 223 */ 224 #active_search { 225 display: none; 226 width: 100%; 227 text-align: center; 228 } 229 """ 230 231 def __init__(self, interval: float) -> None: 232 self.interval = interval 233 self.evlist = None 234 self.selected: Optional[TreeValue] = None 235 self.search_results: list[TreeNode[TreeValue]] = [] 236 self.cur_search_result: TreeNode[TreeValue] | None = None 237 super().__init__() 238 239 def expand_and_select(self, node: TreeNode[Any]) -> None: 240 """Expand select a node in the tree.""" 241 if node.parent: 242 node.parent.expand() 243 if node.parent.parent: 244 node.parent.parent.expand() 245 node.expand() 246 node.tree.select_node(node) 247 node.tree.scroll_to_node(node) 248 249 def set_searched_tree_node(self, previous: bool) -> None: 250 """Set the cur_search_result node to either the next or previous.""" 251 l = len(self.search_results) 252 253 if l < 1: 254 tree: Tree[TreeValue] = self.query_one("#root", Tree) 255 if previous: 256 tree.action_cursor_up() 257 else: 258 tree.action_cursor_down() 259 return 260 261 if self.cur_search_result: 262 idx = self.search_results.index(self.cur_search_result) 263 if previous: 264 idx = idx - 1 if idx > 0 else l - 1 265 else: 266 idx = idx + 1 if idx < l - 1 else 0 267 else: 268 idx = l - 1 if previous else 0 269 270 node = self.search_results[idx] 271 if node == self.cur_search_result: 272 return 273 274 self.cur_search_result = node 275 self.expand_and_select(node) 276 277 def action_search(self) -> None: 278 """Search was chosen.""" 279 def set_initial_focus(event: str | None) -> None: 280 """Sets the focus after the SearchScreen is dismissed.""" 281 282 search_label = self.query_one("#active_search", Label) 283 search_label.display = True if event else False 284 if not event: 285 return 286 event = event.lower() 287 search_label.update(f'Searching for events matching "{event}"') 288 289 tree: Tree[str] = self.query_one("#root", Tree) 290 291 def find_search_results(event: str, node: TreeNode[str], 292 cursor_seen: bool = False, 293 match_after_cursor: Optional[TreeNode[str]] = None 294 ) -> Tuple[bool, Optional[TreeNode[str]]]: 295 """Find nodes that match the search remembering the one after the cursor.""" 296 if not cursor_seen and node == tree.cursor_node: 297 cursor_seen = True 298 if node.data and node.data.matches(event): 299 if cursor_seen and not match_after_cursor: 300 match_after_cursor = node 301 self.search_results.append(node) 302 303 if node.children: 304 for child in node.children: 305 (cursor_seen, match_after_cursor) = \ 306 find_search_results(event, child, cursor_seen, match_after_cursor) 307 return (cursor_seen, match_after_cursor) 308 309 self.search_results.clear() 310 (_, self.cur_search_result) = find_search_results(event, tree.root) 311 if len(self.search_results) < 1: 312 self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}")) 313 search_label.display = False 314 elif self.cur_search_result: 315 self.expand_and_select(self.cur_search_result) 316 else: 317 self.set_searched_tree_node(previous=False) 318 319 self.push_screen(SearchScreen(), set_initial_focus) 320 321 def action_next(self) -> None: 322 """Next was chosen.""" 323 self.set_searched_tree_node(previous=False) 324 325 def action_prev(self) -> None: 326 """Previous was chosen.""" 327 self.set_searched_tree_node(previous=True) 328 329 def action_collapse(self) -> None: 330 """Collapse the part of the tree currently on.""" 331 tree: Tree[str] = self.query_one("#root", Tree) 332 node = tree.cursor_node 333 if node and node.parent: 334 node.parent.collapse_all() 335 node.tree.scroll_to_node(node.parent) 336 337 def update_counts(self) -> None: 338 """Called every interval to update counts.""" 339 if not self.selected or not self.evlist: 340 return 341 342 def update_count(cpu: int, count: int): 343 # Update the raw count display. 344 counter: Label = self.query(f"#counter_cpu{cpu}" if cpu >= 0 else "#counter_total") 345 if not counter: 346 return 347 counter = counter.first(Label) 348 counter.update(str(count)) 349 350 # Update the sparkline. 351 line: Sparkline = self.query(f"#sparkline_cpu{cpu}" if cpu >= 0 else "#sparkline_total") 352 if not line: 353 return 354 line = line.first(Sparkline) 355 # If there are more events than the width, remove the front event. 356 if len(line.data) > line.size.width: 357 line.data.pop(0) 358 line.data.append(count) 359 line.mutate_reactive(Sparkline.data) 360 361 # Update the total and each CPU counts, assume there's just 1 evsel. 362 total = 0 363 self.evlist.disable() 364 for evsel in self.evlist: 365 for cpu in evsel.cpus(): 366 aggr = 0 367 for thread in evsel.threads(): 368 aggr += self.selected.value(self.evlist, evsel, cpu, thread) 369 update_count(cpu, aggr) 370 total += aggr 371 update_count(-1, total) 372 self.evlist.enable() 373 374 def on_mount(self) -> None: 375 """When App starts set up periodic event updating.""" 376 self.update_counts() 377 self.set_interval(self.interval, self.update_counts) 378 379 def set_selected(self, value: TreeValue) -> None: 380 """Updates the event/description and starts the counters.""" 381 try: 382 label_name = self.query_one("#event_name", Label) 383 event_description = self.query_one("#event_description", Static) 384 lines = self.query_one("#lines") 385 except NoMatches: 386 # A race with rendering, ignore the update as we can't 387 # mount the assumed output widgets. 388 return 389 390 self.selected = value 391 392 # Remove previous event information. 393 if self.evlist: 394 self.evlist.disable() 395 self.evlist.close() 396 old_lines = self.query(CounterSparkline) 397 for line in old_lines: 398 line.remove() 399 old_counters = self.query(Counter) 400 for counter in old_counters: 401 counter.remove() 402 403 # Update event/metric text and description. 404 label_name.update(value.name()) 405 event_description.update(value.description()) 406 407 # Open the event. 408 try: 409 self.evlist = value.parse() 410 if self.evlist: 411 self.evlist.open() 412 self.evlist.enable() 413 except: 414 self.evlist = None 415 416 if not self.evlist: 417 self.push_screen(ErrorScreen(f"Failed to open {value.name()}")) 418 return 419 420 # Add spark lines for all the CPUs. Note, must be done after 421 # open so that the evlist CPUs have been computed by propagate 422 # maps. 423 line = CounterSparkline(cpu=-1) 424 lines.mount(line) 425 for cpu in self.evlist.all_cpus(): 426 line = CounterSparkline(cpu) 427 lines.mount(line) 428 line = Counter(cpu=-1) 429 lines.mount(line) 430 for cpu in self.evlist.all_cpus(): 431 line = Counter(cpu) 432 lines.mount(line) 433 434 def compose(self) -> ComposeResult: 435 """Draws the app.""" 436 def metric_event_tree() -> Tree: 437 """Create tree of PMUs and metricgroups with events or metrics under.""" 438 tree: Tree[TreeValue] = Tree("Root", id="root") 439 pmus = tree.root.add("PMUs") 440 for pmu in perf.pmus(): 441 pmu_name = pmu.name().lower() 442 pmu_node = pmus.add(pmu_name) 443 try: 444 for event in sorted(pmu.events(), key=lambda x: x["name"]): 445 if "deprecated" in event: 446 continue 447 if "name" in event: 448 e = event["name"].lower() 449 if "alias" in event: 450 pmu_node.add_leaf(f'{e} ({event["alias"]})', 451 data=PmuEvent(pmu_name, e)) 452 else: 453 pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e)) 454 except: 455 # Reading events may fail with EPERM, ignore. 456 pass 457 metrics = tree.root.add("Metrics") 458 groups = set() 459 for metric in perf.metrics(): 460 groups.update(metric["MetricGroup"]) 461 462 def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str, pmu: str = None): 463 for metric in sorted(perf.metrics(), key=lambda x: x["MetricName"]): 464 metric_pmu = metric.get('PMU') 465 if pmu and metric_pmu and metric_pmu != pmu: 466 continue 467 if parent in metric["MetricGroup"]: 468 name = metric["MetricName"] 469 display_name = name 470 if metric_pmu: 471 display_name += f" ({metric_pmu})" 472 node.add_leaf(display_name, data=Metric(name, metric_pmu)) 473 child_group_name = f'{name}_group' 474 if child_group_name in groups: 475 display_child_group_name = child_group_name 476 if metric_pmu: 477 display_child_group_name += f" ({metric_pmu})" 478 add_metrics_to_tree(node.add(display_child_group_name), 479 child_group_name, 480 metric_pmu) 481 482 for group in sorted(groups): 483 if group.endswith('_group'): 484 continue 485 add_metrics_to_tree(metrics.add(group), group) 486 487 tree.root.expand() 488 return tree 489 490 yield Header(id="header") 491 yield Horizontal(Vertical(metric_event_tree(), id="events"), 492 Vertical(Label("event name", id="event_name"), 493 Static("description", markup=False, id="event_description"), 494 )) 495 yield Label(id="active_search") 496 yield VerticalScroll(id="lines") 497 yield Footer(id="footer") 498 499 @on(Tree.NodeSelected) 500 def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None: 501 """Called when a tree node is selected, selecting the event.""" 502 if event.node.data: 503 self.set_selected(event.node.data) 504 505 506if __name__ == "__main__": 507 ap = argparse.ArgumentParser() 508 ap.add_argument('-I', '--interval', help="Counter update interval in seconds", default=0.1) 509 args = ap.parse_args() 510 app = IListApp(float(args.interval)) 511 app.run() 512