xref: /linux/tools/perf/python/ilist.py (revision 9e906a9dead17d81d6c2687f65e159231d0e3286)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
3"""Interactive perf list."""
4
5from abc import ABC, abstractmethod
6import argparse
7from dataclasses import dataclass
8import math
9from typing import Any, Dict, Optional, Tuple
10import perf
11from textual import on
12from textual.app import App, ComposeResult
13from textual.binding import Binding
14from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll
15from textual.css.query import NoMatches
16from textual.command import SearchIcon
17from textual.screen import ModalScreen
18from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree
19from textual.widgets.tree import TreeNode
20
21
def get_info(info: Dict[str, str], key: str):
    """Return the value stored under key with a trailing newline.

    An empty string is returned when key is not present in info, so the
    results for several keys can be concatenated directly.
    """
    if key not in info:
        return ""
    return info[key] + "\n"
24
25
class TreeValue(ABC):
    """Interface for the data attached to selectable nodes of the tree."""

    @abstractmethod
    def name(self) -> str:
        """Return the name displayed for this value."""

    @abstractmethod
    def description(self) -> str:
        """Return the descriptive text displayed for this value."""

    @abstractmethod
    def matches(self, query: str) -> bool:
        """Return True when this value should be reported for the search query."""

    @abstractmethod
    def parse(self) -> perf.evlist:
        """Return a perf.evlist for this value."""

    @abstractmethod
    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the current reading for the given evsel, cpu and thread."""
48
49
@dataclass
class Metric(TreeValue):
    """A metric in the tree."""
    # Name of the metric, compared against the "MetricName" entry of perf.metrics().
    metric_name: str
    # PMU the metric belongs to; may be empty/None when no PMU is recorded.
    metric_pmu: Optional[str]

    def name(self) -> str:
        """Return the metric name."""
        return self.metric_name

    def description(self) -> str:
        """Find and format metric description.

        Returns the concatenated brief/public descriptions, expression and
        threshold of the first matching entry of perf.metrics(), or the
        placeholder text "description" when nothing matches.
        """
        for metric in perf.metrics():
            if metric["MetricName"] != self.metric_name:
                continue
            # "PMU" is optional in a metric entry, so avoid a KeyError here.
            if self.metric_pmu and metric.get("PMU") != self.metric_pmu:
                continue
            keys = ("BriefDescription", "PublicDescription",
                    "MetricExpr", "MetricThreshold")
            return "".join(get_info(metric, key) for key in keys)
        return "description"

    def matches(self, query: str) -> bool:
        """Return True when query occurs in the metric name, ignoring case.

        The search dialog lower-cases the query while metric names keep
        their original case, so both sides are folded before comparing.
        """
        return query.lower() in self.metric_name.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist produced by perf.parse_metrics for this metric."""
        return perf.parse_metrics(self.metric_name, self.metric_pmu)

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Compute the metric for cpu/thread, returning 0 for NaN or on failure.

        evsel is unused; it is accepted to satisfy the TreeValue interface.
        """
        try:
            val = evlist.compute_metric(self.metric_name, cpu, thread)
            return 0 if math.isnan(val) else val
        except Exception:
            # Be tolerant of failures to compute metrics on particular
            # CPUs/threads, but let KeyboardInterrupt/SystemExit propagate.
            return 0
86
87
@dataclass
class PmuEvent(TreeValue):
    """A PMU and event within the tree."""
    # Name of the PMU the event belongs to.
    pmu: str
    # Name of the event on that PMU.
    event: str

    def name(self) -> str:
        """Return the event as a string suitable for perf.parse_events.

        The bare event name is used when it already starts with the PMU name
        or contains a ':'; otherwise it is wrapped as "{pmu}/{event}/".
        """
        if self.event.startswith(self.pmu) or ':' in self.event:
            return self.event
        return f"{self.pmu}/{self.event}/"

    def description(self) -> str:
        """Find and format event description for {pmu}/{event}/.

        Names are compared case-insensitively because the tree stores
        lower-cased PMU and event names while perf reports them verbatim.
        Returns the placeholder text "description" when nothing matches.
        """
        wanted_pmu = self.pmu.lower()
        wanted_event = self.event.lower()
        for p in perf.pmus():
            if p.name().lower() != wanted_pmu:
                continue
            for info in p.events():
                if "name" not in info or info["name"].lower() != wanted_event:
                    continue

                keys = ("topic", "event_type_desc", "desc",
                        "long_desc", "encoding_desc")
                return "".join(get_info(info, key) for key in keys)
        return "description"

    def matches(self, query: str) -> bool:
        """Return True when query occurs in the PMU or event name, ignoring case."""
        needle = query.lower()
        return needle in self.pmu.lower() or needle in self.event.lower()

    def parse(self) -> perf.evlist:
        """Return the evlist produced by perf.parse_events for name()."""
        return perf.parse_events(self.name())

    def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
        """Return the val field read from evsel for cpu/thread.

        evlist is unused; it is accepted to satisfy the TreeValue interface.
        """
        return evsel.read(cpu, thread).val
125
126
class ErrorScreen(ModalScreen[bool]):
    """Modal dialog showing an error message on a single button."""

    CSS = """
    ErrorScreen {
        align: center middle;
    }
    """

    def __init__(self, error: str):
        """Store the error text that compose() places on the button."""
        self.error = error
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the one button carrying the error text."""
        message = f"Error: {self.error}"
        yield Button(message, variant="primary", id="error")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss the dialog with True when the button is pressed."""
        self.dismiss(True)
145
146
class SearchScreen(ModalScreen[str]):
    """Modal dialog with a search icon and a text input for the query."""

    CSS = """
    SearchScreen Horizontal {
        align: center middle;
        margin-top: 1;
    }
    SearchScreen Input {
        width: 1fr;
    }
    """

    def compose(self) -> ComposeResult:
        """Yield one horizontal row holding the icon and the input field."""
        icon = SearchIcon()
        query_input = Input(placeholder="Event name")
        yield Horizontal(icon, query_input)

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """Dismiss the dialog with the submitted text as its result."""
        submitted = event.value
        self.dismiss(submitted)
166
167
class Counter(HorizontalGroup):
    """A name label and a value label for one CPU, or for the total."""

    CSS = """
    Label {
        gutter: 1;
    }
    """

    def __init__(self, cpu: int) -> None:
        """Record the CPU number; a negative number stands for the total."""
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label followed by the value label, initially "0"."""
        if self.cpu >= 0:
            label = f"cpu{self.cpu}"
        else:
            label = "total"
        yield Label(label + " ")
        # The id is how the value label is looked up when counts are updated.
        yield Label("0", id=f"counter_{label}")
