1#!/usr/bin/env python3 2# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause) 3"""Interactive perf list.""" 4 5from abc import ABC, abstractmethod 6import argparse 7from dataclasses import dataclass 8import math 9from typing import Any, Dict, Optional, Tuple 10import perf 11from textual import on 12from textual.app import App, ComposeResult 13from textual.binding import Binding 14from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll 15from textual.css.query import NoMatches 16from textual.command import SearchIcon 17from textual.screen import ModalScreen 18from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree 19from textual.widgets.tree import TreeNode 20 21 22def get_info(info: Dict[str, str], key: str): 23 return (info[key] + "\n") if key in info else "" 24 25 26class TreeValue(ABC): 27 """Abstraction for the data of value in the tree.""" 28 29 @abstractmethod 30 def name(self) -> str: 31 pass 32 33 @abstractmethod 34 def description(self) -> str: 35 pass 36 37 @abstractmethod 38 def matches(self, query: str) -> bool: 39 pass 40 41 @abstractmethod 42 def parse(self) -> perf.evlist: 43 pass 44 45 @abstractmethod 46 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 47 pass 48 49 50@dataclass 51class Metric(TreeValue): 52 """A metric in the tree.""" 53 metric_name: str 54 55 def name(self) -> str: 56 return self.metric_name 57 58 def description(self) -> str: 59 """Find and format metric description.""" 60 for metric in perf.metrics(): 61 if metric["MetricName"] != self.metric_name: 62 continue 63 desc = get_info(metric, "BriefDescription") 64 desc += get_info(metric, "PublicDescription") 65 desc += get_info(metric, "MetricExpr") 66 desc += get_info(metric, "MetricThreshold") 67 return desc 68 return "description" 69 70 def matches(self, query: str) -> bool: 71 return query in self.metric_name 72 73 def parse(self) -> perf.evlist: 74 return perf.parse_metrics(self.metric_name) 75 76 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 77 val = evlist.compute_metric(self.metric_name, cpu, thread) 78 return 0 if math.isnan(val) else val 79 80 81@dataclass 82class PmuEvent(TreeValue): 83 """A PMU and event within the tree.""" 84 pmu: str 85 event: str 86 87 def name(self) -> str: 88 if self.event.startswith(self.pmu) or ':' in self.event: 89 return self.event 90 else: 91 return f"{self.pmu}/{self.event}/" 92 93 def description(self) -> str: 94 """Find and format event description for {pmu}/{event}/.""" 95 for p in perf.pmus(): 96 if p.name() != self.pmu: 97 continue 98 for info in p.events(): 99 if "name" not in info or info["name"] != self.event: 100 continue 101 102 desc = get_info(info, "topic") 103 desc += get_info(info, "event_type_desc") 104 desc += get_info(info, "desc") 105 desc += get_info(info, "long_desc") 106 desc += get_info(info, "encoding_desc") 107 return desc 108 return "description" 109 110 def matches(self, query: str) -> bool: 111 return query in self.pmu or query in self.event 112 113 def parse(self) -> perf.evlist: 114 return perf.parse_events(self.name()) 115 116 def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float: 117 return evsel.read(cpu, thread).val 118 119 120class ErrorScreen(ModalScreen[bool]): 121 """Pop up dialog for errors.""" 122 123 CSS = """ 124 ErrorScreen { 125 align: center middle; 126 } 127 """ 128 129 def __init__(self, error: str): 130 self.error = error 131 super().__init__() 132 133 def compose(self) -> ComposeResult: 134 yield Button(f"Error: {self.error}", variant="primary", id="error") 135 136 def on_button_pressed(self, event: Button.Pressed) -> None: 137 self.dismiss(True) 138 139 140class SearchScreen(ModalScreen[str]): 141 """Pop up dialog for search.""" 142 143 CSS = """ 144 SearchScreen Horizontal { 145 align: center middle; 146 margin-top: 1; 147 } 148 SearchScreen Input { 149 width: 1fr; 150 } 151 """ 152 153 def compose(self) -> ComposeResult: 154 yield Horizontal(SearchIcon(), Input(placeholder="Event name")) 155 156 def on_input_submitted(self, event: Input.Submitted) -> None: 157 """Handle the user pressing Enter in the input field.""" 158 self.dismiss(event.value) 159 160 161class Counter(HorizontalGroup): 162 """Two labels for a CPU and its counter value.""" 163 164 CSS = """ 165 Label { 166 gutter: 1; 167 } 168 """ 169 170 def __init__(self, cpu: int) -> None: 171 self.cpu = cpu 172 super().__init__() 173 174 def compose(self) -> ComposeResult: 175 label = f"cpu{self.cpu}" if self.cpu >= 0 else "total" 176 yield Label(label + " ") 177 yield Label("0", id=f"counter_{label}") 178 179 180class CounterSparkline(HorizontalGroup): 181 """A Sparkline for a performance counter.""" 182 183 def __init__(self, cpu: int) -> None: 184 self.cpu = cpu 185 super().__init__() 186 187 def compose(self) -> ComposeResult: 188 label = f"cpu{self.cpu}" if self.cpu >= 0 else "total" 189 yield Label(label) 190 yield Sparkline([], summary_function=max, id=f"sparkline_{label}") 191 192 193class IListApp(App): 194 TITLE = "Interactive Perf List" 195 196 BINDINGS = [ 197 Binding(key="s", action="search", description="Search", 198 tooltip="Search events and PMUs"), 199 Binding(key="n", action="next", description="Next", 200 tooltip="Next search result or item"), 201 Binding(key="p", action="prev", description="Previous", 202 tooltip="Previous search result or item"), 203 Binding(key="c", action="collapse", description="Collapse", 204 tooltip="Collapse the current PMU"), 205 Binding(key="^q", action="quit", description="Quit", 206 tooltip="Quit the app"), 207 ] 208 209 CSS = """ 210 /* Make the 'total' sparkline a different color. */ 211 #sparkline_total > .sparkline--min-color { 212 color: $accent; 213 } 214 #sparkline_total > .sparkline--max-color { 215 color: $accent 30%; 216 } 217 /* 218 * Make the active_search initially not displayed with the text in 219 * the middle of the line. 220 */ 221 #active_search { 222 display: none; 223 width: 100%; 224 text-align: center; 225 } 226 """ 227 228 def __init__(self, interval: float) -> None: 229 self.interval = interval 230 self.evlist = None 231 self.selected: Optional[TreeValue] = None 232 self.search_results: list[TreeNode[TreeValue]] = [] 233 self.cur_search_result: TreeNode[TreeValue] | None = None 234 super().__init__() 235 236 def expand_and_select(self, node: TreeNode[Any]) -> None: 237 """Expand select a node in the tree.""" 238 if node.parent: 239 node.parent.expand() 240 if node.parent.parent: 241 node.parent.parent.expand() 242 node.expand() 243 node.tree.select_node(node) 244 node.tree.scroll_to_node(node) 245 246 def set_searched_tree_node(self, previous: bool) -> None: 247 """Set the cur_search_result node to either the next or previous.""" 248 l = len(self.search_results) 249 250 if l < 1: 251 tree: Tree[TreeValue] = self.query_one("#root", Tree) 252 if previous: 253 tree.action_cursor_up() 254 else: 255 tree.action_cursor_down() 256 return 257 258 if self.cur_search_result: 259 idx = self.search_results.index(self.cur_search_result) 260 if previous: 261 idx = idx - 1 if idx > 0 else l - 1 262 else: 263 idx = idx + 1 if idx < l - 1 else 0 264 else: 265 idx = l - 1 if previous else 0 266 267 node = self.search_results[idx] 268 if node == self.cur_search_result: 269 return 270 271 self.cur_search_result = node 272 self.expand_and_select(node) 273 274 def action_search(self) -> None: 275 """Search was chosen.""" 276 def set_initial_focus(event: str | None) -> None: 277 """Sets the focus after the SearchScreen is dismissed.""" 278 279 search_label = self.query_one("#active_search", Label) 280 search_label.display = True if event else False 281 if not event: 282 return 283 event = event.lower() 284 search_label.update(f'Searching for events matching "{event}"') 285 286 tree: Tree[str] = self.query_one("#root", Tree) 287 288 def find_search_results(event: str, node: TreeNode[str], 289 cursor_seen: bool = False, 290 match_after_cursor: Optional[TreeNode[str]] = None 291 ) -> Tuple[bool, Optional[TreeNode[str]]]: 292 """Find nodes that match the search remembering the one after the cursor.""" 293 if not cursor_seen and node == tree.cursor_node: 294 cursor_seen = True 295 if node.data and node.data.matches(event): 296 if cursor_seen and not match_after_cursor: 297 match_after_cursor = node 298 self.search_results.append(node) 299 300 if node.children: 301 for child in node.children: 302 (cursor_seen, match_after_cursor) = \ 303 find_search_results(event, child, cursor_seen, match_after_cursor) 304 return (cursor_seen, match_after_cursor) 305 306 self.search_results.clear() 307 (_, self.cur_search_result) = find_search_results(event, tree.root) 308 if len(self.search_results) < 1: 309 self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}")) 310 search_label.display = False 311 elif self.cur_search_result: 312 self.expand_and_select(self.cur_search_result) 313 else: 314 self.set_searched_tree_node(previous=False) 315 316 self.push_screen(SearchScreen(), set_initial_focus) 317 318 def action_next(self) -> None: 319 """Next was chosen.""" 320 self.set_searched_tree_node(previous=False) 321 322 def action_prev(self) -> None: 323 """Previous was chosen.""" 324 self.set_searched_tree_node(previous=True) 325 326 def action_collapse(self) -> None: 327 """Collapse the part of the tree currently on.""" 328 tree: Tree[str] = self.query_one("#root", Tree) 329 node = tree.cursor_node 330 if node and node.parent: 331 node.parent.collapse_all() 332 node.tree.scroll_to_node(node.parent) 333 334 def update_counts(self) -> None: 335 """Called every interval to update counts.""" 336 if not self.selected or not self.evlist: 337 return 338 339 def update_count(cpu: int, count: int): 340 # Update the raw count display. 341 counter: Label = self.query(f"#counter_cpu{cpu}" if cpu >= 0 else "#counter_total") 342 if not counter: 343 return 344 counter = counter.first(Label) 345 counter.update(str(count)) 346 347 # Update the sparkline. 348 line: Sparkline = self.query(f"#sparkline_cpu{cpu}" if cpu >= 0 else "#sparkline_total") 349 if not line: 350 return 351 line = line.first(Sparkline) 352 # If there are more events than the width, remove the front event. 353 if len(line.data) > line.size.width: 354 line.data.pop(0) 355 line.data.append(count) 356 line.mutate_reactive(Sparkline.data) 357 358 # Update the total and each CPU counts, assume there's just 1 evsel. 359 total = 0 360 self.evlist.disable() 361 for evsel in self.evlist: 362 for cpu in evsel.cpus(): 363 aggr = 0 364 for thread in evsel.threads(): 365 aggr += self.selected.value(self.evlist, evsel, cpu, thread) 366 update_count(cpu, aggr) 367 total += aggr 368 update_count(-1, total) 369 self.evlist.enable() 370 371 def on_mount(self) -> None: 372 """When App starts set up periodic event updating.""" 373 self.update_counts() 374 self.set_interval(self.interval, self.update_counts) 375 376 def set_selected(self, value: TreeValue) -> None: 377 """Updates the event/description and starts the counters.""" 378 try: 379 label_name = self.query_one("#event_name", Label) 380 event_description = self.query_one("#event_description", Static) 381 lines = self.query_one("#lines") 382 except NoMatches: 383 # A race with rendering, ignore the update as we can't 384 # mount the assumed output widgets. 385 return 386 387 self.selected = value 388 389 # Remove previous event information. 390 if self.evlist: 391 self.evlist.disable() 392 self.evlist.close() 393 old_lines = self.query(CounterSparkline) 394 for line in old_lines: 395 line.remove() 396 old_counters = self.query(Counter) 397 for counter in old_counters: 398 counter.remove() 399 400 # Update event/metric text and description. 401 label_name.update(value.name()) 402 event_description.update(value.description()) 403 404 # Open the event. 405 try: 406 self.evlist = value.parse() 407 if self.evlist: 408 self.evlist.open() 409 self.evlist.enable() 410 except: 411 self.evlist = None 412 413 if not self.evlist: 414 self.push_screen(ErrorScreen(f"Failed to open {value.name()}")) 415 return 416 417 # Add spark lines for all the CPUs. Note, must be done after 418 # open so that the evlist CPUs have been computed by propagate 419 # maps. 420 line = CounterSparkline(cpu=-1) 421 lines.mount(line) 422 for cpu in self.evlist.all_cpus(): 423 line = CounterSparkline(cpu) 424 lines.mount(line) 425 line = Counter(cpu=-1) 426 lines.mount(line) 427 for cpu in self.evlist.all_cpus(): 428 line = Counter(cpu) 429 lines.mount(line) 430 431 def compose(self) -> ComposeResult: 432 """Draws the app.""" 433 def metric_event_tree() -> Tree: 434 """Create tree of PMUs and metricgroups with events or metrics under.""" 435 tree: Tree[TreeValue] = Tree("Root", id="root") 436 pmus = tree.root.add("PMUs") 437 for pmu in perf.pmus(): 438 pmu_name = pmu.name().lower() 439 pmu_node = pmus.add(pmu_name) 440 try: 441 for event in sorted(pmu.events(), key=lambda x: x["name"]): 442 if "name" in event: 443 e = event["name"].lower() 444 if "alias" in event: 445 pmu_node.add_leaf(f'{e} ({event["alias"]})', 446 data=PmuEvent(pmu_name, e)) 447 else: 448 pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e)) 449 except: 450 # Reading events may fail with EPERM, ignore. 451 pass 452 metrics = tree.root.add("Metrics") 453 groups = set() 454 for metric in perf.metrics(): 455 groups.update(metric["MetricGroup"]) 456 457 def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str): 458 for metric in sorted(perf.metrics(), key=lambda x: x["MetricName"]): 459 if parent in metric["MetricGroup"]: 460 name = metric["MetricName"] 461 node.add_leaf(name, data=Metric(name)) 462 child_group_name = f'{name}_group' 463 if child_group_name in groups: 464 add_metrics_to_tree(node.add(child_group_name), child_group_name) 465 466 for group in sorted(groups): 467 if group.endswith('_group'): 468 continue 469 add_metrics_to_tree(metrics.add(group), group) 470 471 tree.root.expand() 472 return tree 473 474 yield Header(id="header") 475 yield Horizontal(Vertical(metric_event_tree(), id="events"), 476 Vertical(Label("event name", id="event_name"), 477 Static("description", markup=False, id="event_description"), 478 )) 479 yield Label(id="active_search") 480 yield VerticalScroll(id="lines") 481 yield Footer(id="footer") 482 483 @on(Tree.NodeSelected) 484 def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None: 485 """Called when a tree node is selected, selecting the event.""" 486 if event.node.data: 487 self.set_selected(event.node.data) 488 489 490if __name__ == "__main__": 491 ap = argparse.ArgumentParser() 492 ap.add_argument('-I', '--interval', help="Counter update interval in seconds", default=0.1) 493 args = ap.parse_args() 494 app = IListApp(float(args.interval)) 495 app.run() 496