xref: /linux/tools/verification/rvgen/rvgen/automata.py (revision 0fc8f6200d2313278fbf4539bbab74677c685531)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0-only
3#
4# Copyright (C) 2019-2022 Red Hat, Inc. Daniel Bristot de Oliveira <bristot@kernel.org>
5#
6# Automata class: parse an automaton in dot file digraph format into a python object
7#
8# For further information, see:
9#   Documentation/trace/rv/deterministic_automata.rst
10
11import ntpath
12import re
13from typing import Iterator
14from itertools import islice
15
16class _ConstraintKey:
17    """Base class for constraint keys."""
18
19class _StateConstraintKey(_ConstraintKey, int):
20    """Key for a state constraint. Under the hood just state_id."""
21    def __new__(cls, state_id: int):
22        return super().__new__(cls, state_id)
23
24class _EventConstraintKey(_ConstraintKey, tuple):
25    """Key for an event constraint. Under the hood just tuple(state_id,event_id)."""
26    def __new__(cls, state_id: int, event_id: int):
27        return super().__new__(cls, (state_id, event_id))
28
29class AutomataError(Exception):
30    """Exception raised for errors in automata parsing and validation.
31
32    Raised when DOT file processing fails due to invalid format, I/O errors,
33    or malformed automaton definitions.
34    """
35
36class Automata:
37    """Automata class: Reads a dot file and parses it as an automaton.
38
39    It supports both deterministic and hybrid automata.
40
41    Attributes:
42        dot_file: A dot file with an state_automaton definition.
43    """
44
45    invalid_state_str = "INVALID_STATE"
46    init_marker = "__init_"
47    node_marker = "{node"
48    # val can be numerical, uppercase (constant or macro), lowercase (parameter or function)
49    # only numerical values should have units
50    constraint_rule = re.compile(r"""
51        ^
52        (?P<env>[a-zA-Z_][a-zA-Z0-9_]+)  # C-like identifier for the env var
53        (?P<op>[!<=>]{1,2})              # operator
54        (?P<val>
55            [0-9]+ |                     # numerical value
56            [A-Z_]+\(\) |                # macro
57            [A-Z_]+ |                    # constant
58            [a-z_]+\(\) |                # function
59            [a-z_]+                      # parameter
60        )
61        (?P<unit>[a-z]{1,2})?            # optional unit for numerical values
62        """, re.VERBOSE)
63    constraint_reset = re.compile(r"^reset\((?P<env>[a-zA-Z_][a-zA-Z0-9_]+)\)")
64
65    def __init__(self, file_path, model_name=None):
66        self.__dot_path = file_path
67        self.name = model_name or self.__get_model_name()
68        self.__dot_lines = self.__open_dot()
69        self.states, self.initial_state, self.final_states = self.__get_state_variables()
70        self.env_types = {}
71        self.env_stored = set()
72        self.constraint_vars = set()
73        self.self_loop_reset_events = set()
74        self.events, self.envs = self.__get_event_variables()
75        self.function, self.constraints = self.__create_matrix()
76        self.events_start, self.events_start_run = self.__store_init_events()
77        self.env_stored = sorted(self.env_stored)
78        self.constraint_vars = sorted(self.constraint_vars)
79        self.self_loop_reset_events = sorted(self.self_loop_reset_events)
80
81    def __get_model_name(self) -> str:
82        basename = ntpath.basename(self.__dot_path)
83        if not basename.endswith(".dot") and not basename.endswith(".gv"):
84            print("not a dot file")
85            raise AutomataError(f"not a dot file: {self.__dot_path}")
86
87        model_name = ntpath.splitext(basename)[0]
88        if not model_name:
89            raise AutomataError(f"not a dot file: {self.__dot_path}")
90
91        return model_name
92
93    def __open_dot(self) -> list[str]:
94        dot_lines = []
95        try:
96            with open(self.__dot_path) as dot_file:
97                dot_lines = dot_file.readlines()
98        except OSError as exc:
99            raise AutomataError(exc.strerror) from exc
100
101        if not dot_lines:
102            raise AutomataError(f"{self.__dot_path} is empty")
103
104        # checking the first line:
105        line = dot_lines[0].split()
106
107        if len(line) < 2 or line[0] != "digraph" or line[1] != "state_automaton":
108            raise AutomataError(f"Not a valid .dot format: {self.__dot_path}")
109
110        return dot_lines
111
112    def __get_cursor_begin_states(self) -> int:
113        for cursor, line in enumerate(self.__dot_lines):
114            split_line = line.split()
115
116            if len(split_line) and split_line[0] == self.node_marker:
117                return cursor
118
119        raise AutomataError("Could not find a beginning state")
120
121    def __get_cursor_begin_events(self) -> int:
122        state = 0
123        cursor = 0 # make pyright happy
124
125        for cursor, line in enumerate(self.__dot_lines):
126            line = line.split()
127            if not line:
128                continue
129
130            if state == 0:
131                if line[0] == self.node_marker:
132                    state = 1
133            elif line[0] != self.node_marker:
134                break
135        else:
136            raise AutomataError("Could not find beginning event")
137
138        cursor += 1 # skip initial state transition
139        if cursor == len(self.__dot_lines):
140            raise AutomataError("Dot file ended after event beginning")
141
142        return cursor
143
144    def __get_state_variables(self) -> tuple[list[str], str, list[str]]:
145        # wait for node declaration
146        states = []
147        final_states = []
148        initial_state = ""
149
150        has_final_states = False
151        cursor = self.__get_cursor_begin_states()
152
153        # process nodes
154        for line in islice(self.__dot_lines, cursor, None):
155            split_line = line.split()
156            if not split_line or split_line[0] != self.node_marker:
157                break
158
159            raw_state = split_line[-1]
160
161            #  "enabled_fired"}; -> enabled_fired
162            state = raw_state.replace('"', '').replace('};', '').replace(',', '_')
163            if state.startswith(self.init_marker):
164                initial_state = state[len(self.init_marker):]
165            else:
166                states.append(state)
167                if "doublecircle" in line:
168                    final_states.append(state)
169                    has_final_states = True
170
171                if "ellipse" in line:
172                    final_states.append(state)
173                    has_final_states = True
174
175        if not initial_state:
176            raise AutomataError("The automaton doesn't have an initial state")
177
178        states = sorted(set(states))
179        states.remove(initial_state)
180
181        # Insert the initial state at the beginning of the states
182        states.insert(0, initial_state)
183
184        if not has_final_states:
185            final_states.append(initial_state)
186
187        return states, initial_state, final_states
188
189    def __get_event_variables(self) -> tuple[list[str], list[str]]:
190        events: list[str] = []
191        envs: list[str] = []
192        # here we are at the begin of transitions, take a note, we will return later.
193        cursor = self.__get_cursor_begin_events()
194
195        for line in map(str.lstrip, islice(self.__dot_lines, cursor, None)):
196            if not line.startswith('"'):
197                break
198
199            # transitions have the format:
200            # "all_fired" -> "both_fired" [ label = "disable_irq" ];
201            #  ------------ event is here ------------^^^^^
202            split_line = line.split()
203            if len(split_line) > 1 and split_line[1] == "->":
204                event = "".join(split_line[split_line.index("label") + 2:-1]).replace('"', '')
205
206                # when a transition has more than one label, they are like this
207                # "local_irq_enable\nhw_local_irq_enable_n"
208                # so split them.
209
210                for i in event.split("\\n"):
211                    # if the event contains a constraint (hybrid automata),
212                    # it will be separated by a ";":
213                    # "sched_switch;x<1000;reset(x)"
214                    ev, *constr = i.split(";")
215                    if constr:
216                        if len(constr) > 2:
217                            raise AutomataError("Only 1 constraint and 1 reset are supported")
218                        envs += self.__extract_env_var(constr)
219                    events.append(ev)
220            else:
221                # state labels have the format:
222                # "enable_fired" [label = "enable_fired\ncondition"];
223                #  ----- label is here -----^^^^^
224                # label and node name must be the same, condition is optional
225                state = line.split("label")[1].split('"')[1]
226                _, *constr = state.split("\\n")
227                if constr:
228                    if len(constr) > 1:
229                        raise AutomataError("Only 1 constraint is supported in the state")
230                    envs += self.__extract_env_var([constr[0].replace(" ", "")])
231
232        return sorted(set(events)), sorted(set(envs))
233
234    def _split_constraint_expr(self, constr: list[str]) -> Iterator[tuple[str,
235                                                                          str | None]]:
236        """
237        Get a list of strings of the type constr1 && constr2 and returns a list of
238        constraints and separators: [[constr1,"&&"],[constr2,None]]
239        """
240        exprs = []
241        seps = []
242        for c in constr:
243            while "&&" in c or "||" in c:
244                a = c.find("&&")
245                o = c.find("||")
246                pos = a if o < 0 or 0 < a < o else o
247                exprs.append(c[:pos].replace(" ", ""))
248                seps.append(c[pos:pos + 2].replace(" ", ""))
249                c = c[pos + 2:].replace(" ", "")
250            exprs.append(c)
251            seps.append(None)
252        return zip(exprs, seps)
253
254    def __extract_env_var(self, constraint: list[str]) -> list[str]:
255        env = []
256        for c, _ in self._split_constraint_expr(constraint):
257            rule = self.constraint_rule.search(c)
258            reset = self.constraint_reset.search(c)
259            if rule:
260                env.append(rule["env"])
261                if rule.groupdict().get("unit"):
262                    self.env_types[rule["env"]] = rule["unit"]
263                if rule["val"][0].isalpha():
264                    self.constraint_vars.add(rule["val"])
265                # try to infer unit from constants or parameters
266                val_for_unit = rule["val"].lower().replace("()", "")
267                if val_for_unit.endswith("_ns"):
268                    self.env_types[rule["env"]] = "ns"
269                if val_for_unit.endswith("_jiffies"):
270                    self.env_types[rule["env"]] = "j"
271            if reset:
272                env.append(reset["env"])
273                # environment variables that are reset need a storage
274                self.env_stored.add(reset["env"])
275        return env
276
277    def __create_matrix(self) -> tuple[list[list[str]], dict[_ConstraintKey, list[str]]]:
278        # transform the array into a dictionary
279        events = self.events
280        states = self.states
281        events_dict = {}
282        states_dict = {}
283        nr_event = 0
284        for event in events:
285            events_dict[event] = nr_event
286            nr_event += 1
287
288        nr_state = 0
289        for state in states:
290            states_dict[state] = nr_state
291            nr_state += 1
292
293        # declare the matrix....
294        matrix = [[self.invalid_state_str for _ in range(nr_event)] for _ in range(nr_state)]
295        constraints: dict[_ConstraintKey, list[str]] = {}
296
297        # and we are back! Let's fill the matrix
298        cursor = self.__get_cursor_begin_events()
299
300        for line in map(str.lstrip,
301                        islice(self.__dot_lines, cursor, None)):
302
303            if not line or line[0] != '"':
304                break
305
306            split_line = line.split()
307
308            if len(split_line) > 2 and split_line[1] == "->":
309                origin_state = split_line[0].replace('"', '').replace(',', '_')
310                dest_state = split_line[2].replace('"', '').replace(',', '_')
311                possible_events = "".join(split_line[split_line.index("label") + 2:-1]).replace('"', '')
312                for event in possible_events.split("\\n"):
313                    event, *constr = event.split(";")
314                    if constr:
315                        key = _EventConstraintKey(states_dict[origin_state], events_dict[event])
316                        constraints[key] = constr
317                        # those events reset also on self loops
318                        if origin_state == dest_state and "reset" in "".join(constr):
319                            self.self_loop_reset_events.add(event)
320                    matrix[states_dict[origin_state]][events_dict[event]] = dest_state
321            else:
322                state = line.split("label")[1].split('"')[1]
323                state, *constr = state.replace(" ", "").split("\\n")
324                if constr:
325                    constraints[_StateConstraintKey(states_dict[state])] = constr
326
327        return matrix, constraints
328
329    def __store_init_events(self) -> tuple[list[bool], list[bool]]:
330        events_start = [False] * len(self.events)
331        events_start_run = [False] * len(self.events)
332        for i in range(len(self.events)):
333            curr_event_will_init = 0
334            curr_event_from_init = False
335            curr_event_used = 0
336            for j in range(len(self.states)):
337                if self.function[j][i] != self.invalid_state_str:
338                    curr_event_used += 1
339                if self.function[j][i] == self.initial_state:
340                    curr_event_will_init += 1
341            if self.function[0][i] != self.invalid_state_str:
342                curr_event_from_init = True
343            # this event always leads to init
344            if curr_event_will_init and curr_event_used == curr_event_will_init:
345                events_start[i] = True
346            # this event is only called from init
347            if curr_event_from_init and curr_event_used == 1:
348                events_start_run[i] = True
349        return events_start, events_start_run
350
351    def is_start_event(self, event: str) -> bool:
352        return self.events_start[self.events.index(event)]
353
354    def is_start_run_event(self, event: str) -> bool:
355        # prefer handle_start_event if there
356        if any(self.events_start):
357            return False
358        return self.events_start_run[self.events.index(event)]
359
360    def is_hybrid_automata(self) -> bool:
361        return bool(self.envs)
362
363    def is_event_constraint(self, key: _ConstraintKey) -> bool:
364        """
365        Given the key in self.constraints return true if it is an event
366        constraint, false if it is a state constraint
367        """
368        return isinstance(key, _EventConstraintKey)
369