185
186
class CounterSparkline(HorizontalGroup):
    """A name label and a Sparkline for one CPU, or for the total."""

    def __init__(self, cpu: int) -> None:
        """Record the CPU number; a negative number stands for the total."""
        self.cpu = cpu
        super().__init__()

    def compose(self) -> ComposeResult:
        """Yield the name label followed by an initially empty Sparkline."""
        if self.cpu >= 0:
            label = f"cpu{self.cpu}"
        else:
            label = "total"
        yield Label(label)
        # The id is how the sparkline is looked up when counts are updated.
        yield Sparkline([], summary_function=max, id=f"sparkline_{label}")
198
199
class IListApp(App):
    """Textual app showing a tree of PMU events and metrics with live counts.

    Selecting a tree node opens the corresponding evlist and mounts one
    sparkline and one counter per CPU (plus a total), which are refreshed
    every `interval` seconds.
    """
    TITLE = "Interactive Perf List"

    BINDINGS = [
        Binding(key="s", action="search", description="Search",
                tooltip="Search events and PMUs"),
        Binding(key="n", action="next", description="Next",
                tooltip="Next search result or item"),
        Binding(key="p", action="prev", description="Previous",
                tooltip="Previous search result or item"),
        Binding(key="c", action="collapse", description="Collapse",
                tooltip="Collapse the current PMU"),
        Binding(key="^q", action="quit", description="Quit",
                tooltip="Quit the app"),
    ]

    CSS = """
        /* Make the 'total' sparkline a different color. */
        #sparkline_total > .sparkline--min-color {
            color: $accent;
        }
        #sparkline_total > .sparkline--max-color {
            color: $accent 30%;
        }
        /*
         * Make the active_search initially not displayed with the text in
         * the middle of the line.
         */
        #active_search {
            display: none;
            width: 100%;
            text-align: center;
        }
    """

    def __init__(self, interval: float) -> None:
        """Create the app.

        interval is the period, in seconds, between counter refreshes.
        """
        self.interval = interval
        # Currently opened evlist, or None when nothing is open.
        self.evlist = None
        # Value of the tree node that was last selected.
        self.selected: Optional[TreeValue] = None
        # Nodes matching the most recent search, in tree order.
        self.search_results: list[TreeNode[TreeValue]] = []
        # The search result currently selected, if any.
        self.cur_search_result: TreeNode[TreeValue] | None = None
        super().__init__()

    def expand_and_select(self, node: TreeNode[Any]) -> None:
        """Expand and select a node in the tree.

        Every ancestor is expanded, not just the two nearest, so that nodes
        inside nested metric groups become visible as well.
        """
        ancestor = node.parent
        while ancestor is not None:
            ancestor.expand()
            ancestor = ancestor.parent
        node.expand()
        node.tree.select_node(node)
        node.tree.scroll_to_node(node)

    def set_searched_tree_node(self, previous: bool) -> None:
        """Set the cur_search_result node to either the next or previous.

        Without search results this simply moves the tree cursor up or down.
        With results, the selection wraps around at either end of the list.
        """
        count = len(self.search_results)

        if count < 1:
            tree: Tree[TreeValue] = self.query_one("#root", Tree)
            if previous:
                tree.action_cursor_up()
            else:
                tree.action_cursor_down()
            return

        if self.cur_search_result:
            idx = self.search_results.index(self.cur_search_result)
            step = -1 if previous else 1
            # Modulo gives the wrap-around in both directions.
            idx = (idx + step) % count
        else:
            idx = count - 1 if previous else 0

        node = self.search_results[idx]
        if node == self.cur_search_result:
            # Only one result; nothing to move to.
            return

        self.cur_search_result = node
        self.expand_and_select(node)

    def action_search(self) -> None:
        """Search was chosen."""
        def set_initial_focus(event: str | None) -> None:
            """Sets the focus after the SearchScreen is dismissed."""

            search_label = self.query_one("#active_search", Label)
            search_label.display = bool(event)
            if not event:
                return
            event = event.lower()
            search_label.update(f'Searching for events matching "{event}"')

            tree: Tree[TreeValue] = self.query_one("#root", Tree)

            def find_search_results(query: str, node: TreeNode[TreeValue],
                                    cursor_seen: bool = False,
                                    match_after_cursor: Optional[TreeNode[TreeValue]] = None
                                    ) -> Tuple[bool, Optional[TreeNode[TreeValue]]]:
                """Find nodes that match the search remembering the one after the cursor."""
                if not cursor_seen and node == tree.cursor_node:
                    cursor_seen = True
                if node.data and node.data.matches(query):
                    if cursor_seen and not match_after_cursor:
                        match_after_cursor = node
                    self.search_results.append(node)

                for child in node.children:
                    (cursor_seen, match_after_cursor) = \
                        find_search_results(query, child, cursor_seen, match_after_cursor)
                return (cursor_seen, match_after_cursor)

            self.search_results.clear()
            (_, self.cur_search_result) = find_search_results(event, tree.root)
            if len(self.search_results) < 1:
                self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}"))
                search_label.display = False
            elif self.cur_search_result:
                self.expand_and_select(self.cur_search_result)
            else:
                # All matches are before the cursor: start from the first.
                self.set_searched_tree_node(previous=False)

        self.push_screen(SearchScreen(), set_initial_focus)

    def action_next(self) -> None:
        """Next was chosen."""
        self.set_searched_tree_node(previous=False)

    def action_prev(self) -> None:
        """Previous was chosen."""
        self.set_searched_tree_node(previous=True)

    def action_collapse(self) -> None:
        """Collapse the part of the tree currently on."""
        tree: Tree[TreeValue] = self.query_one("#root", Tree)
        node = tree.cursor_node
        if node and node.parent:
            node.parent.collapse_all()
            node.tree.scroll_to_node(node.parent)

    def update_counts(self) -> None:
        """Called every interval to update counts."""
        if not self.selected or not self.evlist:
            return

        def update_count(cpu: int, count: float) -> None:
            """Show count in the counter and sparkline for cpu (-1 is the total)."""
            suffix = f"cpu{cpu}" if cpu >= 0 else "total"

            # Update the raw count display.
            counters = self.query(f"#counter_{suffix}")
            if not counters:
                return
            counters.first(Label).update(str(count))

            # Update the sparkline.
            sparklines = self.query(f"#sparkline_{suffix}")
            if not sparklines:
                return
            line = sparklines.first(Sparkline)
            # If there are more events than the width, remove the front event.
            if len(line.data) > line.size.width:
                line.data.pop(0)
            line.data.append(count)
            line.mutate_reactive(Sparkline.data)

        # Update the total and each CPU counts, assume there's just 1 evsel.
        total = 0
        self.evlist.disable()
        try:
            for evsel in self.evlist:
                for cpu in evsel.cpus():
                    aggr = 0
                    for thread in evsel.threads():
                        aggr += self.selected.value(self.evlist, evsel, cpu, thread)
                    update_count(cpu, aggr)
                    total += aggr
            update_count(-1, total)
        finally:
            # Re-enable even if a read failed so counting is not left stopped.
            self.evlist.enable()

    def on_mount(self) -> None:
        """When App starts set up periodic event updating."""
        self.update_counts()
        self.set_interval(self.interval, self.update_counts)

    def set_selected(self, value: TreeValue) -> None:
        """Updates the event/description and starts the counters."""
        try:
            label_name = self.query_one("#event_name", Label)
            event_description = self.query_one("#event_description", Static)
            lines = self.query_one("#lines")
        except NoMatches:
            # A race with rendering, ignore the update as we can't
            # mount the assumed output widgets.
            return

        self.selected = value

        # Remove previous event information.
        if self.evlist:
            self.evlist.disable()
            self.evlist.close()
            for old_line in self.query(CounterSparkline):
                old_line.remove()
            for old_counter in self.query(Counter):
                old_counter.remove()

        # Update event/metric text and description.
        label_name.update(value.name())
        event_description.update(value.description())

        # Open the event.
        try:
            self.evlist = value.parse()
            if self.evlist:
                self.evlist.open()
                self.evlist.enable()
        except Exception:
            # Any failure to parse or open is reported through the dialog
            # below; KeyboardInterrupt/SystemExit are left to propagate.
            self.evlist = None

        if not self.evlist:
            self.push_screen(ErrorScreen(f"Failed to open {value.name()}"))
            return

        # Add spark lines for all the CPUs. Note, must be done after
        # open so that the evlist CPUs have been computed by propagate
        # maps.
        lines.mount(CounterSparkline(cpu=-1))
        for cpu in self.evlist.all_cpus():
            lines.mount(CounterSparkline(cpu))
        lines.mount(Counter(cpu=-1))
        for cpu in self.evlist.all_cpus():
            lines.mount(Counter(cpu))

    def compose(self) -> ComposeResult:
        """Draws the app."""
        def metric_event_tree() -> Tree:
            """Create tree of PMUs and metricgroups with events or metrics under."""
            tree: Tree[TreeValue] = Tree("Root", id="root")
            pmus = tree.root.add("PMUs")
            for pmu in perf.pmus():
                pmu_name = pmu.name().lower()
                pmu_node = pmus.add(pmu_name)
                try:
                    # Drop nameless events before sorting; otherwise a single
                    # one raises KeyError in the sort key and hides every
                    # event of this PMU.
                    named_events = [ev for ev in pmu.events() if "name" in ev]
                    for event in sorted(named_events, key=lambda x: x["name"]):
                        if "deprecated" in event:
                            continue
                        e = event["name"].lower()
                        if "alias" in event:
                            pmu_node.add_leaf(f'{e} ({event["alias"]})',
                                              data=PmuEvent(pmu_name, e))
                        else:
                            pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e))
                except Exception:
                    # Reading events may fail with EPERM, ignore.
                    pass
            metrics = tree.root.add("Metrics")
            # Fetch and sort the metrics once; every group below walks this list.
            all_metrics = sorted(perf.metrics(), key=lambda x: x["MetricName"])
            groups = set()
            for metric in all_metrics:
                groups.update(metric["MetricGroup"])

            def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str,
                                    pmu: Optional[str] = None):
                """Add the metrics of group parent under node, recursing into
                a metric's "<name>_group" when such a group exists."""
                for metric in all_metrics:
                    metric_pmu = metric.get('PMU')
                    if pmu and metric_pmu and metric_pmu != pmu:
                        continue
                    if parent in metric["MetricGroup"]:
                        name = metric["MetricName"]
                        display_name = name
                        if metric_pmu:
                            display_name += f" ({metric_pmu})"
                        node.add_leaf(display_name, data=Metric(name, metric_pmu))
                        child_group_name = f'{name}_group'
                        if child_group_name in groups:
                            display_child_group_name = child_group_name
                            if metric_pmu:
                                display_child_group_name += f" ({metric_pmu})"
                            add_metrics_to_tree(node.add(display_child_group_name),
                                                child_group_name,
                                                metric_pmu)

            for group in sorted(groups):
                # "*_group" groups are only added nested under their metric.
                if group.endswith('_group'):
                    continue
                add_metrics_to_tree(metrics.add(group), group)

            tree.root.expand()
            return tree

        yield Header(id="header")
        yield Horizontal(Vertical(metric_event_tree(), id="events"),
                         Vertical(Label("event name", id="event_name"),
                                  Static("description", markup=False, id="event_description"),
                                  ))
        yield Label(id="active_search")
        yield VerticalScroll(id="lines")
        yield Footer(id="footer")

    @on(Tree.NodeSelected)
    def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None:
        """Called when a tree node is selected, selecting the event."""
        if event.node.data:
            self.set_selected(event.node.data)
508
509
if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    # Let argparse convert the interval so that a malformed value is
    # rejected with a usage message instead of a ValueError traceback.
    ap.add_argument('-I', '--interval', help="Counter update interval in seconds",
                    type=float, default=0.1)
    args = ap.parse_args()
    app = IListApp(args.interval)
    app.run()
516