1#!/usr/bin/env python3 2# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) 3"""Interactive perf list.""" 4 5from abc import ABC, abstractmethod 6import argparse 7from dataclasses import dataclass 8import math 9from typing import Any, Dict, Optional, Tuple 10import perf 11from textual import on 12from textual.app import App, ComposeResult 13from textual.binding import Binding 14from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll 15from textual.css.query import NoMatches 16from textual.command import SearchIcon 17from textual.screen import ModalScreen 18from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree 19from textual.widgets.tree import TreeNode 20 21 22def get_info(info: Dict[str, str], key: str): 23 return (info[key] + "\n") if key in info else "" 24 25 26class TreeValue(ABC): 27 """Abstraction for the data of value in the tree.""" 28 29 @abstractmethod 30 def name(self) -> str: 31 pass 32 33 @abstractmethod 34 def description(self) -> str: 35 pass 36 37 @abstractmethod 38 def matches(self, query: str) -> bool: 39 pass 40 41 @abstractmethod 42 def parse(self) -> perf.evlist: 43 pass 44 45 @abstractmethod 46 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 47 pass 48 49 50@dataclass 51class Metric(TreeValue): 52 """A metric in the tree.""" 53 metric_name: str 54 metric_pmu: str 55 56 def name(self) -> str: 57 return self.metric_name 58 59 def description(self) -> str: 60 """Find and format metric description.""" 61 for metric in perf.metrics(): 62 if metric["MetricName"] != self.metric_name: 63 continue 64 if self.metric_pmu and metric["PMU"] != self.metric_pmu: 65 continue 66 desc = get_info(metric, "BriefDescription") 67 desc += get_info(metric, "PublicDescription") 68 desc += get_info(metric, "MetricExpr") 69 desc += get_info(metric, "MetricThreshold") 70 return desc 71 return "description" 72 73 def matches(self, query: str) -> bool: 74 return query in self.metric_name 75 76 def parse(self) -> perf.evlist: 77 return perf.parse_metrics(self.metric_name, self.metric_pmu) 78 79 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 80 try: 81 val = evlist.compute_metric(self.metric_name, cpu, thread) 82 return 0 if math.isnan(val) else val 83 except: 84 # Be tolerant of failures to compute metrics on particular CPUs/threads. 85 return 0 86 87 88@dataclass 89class PmuEvent(TreeValue): 90 """A PMU and event within the tree.""" 91 pmu: str 92 event: str 93 94 def name(self) -> str: 95 if self.event.startswith(self.pmu) or ':' in self.event: 96 return self.event 97 else: 98 return f"{self.pmu}/{self.event}/" 99 100 def description(self) -> str: 101 """Find and format event description for {pmu}/{event}/.""" 102 for p in perf.pmus(): 103 if p.name() != self.pmu: 104 continue 105 for info in p.events(): 106 if "name" not in info or info["name"] != self.event: 107 continue 108 109 desc = get_info(info, "topic") 110 desc += get_info(info, "event_type_desc") 111 desc += get_info(info, "desc") 112 desc += get_info(info, "long_desc") 113 desc += get_info(info, "encoding_desc") 114 return desc 115 return "description" 116 117 def matches(self, query: str) -> bool: 118 return query in self.pmu or query in self.event 119 120 def parse(self) -> perf.evlist: 121 return perf.parse_events(self.name()) 122 123 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 124 return evsel.read(cpu, thread).val 125 126 127class ErrorScreen(ModalScreen[bool]): 128 """Pop up dialog for errors.""" 129 130 CSS = """ 131 ErrorScreen { 132 align: center middle; 133 } 134 """ 135 136 def __init__(self, error: str): 137 self.error = error 138 super().__init__() 139 140 def compose(self) -> ComposeResult: 141 yield Button(f"Error: {self.error}", variant="primary", id="error") 142 143 def on_button_pressed(self, event: Button.Pressed) -> None: 144 self.dismiss(True) 145 146 147class SearchScreen(ModalScreen[str]): 148 """Pop up dialog for search.""" 149 150 CSS = """ 151 SearchScreen Horizontal { 152 align: center middle; 153 margin-top: 1; 154 } 155 SearchScreen Input { 156 width: 1fr; 157 } 158 """ 159 160 def compose(self) -> ComposeResult: 161 yield Horizontal(SearchIcon(), Input(placeholder="Event name")) 162 163 def on_input_submitted(self, event: Input.Submitted) -> None: 164 """Handle the user pressing Enter in the input field.""" 165 self.dismiss(event.value) 166 167 168class Counter(HorizontalGroup): 169 """Two labels for a CPU and its counter value.""" 170 171 CSS = """ 172 Label { 173 gutter: 1; 174 } 175 """ 176 177 def __init__(self, cpu: int) -> None: 178 self.cpu = cpu 179 super().__init__() 180 181 def compose(self) -> ComposeResult: 182 label = f"cpu{self.cpu}" if self.cpu >= 0 else "total" 183 yield Label(label + " ") 184 yield Label("0", id=f"counter_{label}") 185 186 187class CounterSparkline(HorizontalGroup): 188 """A Sparkline for a performance counter.""" 189 190 def __init__(self, cpu: int) -> None: 191 self.cpu = cpu 192 super().__init__() 193 194 def compose(self) -> ComposeResult: 195 label = f"cpu{self.cpu}" if self.cpu >= 0 else "total" 196 yield Label(label) 197 yield Sparkline([], summary_function=max, id=f"sparkline_{label}") 198 199 200class IListApp(App): 201 TITLE = "Interactive Perf List" 202 203 BINDINGS = [ 204 Binding(key="s", action="search", description="Search", 205 tooltip="Search events and PMUs"), 206 Binding(key="n", action="next", description="Next", 207 tooltip="Next search result or item"), 208 Binding(key="p", action="prev", description="Previous", 209 tooltip="Previous search result or item"), 210 Binding(key="c", action="collapse", description="Collapse", 211 tooltip="Collapse the current PMU"), 212 Binding(key="^q", action="quit", description="Quit", 213 tooltip="Quit the app"), 214 ] 215 216 CSS = """ 217 /* Make the 'total' sparkline a different color. */ 218 #sparkline_total > .sparkline--min-color { 219 color: $accent; 220 } 221 #sparkline_total > .sparkline--max-color { 222 color: $accent 30%; 223 } 224 /* 225 * Make the active_search initially not displayed with the text in 226 * the middle of the line. 227 */ 228 #active_search { 229 display: none; 230 width: 100%; 231 text-align: center; 232 } 233 """ 234 235 def __init__(self, interval: float) -> None: 236 self.interval = interval 237 self.evlist = None 238 self.selected: Optional[TreeValue] = None 239 self.search_results: list[TreeNode[TreeValue]] = [] 240 self.cur_search_result: TreeNode[TreeValue] | None = None 241 super().__init__() 242 243 def expand_and_select(self, node: TreeNode[Any]) -> None: 244 """Expand select a node in the tree.""" 245 if node.parent: 246 node.parent.expand() 247 if node.parent.parent: 248 node.parent.parent.expand() 249 node.expand() 250 node.tree.select_node(node) 251 node.tree.scroll_to_node(node) 252 253 def set_searched_tree_node(self, previous: bool) -> None: 254 """Set the cur_search_result node to either the next or previous.""" 255 l = len(self.search_results) 256 257 if l < 1: 258 tree: Tree[TreeValue] = self.query_one("#root", Tree) 259 if previous: 260 tree.action_cursor_up() 261 else: 262 tree.action_cursor_down() 263 return 264 265 if self.cur_search_result: 266 idx = self.search_results.index(self.cur_search_result) 267 if previous: 268 idx = idx - 1 if idx > 0 else l - 1 269 else: 270 idx = idx + 1 if idx < l - 1 else 0 271 else: 272 idx = l - 1 if previous else 0 273 274 node = self.search_results[idx] 275 if node == self.cur_search_result: 276 return 277 278 self.cur_search_result = node 279 self.expand_and_select(node) 280 281 def action_search(self) -> None: 282 """Search was chosen.""" 283 def set_initial_focus(event: str | None) -> None: 284 """Sets the focus after the SearchScreen is dismissed.""" 285 286 search_label = self.query_one("#active_search", Label) 287 search_label.display = True if event else False 288 if not event: 289 return 290 event = event.lower() 291 search_label.update(f'Searching for events matching "{event}"') 292 293 tree: Tree[str] = self.query_one("#root", Tree) 294 295 def find_search_results(event: str, node: TreeNode[str], 296 cursor_seen: bool = False, 297 match_after_cursor: Optional[TreeNode[str]] = None 298 ) -> Tuple[bool, Optional[TreeNode[str]]]: 299 """Find nodes that match the search remembering the one after the cursor.""" 300 if not cursor_seen and node == tree.cursor_node: 301 cursor_seen = True 302 if node.data and node.data.matches(event): 303 if cursor_seen and not match_after_cursor: 304 match_after_cursor = node 305 self.search_results.append(node) 306 307 if node.children: 308 for child in node.children: 309 (cursor_seen, match_after_cursor) = \ 310 find_search_results(event, child, cursor_seen, match_after_cursor) 311 return (cursor_seen, match_after_cursor) 312 313 self.search_results.clear() 314 (_, self.cur_search_result) = find_search_results(event, tree.root) 315 if len(self.search_results) < 1: 316 self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}")) 317 search_label.display = False 318 elif self.cur_search_result: 319 self.expand_and_select(self.cur_search_result) 320 else: 321 self.set_searched_tree_node(previous=False) 322 323 self.push_screen(SearchScreen(), set_initial_focus) 324 325 def action_next(self) -> None: 326 """Next was chosen.""" 327 self.set_searched_tree_node(previous=False) 328 329 def action_prev(self) -> None: 330 """Previous was chosen.""" 331 self.set_searched_tree_node(previous=True) 332 333 def action_collapse(self) -> None: 334 """Collapse the part of the tree currently on.""" 335 tree: Tree[str] = self.query_one("#root", Tree) 336 node = tree.cursor_node 337 if node and node.parent: 338 node.parent.collapse_all() 339 node.tree.scroll_to_node(node.parent) 340 341 def update_counts(self) -> None: 342 """Called every interval to update counts.""" 343 if not self.selected or not self.evlist: 344 return 345 346 def update_count(cpu: int, count: int): 347 # Update the raw count display. 348 counter: Label = self.query(f"#counter_cpu{cpu}" if cpu >= 0 else "#counter_total") 349 if not counter: 350 return 351 counter = counter.first(Label) 352 counter.update(str(count)) 353 354 # Update the sparkline. 355 line: Sparkline = self.query(f"#sparkline_cpu{cpu}" if cpu >= 0 else "#sparkline_total") 356 if not line: 357 return 358 line = line.first(Sparkline) 359 # If there are more events than the width, remove the front event. 360 if len(line.data) > line.size.width: 361 line.data.pop(0) 362 line.data.append(count) 363 line.mutate_reactive(Sparkline.data) 364 365 # Update the total and each CPU counts, assume there's just 1 evsel. 366 total = 0 367 self.evlist.disable() 368 for evsel in self.evlist: 369 for cpu in evsel.cpus(): 370 aggr = 0 371 for thread in evsel.threads(): 372 aggr += self.selected.value(self.evlist, evsel, cpu, thread) 373 update_count(cpu, aggr) 374 total += aggr 375 update_count(-1, total) 376 self.evlist.enable() 377 378 def on_mount(self) -> None: 379 """When App starts set up periodic event updating.""" 380 self.update_counts() 381 self.set_interval(self.interval, self.update_counts) 382 383 def set_selected(self, value: TreeValue) -> None: 384 """Updates the event/description and starts the counters.""" 385 try: 386 label_name = self.query_one("#event_name", Label) 387 event_description = self.query_one("#event_description", Static) 388 lines = self.query_one("#lines") 389 except NoMatches: 390 # A race with rendering, ignore the update as we can't 391 # mount the assumed output widgets. 392 return 393 394 self.selected = value 395 396 # Remove previous event information. 397 if self.evlist: 398 self.evlist.disable() 399 self.evlist.close() 400 old_lines = self.query(CounterSparkline) 401 for line in old_lines: 402 line.remove() 403 old_counters = self.query(Counter) 404 for counter in old_counters: 405 counter.remove() 406 407 # Update event/metric text and description. 408 label_name.update(value.name()) 409 event_description.update(value.description()) 410 411 # Open the event. 412 try: 413 self.evlist = value.parse() 414 if self.evlist: 415 self.evlist.open() 416 self.evlist.enable() 417 except: 418 self.evlist = None 419 420 if not self.evlist: 421 self.push_screen(ErrorScreen(f"Failed to open {value.name()}")) 422 return 423 424 # Add spark lines for all the CPUs. Note, must be done after 425 # open so that the evlist CPUs have been computed by propagate 426 # maps. 427 line = CounterSparkline(cpu=-1) 428 lines.mount(line) 429 for cpu in self.evlist.all_cpus(): 430 line = CounterSparkline(cpu) 431 lines.mount(line) 432 line = Counter(cpu=-1) 433 lines.mount(line) 434 for cpu in self.evlist.all_cpus(): 435 line = Counter(cpu) 436 lines.mount(line) 437 438 def compose(self) -> ComposeResult: 439 """Draws the app.""" 440 def metric_event_tree() -> Tree: 441 """Create tree of PMUs and metricgroups with events or metrics under.""" 442 tree: Tree[TreeValue] = Tree("Root", id="root") 443 pmus = tree.root.add("PMUs") 444 for pmu in perf.pmus(): 445 pmu_name = pmu.name().lower() 446 pmu_node = pmus.add(pmu_name) 447 try: 448 for event in sorted(pmu.events(), key=lambda x: x["name"]): 449 if "deprecated" in event: 450 continue 451 if "name" in event: 452 e = event["name"].lower() 453 if "alias" in event: 454 pmu_node.add_leaf(f'{e} ({event["alias"]})', 455 data=PmuEvent(pmu_name, e)) 456 else: 457 pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e)) 458 except: 459 # Reading events may fail with EPERM, ignore. 460 pass 461 metrics = tree.root.add("Metrics") 462 groups = set() 463 for metric in perf.metrics(): 464 groups.update(metric["MetricGroup"]) 465 466 def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str, pmu: str = None): 467 for metric in sorted(perf.metrics(), key=lambda x: x["MetricName"]): 468 metric_pmu = metric.get('PMU') 469 if pmu and metric_pmu and metric_pmu != pmu: 470 continue 471 if parent in metric["MetricGroup"]: 472 name = metric["MetricName"] 473 display_name = name 474 if metric_pmu: 475 display_name += f" ({metric_pmu})" 476 node.add_leaf(display_name, data=Metric(name, metric_pmu)) 477 child_group_name = f'{name}_group' 478 if child_group_name in groups: 479 display_child_group_name = child_group_name 480 if metric_pmu: 481 display_child_group_name += f" ({metric_pmu})" 482 add_metrics_to_tree(node.add(display_child_group_name), 483 child_group_name, 484 metric_pmu) 485 486 for group in sorted(groups): 487 if group.endswith('_group'): 488 continue 489 add_metrics_to_tree(metrics.add(group), group) 490 491 tree.root.expand() 492 return tree 493 494 yield Header(id="header") 495 yield Horizontal(Vertical(metric_event_tree(), id="events"), 496 Vertical(Label("event name", id="event_name"), 497 Static("description", markup=False, id="event_description"), 498 )) 499 yield Label(id="active_search") 500 yield VerticalScroll(id="lines") 501 yield Footer(id="footer") 502 503 @on(Tree.NodeSelected) 504 def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None: 505 """Called when a tree node is selected, selecting the event.""" 506 if event.node.data: 507 self.set_selected(event.node.data) 508 509 510if __name__ == "__main__": 511 ap = argparse.ArgumentParser() 512 ap.add_argument('-I', '--interval', help="Counter update interval in seconds", default=0.1) 513 args = ap.parse_args() 514 app = IListApp(float(args.interval)) 515 app.run() 516