xref: /linux/tools/perf/python/ilist.py (revision 48a710760e10a4f36e11233a21860796ba204b1e)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
3"""Interactive perf list."""
4
5from abc import ABC, abstractmethod
6import argparse
7from dataclasses import dataclass
8import math
9from typing import Any, Dict, Optional, Tuple
10import perf
11from textual import on
12from textual.app import App, ComposeResult
13from textual.binding import Binding
14from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll
15from textual.css.query import NoMatches
16from textual.command import SearchIcon
17from textual.screen import ModalScreen
18from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree
19from textual.widgets.tree import TreeNode
20
21
def get_info(info: Dict[str, str], key: str) -> str:
    """Return ``info[key]`` followed by a newline, or "" if ``key`` is absent."""
    if key not in info:
        return ""
    return info[key] + "\n"
24
25
class TreeValue(ABC):
    """Interface for the data objects attached to nodes of the event tree."""

    @abstractmethod
    def name(self) -> str:
        """Return the name of this value."""

    @abstractmethod
    def description(self) -> str:
        """Return descriptive text for this value."""

    @abstractmethod
    def matches(self, query: str) -> bool:
        """Return whether this value matches the search string ``query``."""

    @abstractmethod
    def parse(self) -> perf.evlist:
        """Return a perf.evlist for this value."""

    @abstractmethod
    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the current reading for the given evsel, cpu and thread."""
48
49
@dataclass
class Metric(TreeValue):
    """A metric in the tree, identified by its perf metric name."""
    metric_name: str

    def name(self) -> str:
        """Return the metric name."""
        return self.metric_name

    def description(self) -> str:
        """Find and format metric description.

        Concatenates the brief/public descriptions, expression and threshold
        of the first entry in perf.metrics() whose "MetricName" equals this
        metric. Returns the placeholder "description" when nothing matches.
        """
        keys = ("BriefDescription", "PublicDescription", "MetricExpr", "MetricThreshold")
        for metric in perf.metrics():
            if metric["MetricName"] != self.metric_name:
                continue
            return "".join(get_info(metric, key) for key in keys)
        return "description"

    def matches(self, query: str) -> bool:
        """Return whether ``query`` occurs in the metric name, ignoring case.

        The search dialog lowercases the query while metric names keep their
        original case, so the comparison must not be case-sensitive.
        """
        return query.lower() in self.metric_name.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist perf.parse_metrics() builds for this metric."""
        return perf.parse_metrics(self.metric_name)

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the metric computed for ``cpu``/``thread``, mapping NaN to 0."""
        val = evlist.compute_metric(self.metric_name, cpu, thread)
        return 0 if math.isnan(val) else val
79
80
@dataclass
class PmuEvent(TreeValue):
    """A PMU and event within the tree."""
    pmu: str
    event: str

    def name(self) -> str:
        """Return the event string: the bare event or ``pmu/event/``.

        The bare event is used when it already starts with the PMU name or
        contains a ':'.
        """
        if self.event.startswith(self.pmu) or ':' in self.event:
            return self.event
        else:
            return f"{self.pmu}/{self.event}/"

    def description(self) -> str:
        """Find and format event description for {pmu}/{event}/.

        Names are compared case-insensitively: the tree stores lowercased PMU
        and event names, while perf reports them in their original case.
        Returns the placeholder "description" when no event matches.
        """
        wanted_pmu = self.pmu.lower()
        wanted_event = self.event.lower()
        keys = ("topic", "event_type_desc", "desc", "long_desc", "encoding_desc")
        for p in perf.pmus():
            if p.name().lower() != wanted_pmu:
                continue
            for info in p.events():
                if "name" not in info or info["name"].lower() != wanted_event:
                    continue
                return "".join(get_info(info, key) for key in keys)
        return "description"

    def matches(self, query: str) -> bool:
        """Return whether ``query`` occurs in the PMU or event name, ignoring case."""
        needle = query.lower()
        return needle in self.pmu.lower() or needle in self.event.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist perf.parse_events() builds for ``name()``."""
        return perf.parse_events(self.name())

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the ``val`` field read from ``evsel`` for ``cpu``/``thread``."""
        return evsel.read(cpu, thread).val
118
119
class ErrorScreen(ModalScreen[bool]):
    """Modal dialog showing an error message on a single button."""

    CSS = """
    ErrorScreen {
        align: center middle;
    }
    """

    def __init__(self, error: str) -> None:
        """Remember the error text to display."""
        self.error = error
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the button whose label carries the error text."""
        message = f"Error: {self.error}"
        yield Button(message, variant="primary", id="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss the dialog with True once the button is pressed."""
        self.dismiss(True)
138
139
class SearchScreen(ModalScreen[str]):
    """Modal dialog that asks for a search string."""

    CSS = """
    SearchScreen Horizontal {
        align: center middle;
        margin-top: 1;
    }
    SearchScreen Input {
        width: 1fr;
    }
    """

    def compose(self) -> ComposeResult:
        """Yield a row holding the search icon and the text input."""
        icon = SearchIcon()
        field = Input(placeholder="Event name")
        yield Horizontal(icon, field)

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Dismiss the dialog with the submitted text (Enter in the input)."""
        self.dismiss(event.value)
159
160
class Counter(HorizontalGroup):
    """A name label and a value label for one CPU (or the total)."""

    CSS = """
    Label {
        gutter: 1;
    }
    """

    def __init__(self, cpu: int) -> None:
        """Remember the CPU number; a negative value stands for the total."""
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label and the value label, initially "0"."""
        if self.cpu >= 0:
            label = f"cpu{self.cpu}"
        else:
            label = "total"
        yield Label(label + " ")
        # The id is what update_counts() queries to refresh the value.
        yield Label("0", id=f"counter_{label}")
178
179
class CounterSparkline(HorizontalGroup):
    """A name label and a Sparkline for one CPU (or the total)."""

    def __init__(self, cpu: int) -> None:
        """Remember the CPU number; a negative value stands for the total."""
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label and an initially empty Sparkline."""
        if self.cpu >= 0:
            label = f"cpu{self.cpu}"
        else:
            label = "total"
        yield Label(label)
        # The id is what update_counts() queries to append new samples.
        yield Sparkline([], summary_function=max, id=f"sparkline_{label}")
191
192
193class IListApp(App):
194    TITLE = "Interactive Perf List"
195
196    BINDINGS = [
197        Binding(key="s", action="search", description="Search",
198                tooltip="Search events and PMUs"),
199        Binding(key="n", action="next", description="Next",
200                tooltip="Next search result or item"),
201        Binding(key="p", action="prev", description="Previous",
202                tooltip="Previous search result or item"),
203        Binding(key="c", action="collapse", description="Collapse",
204                tooltip="Collapse the current PMU"),
205        Binding(key="^q", action="quit", description="Quit",
206                tooltip="Quit the app"),
207    ]
208
209    CSS = """
210        /* Make the 'total' sparkline a different color. */
211        #sparkline_total > .sparkline--min-color {
212            color: $accent;
213        }
214        #sparkline_total > .sparkline--max-color {
215            color: $accent 30%;
216        }
217        /*
218         * Make the active_search initially not displayed with the text in
219         * the middle of the line.
220         */
221        #active_search {
222            display: none;
223            width: 100%;
224            text-align: center;
225        }
226    """
227
228    def __init__(self, interval: float) -> None:
229        self.interval = interval
230        self.evlist = None
231        self.selected: Optional[TreeValue] = None
232        self.search_results: list[TreeNode[TreeValue]] = []
233        self.cur_search_result: TreeNode[TreeValue] | None = None
234        super().__init__()
235
236    def expand_and_select(self, node: TreeNode[Any]) -> None:
237        """Expand select a node in the tree."""
238        if node.parent:
239            node.parent.expand()
240            if node.parent.parent:
241                node.parent.parent.expand()
242        node.expand()
243        node.tree.select_node(node)
244        node.tree.scroll_to_node(node)
245
246    def set_searched_tree_node(self, previous: bool) -> None:
247        """Set the cur_search_result node to either the next or previous."""
248        l = len(self.search_results)
249
250        if l < 1:
251            tree: Tree[TreeValue] = self.query_one("#root", Tree)
252            if previous:
253                tree.action_cursor_up()
254            else:
255                tree.action_cursor_down()
256            return
257
258        if self.cur_search_result:
259            idx = self.search_results.index(self.cur_search_result)
260            if previous:
261                idx = idx - 1 if idx > 0 else l - 1
262            else:
263                idx = idx + 1 if idx < l - 1 else 0
264        else:
265            idx = l - 1 if previous else 0
266
267        node = self.search_results[idx]
268        if node == self.cur_search_result:
269            return
270
271        self.cur_search_result = node
272        self.expand_and_select(node)
273
274    def action_search(self) -> None:
275        """Search was chosen."""
276        def set_initial_focus(event: str | None) -> None:
277            """Sets the focus after the SearchScreen is dismissed."""
278
279            search_label = self.query_one("#active_search", Label)
280            search_label.display = True if event else False
281            if not event:
282                return
283            event = event.lower()
284            search_label.update(f'Searching for events matching "{event}"')
285
286            tree: Tree[str] = self.query_one("#root", Tree)
287
288            def find_search_results(event: str, node: TreeNode[str],
289                                    cursor_seen: bool = False,
290                                    match_after_cursor: Optional[TreeNode[str]] = None
291                                    ) -> Tuple[bool, Optional[TreeNode[str]]]:
292                """Find nodes that match the search remembering the one after the cursor."""
293                if not cursor_seen and node == tree.cursor_node:
294                    cursor_seen = True
295                if node.data and node.data.matches(event):
296                    if cursor_seen and not match_after_cursor:
297                        match_after_cursor = node
298                    self.search_results.append(node)
299
300                if node.children:
301                    for child in node.children:
302                        (cursor_seen, match_after_cursor) = \
303                            find_search_results(event, child, cursor_seen, match_after_cursor)
304                return (cursor_seen, match_after_cursor)
305
306            self.search_results.clear()
307            (_, self.cur_search_result) = find_search_results(event, tree.root)
308            if len(self.search_results) < 1:
309                self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}"))
310                search_label.display = False
311            elif self.cur_search_result:
312                self.expand_and_select(self.cur_search_result)
313            else:
314                self.set_searched_tree_node(previous=False)
315
316        self.push_screen(SearchScreen(), set_initial_focus)
317
318    def action_next(self) -> None:
319        """Next was chosen."""
320        self.set_searched_tree_node(previous=False)
321
322    def action_prev(self) -> None:
323        """Previous was chosen."""
324        self.set_searched_tree_node(previous=True)
325
326    def action_collapse(self) -> None:
327        """Collapse the part of the tree currently on."""
328        tree: Tree[str] = self.query_one("#root", Tree)
329        node = tree.cursor_node
330        if node and node.parent:
331            node.parent.collapse_all()
332            node.tree.scroll_to_node(node.parent)
333
334    def update_counts(self) -> None:
335        """Called every interval to update counts."""
336        if not self.selected or not self.evlist:
337            return
338
339        def update_count(cpu: int, count: int):
340            # Update the raw count display.
341            counter: Label = self.query(f"#counter_cpu{cpu}" if cpu >= 0 else "#counter_total")
342            if not counter:
343                return
344            counter = counter.first(Label)
345            counter.update(str(count))
346
347            # Update the sparkline.
348            line: Sparkline = self.query(f"#sparkline_cpu{cpu}" if cpu >= 0 else "#sparkline_total")
349            if not line:
350                return
351            line = line.first(Sparkline)
352            # If there are more events than the width, remove the front event.
353            if len(line.data) > line.size.width:
354                line.data.pop(0)
355            line.data.append(count)
356            line.mutate_reactive(Sparkline.data)
357
358        # Update the total and each CPU counts, assume there's just 1 evsel.
359        total = 0
360        self.evlist.disable()
361        for evsel in self.evlist:
362            for cpu in evsel.cpus():
363                aggr = 0
364                for thread in evsel.threads():
365                    aggr += self.selected.value(self.evlist, evsel, cpu, thread)
366                update_count(cpu, aggr)
367                total += aggr
368        update_count(-1, total)
369        self.evlist.enable()
370
371    def on_mount(self) -> None:
372        """When App starts set up periodic event updating."""
373        self.update_counts()
374        self.set_interval(self.interval, self.update_counts)
375
376    def set_selected(self, value: TreeValue) -> None:
377        """Updates the event/description and starts the counters."""
378        try:
379            label_name = self.query_one("#event_name", Label)
380            event_description = self.query_one("#event_description", Static)
381            lines = self.query_one("#lines")
382        except NoMatches:
383            # A race with rendering, ignore the update as we can't
384            # mount the assumed output widgets.
385            return
386
387        self.selected = value
388
389        # Remove previous event information.
390        if self.evlist:
391            self.evlist.disable()
392            self.evlist.close()
393            old_lines = self.query(CounterSparkline)
394            for line in old_lines:
395                line.remove()
396            old_counters = self.query(Counter)
397            for counter in old_counters:
398                counter.remove()
399
400        # Update event/metric text and description.
401        label_name.update(value.name())
402        event_description.update(value.description())
403
404        # Open the event.
405        try:
406            self.evlist = value.parse()
407            if self.evlist:
408                self.evlist.open()
409                self.evlist.enable()
410        except:
411            self.evlist = None
412
413        if not self.evlist:
414            self.push_screen(ErrorScreen(f"Failed to open {value.name()}"))
415            return
416
417        # Add spark lines for all the CPUs. Note, must be done after
418        # open so that the evlist CPUs have been computed by propagate
419        # maps.
420        line = CounterSparkline(cpu=-1)
421        lines.mount(line)
422        for cpu in self.evlist.all_cpus():
423            line = CounterSparkline(cpu)
424            lines.mount(line)
425        line = Counter(cpu=-1)
426        lines.mount(line)
427        for cpu in self.evlist.all_cpus():
428            line = Counter(cpu)
429            lines.mount(line)
430
431    def compose(self) -> ComposeResult:
432        """Draws the app."""
433        def metric_event_tree() -> Tree:
434            """Create tree of PMUs and metricgroups with events or metrics under."""
435            tree: Tree[TreeValue] = Tree("Root", id="root")
436            pmus = tree.root.add("PMUs")
437            for pmu in perf.pmus():
438                pmu_name = pmu.name().lower()
439                pmu_node = pmus.add(pmu_name)
440                try:
441                    for event in sorted(pmu.events(), key=lambda x: x["name"]):
442                        if "name" in event:
443                            e = event["name"].lower()
444                            if "alias" in event:
445                                pmu_node.add_leaf(f'{e} ({event["alias"]})',
446                                                  data=PmuEvent(pmu_name, e))
447                            else:
448                                pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e))
449                except:
450                    # Reading events may fail with EPERM, ignore.
451                    pass
452            metrics = tree.root.add("Metrics")
453            groups = set()
454            for metric in perf.metrics():
455                groups.update(metric["MetricGroup"])
456
457            def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str):
458                for metric in sorted(perf.metrics(), key=lambda x: x["MetricName"]):
459                    if parent in metric["MetricGroup"]:
460                        name = metric["MetricName"]
461                        node.add_leaf(name, data=Metric(name))
462                        child_group_name = f'{name}_group'
463                        if child_group_name in groups:
464                            add_metrics_to_tree(node.add(child_group_name), child_group_name)
465
466            for group in sorted(groups):
467                if group.endswith('_group'):
468                    continue
469                add_metrics_to_tree(metrics.add(group), group)
470
471            tree.root.expand()
472            return tree
473
474        yield Header(id="header")
475        yield Horizontal(Vertical(metric_event_tree(), id="events"),
476                         Vertical(Label("event name", id="event_name"),
477                                  Static("description", markup=False, id="event_description"),
478                                  ))
479        yield Label(id="active_search")
480        yield VerticalScroll(id="lines")
481        yield Footer(id="footer")
482
483    @on(Tree.NodeSelected)
484    def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None:
485        """Called when a tree node is selected, selecting the event."""
486        if event.node.data:
487            self.set_selected(event.node.data)
488
489
if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    # type=float makes argparse reject a non-numeric interval with a usage
    # error instead of a ValueError traceback from a later float() call.
    ap.add_argument('-I', '--interval', help="Counter update interval in seconds",
                    type=float, default=0.1)
    args = ap.parse_args()
    app = IListApp(args.interval)
    app.run()
495    app.run()
496