xref: /linux/tools/perf/python/ilist.py (revision af9e8d12b139c92e748eb2956bbef03315ea7516)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
3"""Interactive perf list."""
4
5from abc import ABC, abstractmethod
6import argparse
7from dataclasses import dataclass
8import math
9from typing import Any, Dict, Optional, Tuple
10import perf
11from textual import on
12from textual.app import App, ComposeResult
13from textual.binding import Binding
14from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll
15from textual.css.query import NoMatches
16from textual.command import SearchIcon
17from textual.screen import ModalScreen
18from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree
19from textual.widgets.tree import TreeNode
20
21
def get_info(info: Dict[str, str], key: str) -> str:
    """Return ``info[key]`` followed by a newline, or "" if key is absent."""
    if key not in info:
        return ""
    return info[key] + "\n"
24
25
class TreeValue(ABC):
    """Interface for the data attached to a selectable node of the tree."""

    @abstractmethod
    def name(self) -> str:
        """Return the text used as the name of this value."""

    @abstractmethod
    def description(self) -> str:
        """Return the descriptive text for this value."""

    @abstractmethod
    def matches(self, query: str) -> bool:
        """Return whether this value matches the search string query."""

    @abstractmethod
    def parse(self) -> perf.evlist:
        """Return a perf.evlist for this value."""

    @abstractmethod
    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the current reading for the given evsel, cpu and thread."""
48
49
@dataclass
class Metric(TreeValue):
    """A metric in the tree."""
    # Metric name, compared against the "MetricName" entry of perf.metrics().
    metric_name: str
    # PMU the metric is tied to; falsy (None or "") when no PMU is known.
    metric_pmu: Optional[str]

    def name(self) -> str:
        """Return the metric name."""
        return self.metric_name

    def description(self) -> str:
        """Find and format metric description.

        Returns the concatenation of the BriefDescription,
        PublicDescription, MetricExpr and MetricThreshold entries (each on
        its own line, missing ones skipped) of the first entry of
        perf.metrics() with this name and, when metric_pmu is set, this
        PMU. Returns the placeholder "description" when nothing matches.
        """
        for metric in perf.metrics():
            if metric["MetricName"] != self.metric_name:
                continue
            # A metric may lack a "PMU" entry, so use get() rather than
            # indexing, which would raise KeyError for such entries.
            if self.metric_pmu and metric.get("PMU") != self.metric_pmu:
                continue
            desc = get_info(metric, "BriefDescription")
            desc += get_info(metric, "PublicDescription")
            desc += get_info(metric, "MetricExpr")
            desc += get_info(metric, "MetricThreshold")
            return desc
        return "description"

    def matches(self, query: str) -> bool:
        """Return whether query occurs in the metric name, ignoring case.

        The search dialog lower-cases the query while metric names keep
        their original case, so both sides are lower-cased here.
        """
        return query.lower() in self.metric_name.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist from perf.parse_metrics() for this metric."""
        return perf.parse_metrics(self.metric_name, self.metric_pmu)

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the metric computed for cpu/thread, with NaN mapped to 0.

        evsel is unused: the metric is computed from the whole evlist.
        """
        val = evlist.compute_metric(self.metric_name, cpu, thread)
        return 0 if math.isnan(val) else val
82
83
@dataclass
class PmuEvent(TreeValue):
    """A PMU and event within the tree."""
    # PMU name; the tree builder stores it lower-cased.
    pmu: str
    # Event name; the tree builder stores it lower-cased.
    event: str

    def name(self) -> str:
        """Return the event string handed to perf.parse_events().

        The event is used as-is when it already starts with the PMU name
        or contains ':'; otherwise it is wrapped as "pmu/event/".
        """
        if self.event.startswith(self.pmu) or ':' in self.event:
            return self.event
        return f"{self.pmu}/{self.event}/"

    def description(self) -> str:
        """Find and format event description for {pmu}/{event}/.

        Returns the topic, event_type_desc, desc, long_desc and
        encoding_desc entries (each on its own line, missing ones skipped)
        of the first matching event, or the placeholder "description" when
        no PMU/event pair matches.
        """
        # The tree stores lower-cased names, so compare case-insensitively
        # against the names perf reports; otherwise an upper-case PMU or
        # event name would never be found.
        wanted_pmu = self.pmu.lower()
        wanted_event = self.event.lower()
        for p in perf.pmus():
            if p.name().lower() != wanted_pmu:
                continue
            for info in p.events():
                if "name" not in info or info["name"].lower() != wanted_event:
                    continue

                desc = get_info(info, "topic")
                desc += get_info(info, "event_type_desc")
                desc += get_info(info, "desc")
                desc += get_info(info, "long_desc")
                desc += get_info(info, "encoding_desc")
                return desc
        return "description"

    def matches(self, query: str) -> bool:
        """Return whether query occurs in the PMU or event name, ignoring case."""
        needle = query.lower()
        return needle in self.pmu.lower() or needle in self.event.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist from perf.parse_events() for name()."""
        return perf.parse_events(self.name())

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the ``val`` field of evsel.read(cpu, thread); evlist is unused."""
        return evsel.read(cpu, thread).val
121
122
class ErrorScreen(ModalScreen[bool]):
    """Modal dialog showing an error message on a single button."""

    CSS = """
    ErrorScreen {
        align: center middle;
    }
    """

    def __init__(self, error: str):
        """Remember the error text shown by compose()."""
        self.error = error
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the one button carrying the error text."""
        text = f"Error: {self.error}"
        yield Button(text, variant="primary", id="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss the dialog with the result True when the button is pressed."""
        self.dismiss(True)
141
142
class SearchScreen(ModalScreen[str]):
    """Modal dialog that asks for a search string."""

    CSS = """
    SearchScreen Horizontal {
        align: center middle;
        margin-top: 1;
    }
    SearchScreen Input {
        width: 1fr;
    }
    """

    def compose(self) -> ComposeResult:
        """Yield one row holding the search icon and the text input."""
        icon = SearchIcon()
        entry = Input(placeholder="Event name")
        yield Horizontal(icon, entry)

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Dismiss the dialog with the submitted text as its result."""
        self.dismiss(event.value)
162
163
class Counter(HorizontalGroup):
    """A name label and a value label for one CPU (or the total)."""

    CSS = """
    Label {
        gutter: 1;
    }
    """

    def __init__(self, cpu: int) -> None:
        """Remember the CPU number; a negative cpu stands for the total."""
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label and the value label with id counter_<name>."""
        if self.cpu >= 0:
            label = f"cpu{self.cpu}"
        else:
            label = "total"
        yield Label(label + " ")
        # The value starts at "0" and is looked up by this id when updated.
        yield Label("0", id=f"counter_{label}")
181
182
class CounterSparkline(HorizontalGroup):
    """A name label and a Sparkline for one CPU (or the total)."""

    def __init__(self, cpu: int) -> None:
        """Remember the CPU number; a negative cpu stands for the total."""
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label and an empty Sparkline with id sparkline_<name>."""
        if self.cpu >= 0:
            label = f"cpu{self.cpu}"
        else:
            label = "total"
        yield Label(label)
        yield Sparkline([], summary_function=max, id=f"sparkline_{label}")
194
195
class IListApp(App):
    """Textual application for browsing perf PMU events and metrics.

    compose() builds a tree of events and metrics. Selecting a leaf makes
    set_selected() parse and open it, and update_counts() then refreshes
    the per-CPU and total counters and sparklines every `interval` seconds.
    """
    TITLE = "Interactive Perf List"

    # Key bindings. search/next/prev/collapse map to the action_* methods
    # of this class; "quit" is not defined in this file and is presumably
    # the action inherited from App.
    # NOTE(review): "^q" looks like the display form of ctrl+q; Textual key
    # names are usually spelled "ctrl+q" - confirm this binding fires.
    BINDINGS = [
        Binding(key="s", action="search", description="Search",
                tooltip="Search events and PMUs"),
        Binding(key="n", action="next", description="Next",
                tooltip="Next search result or item"),
        Binding(key="p", action="prev", description="Previous",
                tooltip="Previous search result or item"),
        Binding(key="c", action="collapse", description="Collapse",
                tooltip="Collapse the current PMU"),
        Binding(key="^q", action="quit", description="Quit",
                tooltip="Quit the app"),
    ]

    # Styles for the "total" sparkline and the initially hidden
    # "active_search" label created in compose().
    CSS = """
        /* Make the 'total' sparkline a different color. */
        #sparkline_total > .sparkline--min-color {
            color: $accent;
        }
        #sparkline_total > .sparkline--max-color {
            color: $accent 30%;
        }
        /*
         * Make the active_search initially not displayed with the text in
         * the middle of the line.
         */
        #active_search {
            display: none;
            width: 100%;
            text-align: center;
        }
    """
230
231    def __init__(self, interval: float) -> None:
232        self.interval = interval
233        self.evlist = None
234        self.selected: Optional[TreeValue] = None
235        self.search_results: list[TreeNode[TreeValue]] = []
236        self.cur_search_result: TreeNode[TreeValue] | None = None
237        super().__init__()
238
239    def expand_and_select(self, node: TreeNode[Any]) -> None:
240        """Expand select a node in the tree."""
241        if node.parent:
242            node.parent.expand()
243            if node.parent.parent:
244                node.parent.parent.expand()
245        node.expand()
246        node.tree.select_node(node)
247        node.tree.scroll_to_node(node)
248
249    def set_searched_tree_node(self, previous: bool) -> None:
250        """Set the cur_search_result node to either the next or previous."""
251        l = len(self.search_results)
252
253        if l < 1:
254            tree: Tree[TreeValue] = self.query_one("#root", Tree)
255            if previous:
256                tree.action_cursor_up()
257            else:
258                tree.action_cursor_down()
259            return
260
261        if self.cur_search_result:
262            idx = self.search_results.index(self.cur_search_result)
263            if previous:
264                idx = idx - 1 if idx > 0 else l - 1
265            else:
266                idx = idx + 1 if idx < l - 1 else 0
267        else:
268            idx = l - 1 if previous else 0
269
270        node = self.search_results[idx]
271        if node == self.cur_search_result:
272            return
273
274        self.cur_search_result = node
275        self.expand_and_select(node)
276
277    def action_search(self) -> None:
278        """Search was chosen."""
279        def set_initial_focus(event: str | None) -> None:
280            """Sets the focus after the SearchScreen is dismissed."""
281
282            search_label = self.query_one("#active_search", Label)
283            search_label.display = True if event else False
284            if not event:
285                return
286            event = event.lower()
287            search_label.update(f'Searching for events matching "{event}"')
288
289            tree: Tree[str] = self.query_one("#root", Tree)
290
291            def find_search_results(event: str, node: TreeNode[str],
292                                    cursor_seen: bool = False,
293                                    match_after_cursor: Optional[TreeNode[str]] = None
294                                    ) -> Tuple[bool, Optional[TreeNode[str]]]:
295                """Find nodes that match the search remembering the one after the cursor."""
296                if not cursor_seen and node == tree.cursor_node:
297                    cursor_seen = True
298                if node.data and node.data.matches(event):
299                    if cursor_seen and not match_after_cursor:
300                        match_after_cursor = node
301                    self.search_results.append(node)
302
303                if node.children:
304                    for child in node.children:
305                        (cursor_seen, match_after_cursor) = \
306                            find_search_results(event, child, cursor_seen, match_after_cursor)
307                return (cursor_seen, match_after_cursor)
308
309            self.search_results.clear()
310            (_, self.cur_search_result) = find_search_results(event, tree.root)
311            if len(self.search_results) < 1:
312                self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}"))
313                search_label.display = False
314            elif self.cur_search_result:
315                self.expand_and_select(self.cur_search_result)
316            else:
317                self.set_searched_tree_node(previous=False)
318
319        self.push_screen(SearchScreen(), set_initial_focus)
320
    def action_next(self) -> None:
        """Next was chosen: go to the next search result or, with no search, the next item."""
        self.set_searched_tree_node(previous=False)
324
    def action_prev(self) -> None:
        """Previous was chosen: go to the previous search result or, with no search, the previous item."""
        self.set_searched_tree_node(previous=True)
328
329    def action_collapse(self) -> None:
330        """Collapse the part of the tree currently on."""
331        tree: Tree[str] = self.query_one("#root", Tree)
332        node = tree.cursor_node
333        if node and node.parent:
334            node.parent.collapse_all()
335            node.tree.scroll_to_node(node.parent)
336
337    def update_counts(self) -> None:
338        """Called every interval to update counts."""
339        if not self.selected or not self.evlist:
340            return
341
342        def update_count(cpu: int, count: int):
343            # Update the raw count display.
344            counter: Label = self.query(f"#counter_cpu{cpu}" if cpu >= 0 else "#counter_total")
345            if not counter:
346                return
347            counter = counter.first(Label)
348            counter.update(str(count))
349
350            # Update the sparkline.
351            line: Sparkline = self.query(f"#sparkline_cpu{cpu}" if cpu >= 0 else "#sparkline_total")
352            if not line:
353                return
354            line = line.first(Sparkline)
355            # If there are more events than the width, remove the front event.
356            if len(line.data) > line.size.width:
357                line.data.pop(0)
358            line.data.append(count)
359            line.mutate_reactive(Sparkline.data)
360
361        # Update the total and each CPU counts, assume there's just 1 evsel.
362        total = 0
363        self.evlist.disable()
364        for evsel in self.evlist:
365            for cpu in evsel.cpus():
366                aggr = 0
367                for thread in evsel.threads():
368                    aggr += self.selected.value(self.evlist, evsel, cpu, thread)
369                update_count(cpu, aggr)
370                total += aggr
371        update_count(-1, total)
372        self.evlist.enable()
373
    def on_mount(self) -> None:
        """When App starts set up periodic event updating."""
        # One immediate update, then one every self.interval seconds.
        self.update_counts()
        self.set_interval(self.interval, self.update_counts)
378
379    def set_selected(self, value: TreeValue) -> None:
380        """Updates the event/description and starts the counters."""
381        try:
382            label_name = self.query_one("#event_name", Label)
383            event_description = self.query_one("#event_description", Static)
384            lines = self.query_one("#lines")
385        except NoMatches:
386            # A race with rendering, ignore the update as we can't
387            # mount the assumed output widgets.
388            return
389
390        self.selected = value
391
392        # Remove previous event information.
393        if self.evlist:
394            self.evlist.disable()
395            self.evlist.close()
396            old_lines = self.query(CounterSparkline)
397            for line in old_lines:
398                line.remove()
399            old_counters = self.query(Counter)
400            for counter in old_counters:
401                counter.remove()
402
403        # Update event/metric text and description.
404        label_name.update(value.name())
405        event_description.update(value.description())
406
407        # Open the event.
408        try:
409            self.evlist = value.parse()
410            if self.evlist:
411                self.evlist.open()
412                self.evlist.enable()
413        except:
414            self.evlist = None
415
416        if not self.evlist:
417            self.push_screen(ErrorScreen(f"Failed to open {value.name()}"))
418            return
419
420        # Add spark lines for all the CPUs. Note, must be done after
421        # open so that the evlist CPUs have been computed by propagate
422        # maps.
423        line = CounterSparkline(cpu=-1)
424        lines.mount(line)
425        for cpu in self.evlist.all_cpus():
426            line = CounterSparkline(cpu)
427            lines.mount(line)
428        line = Counter(cpu=-1)
429        lines.mount(line)
430        for cpu in self.evlist.all_cpus():
431            line = Counter(cpu)
432            lines.mount(line)
433
434    def compose(self) -> ComposeResult:
435        """Draws the app."""
436        def metric_event_tree() -> Tree:
437            """Create tree of PMUs and metricgroups with events or metrics under."""
438            tree: Tree[TreeValue] = Tree("Root", id="root")
439            pmus = tree.root.add("PMUs")
440            for pmu in perf.pmus():
441                pmu_name = pmu.name().lower()
442                pmu_node = pmus.add(pmu_name)
443                try:
444                    for event in sorted(pmu.events(), key=lambda x: x["name"]):
445                        if "deprecated" in event:
446                            continue
447                        if "name" in event:
448                            e = event["name"].lower()
449                            if "alias" in event:
450                                pmu_node.add_leaf(f'{e} ({event["alias"]})',
451                                                  data=PmuEvent(pmu_name, e))
452                            else:
453                                pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e))
454                except:
455                    # Reading events may fail with EPERM, ignore.
456                    pass
457            metrics = tree.root.add("Metrics")
458            groups = set()
459            for metric in perf.metrics():
460                groups.update(metric["MetricGroup"])
461
462            def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str, pmu: str = None):
463                for metric in sorted(perf.metrics(), key=lambda x: x["MetricName"]):
464                    metric_pmu = metric.get('PMU')
465                    if pmu and metric_pmu and metric_pmu != pmu:
466                        continue
467                    if parent in metric["MetricGroup"]:
468                        name = metric["MetricName"]
469                        display_name = name
470                        if metric_pmu:
471                            display_name += f" ({metric_pmu})"
472                        node.add_leaf(display_name, data=Metric(name, metric_pmu))
473                        child_group_name = f'{name}_group'
474                        if child_group_name in groups:
475                            display_child_group_name = child_group_name
476                            if metric_pmu:
477                                display_child_group_name += f" ({metric_pmu})"
478                            add_metrics_to_tree(node.add(display_child_group_name),
479                                                child_group_name,
480                                                metric_pmu)
481
482            for group in sorted(groups):
483                if group.endswith('_group'):
484                    continue
485                add_metrics_to_tree(metrics.add(group), group)
486
487            tree.root.expand()
488            return tree
489
490        yield Header(id="header")
491        yield Horizontal(Vertical(metric_event_tree(), id="events"),
492                         Vertical(Label("event name", id="event_name"),
493                                  Static("description", markup=False, id="event_description"),
494                                  ))
495        yield Label(id="active_search")
496        yield VerticalScroll(id="lines")
497        yield Footer(id="footer")
498
499    @on(Tree.NodeSelected)
500    def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None:
501        """Called when a tree node is selected, selecting the event."""
502        if event.node.data:
503            self.set_selected(event.node.data)
504
505
if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    # type=float lets argparse reject a non-numeric interval with a usage
    # message instead of a ValueError traceback.
    ap.add_argument('-I', '--interval', type=float, default=0.1,
                    help="Counter update interval in seconds")
    args = ap.parse_args()
    # The interval drives a periodic timer, so it must be a positive,
    # finite number of seconds.
    if not math.isfinite(args.interval) or args.interval <= 0:
        ap.error("--interval must be a positive number of seconds")
    app = IListApp(args.interval)
    app.run()
511    app.run()
512