1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0-only 3# 4# Copyright (C) 2019-2022 Red Hat, Inc. Daniel Bristot de Oliveira <bristot@kernel.org> 5# 6# Automata class: parse an automaton in dot file digraph format into a python object 7# 8# For further information, see: 9# Documentation/trace/rv/deterministic_automata.rst 10 11import ntpath 12import re 13from typing import Iterator 14from itertools import islice 15 16class _ConstraintKey: 17 """Base class for constraint keys.""" 18 19class _StateConstraintKey(_ConstraintKey, int): 20 """Key for a state constraint. Under the hood just state_id.""" 21 def __new__(cls, state_id: int): 22 return super().__new__(cls, state_id) 23 24class _EventConstraintKey(_ConstraintKey, tuple): 25 """Key for an event constraint. Under the hood just tuple(state_id,event_id).""" 26 def __new__(cls, state_id: int, event_id: int): 27 return super().__new__(cls, (state_id, event_id)) 28 29class AutomataError(Exception): 30 """Exception raised for errors in automata parsing and validation. 31 32 Raised when DOT file processing fails due to invalid format, I/O errors, 33 or malformed automaton definitions. 34 """ 35 36class Automata: 37 """Automata class: Reads a dot file and parses it as an automaton. 38 39 It supports both deterministic and hybrid automata. 40 41 Attributes: 42 dot_file: A dot file with an state_automaton definition. 43 """ 44 45 invalid_state_str = "INVALID_STATE" 46 init_marker = "__init_" 47 node_marker = "{node" 48 # val can be numerical, uppercase (constant or macro), lowercase (parameter or function) 49 # only numerical values should have units 50 constraint_rule = re.compile(r""" 51 ^ 52 (?P<env>[a-zA-Z_][a-zA-Z0-9_]+) # C-like identifier for the env var 53 (?P<op>[!<=>]{1,2}) # operator 54 (?P<val> 55 [0-9]+ | # numerical value 56 [A-Z_]+\(\) | # macro 57 [A-Z_]+ | # constant 58 [a-z_]+\(\) | # function 59 [a-z_]+ # parameter 60 ) 61 (?P<unit>[a-z]{1,2})? # optional unit for numerical values 62 """, re.VERBOSE) 63 constraint_reset = re.compile(r"^reset\((?P<env>[a-zA-Z_][a-zA-Z0-9_]+)\)") 64 65 def __init__(self, file_path, model_name=None): 66 self.__dot_path = file_path 67 self.name = model_name or self.__get_model_name() 68 self.__dot_lines = self.__open_dot() 69 self.states, self.initial_state, self.final_states = self.__get_state_variables() 70 self.env_types = {} 71 self.env_stored = set() 72 self.constraint_vars = set() 73 self.self_loop_reset_events = set() 74 self.events, self.envs = self.__get_event_variables() 75 self.function, self.constraints = self.__create_matrix() 76 self.events_start, self.events_start_run = self.__store_init_events() 77 self.env_stored = sorted(self.env_stored) 78 self.constraint_vars = sorted(self.constraint_vars) 79 self.self_loop_reset_events = sorted(self.self_loop_reset_events) 80 81 def __get_model_name(self) -> str: 82 basename = ntpath.basename(self.__dot_path) 83 if not basename.endswith(".dot") and not basename.endswith(".gv"): 84 print("not a dot file") 85 raise AutomataError(f"not a dot file: {self.__dot_path}") 86 87 model_name = ntpath.splitext(basename)[0] 88 if not model_name: 89 raise AutomataError(f"not a dot file: {self.__dot_path}") 90 91 return model_name 92 93 def __open_dot(self) -> list[str]: 94 dot_lines = [] 95 try: 96 with open(self.__dot_path) as dot_file: 97 dot_lines = dot_file.readlines() 98 except OSError as exc: 99 raise AutomataError(exc.strerror) from exc 100 101 if not dot_lines: 102 raise AutomataError(f"{self.__dot_path} is empty") 103 104 # checking the first line: 105 line = dot_lines[0].split() 106 107 if len(line) < 2 or line[0] != "digraph" or line[1] != "state_automaton": 108 raise AutomataError(f"Not a valid .dot format: {self.__dot_path}") 109 110 return dot_lines 111 112 def __get_cursor_begin_states(self) -> int: 113 for cursor, line in enumerate(self.__dot_lines): 114 split_line = line.split() 115 116 if len(split_line) and split_line[0] == self.node_marker: 117 return cursor 118 119 raise AutomataError("Could not find a beginning state") 120 121 def __get_cursor_begin_events(self) -> int: 122 state = 0 123 cursor = 0 # make pyright happy 124 125 for cursor, line in enumerate(self.__dot_lines): 126 line = line.split() 127 if not line: 128 continue 129 130 if state == 0: 131 if line[0] == self.node_marker: 132 state = 1 133 elif line[0] != self.node_marker: 134 break 135 else: 136 raise AutomataError("Could not find beginning event") 137 138 cursor += 1 # skip initial state transition 139 if cursor == len(self.__dot_lines): 140 raise AutomataError("Dot file ended after event beginning") 141 142 return cursor 143 144 def __get_state_variables(self) -> tuple[list[str], str, list[str]]: 145 # wait for node declaration 146 states = [] 147 final_states = [] 148 initial_state = "" 149 150 has_final_states = False 151 cursor = self.__get_cursor_begin_states() 152 153 # process nodes 154 for line in islice(self.__dot_lines, cursor, None): 155 split_line = line.split() 156 if not split_line or split_line[0] != self.node_marker: 157 break 158 159 raw_state = split_line[-1] 160 161 # "enabled_fired"}; -> enabled_fired 162 state = raw_state.replace('"', '').replace('};', '').replace(',', '_') 163 if state.startswith(self.init_marker): 164 initial_state = state[len(self.init_marker):] 165 else: 166 states.append(state) 167 if "doublecircle" in line: 168 final_states.append(state) 169 has_final_states = True 170 171 if "ellipse" in line: 172 final_states.append(state) 173 has_final_states = True 174 175 if not initial_state: 176 raise AutomataError("The automaton doesn't have an initial state") 177 178 states = sorted(set(states)) 179 states.remove(initial_state) 180 181 # Insert the initial state at the beginning of the states 182 states.insert(0, initial_state) 183 184 if not has_final_states: 185 final_states.append(initial_state) 186 187 return states, initial_state, final_states 188 189 def __get_event_variables(self) -> tuple[list[str], list[str]]: 190 events: list[str] = [] 191 envs: list[str] = [] 192 # here we are at the begin of transitions, take a note, we will return later. 193 cursor = self.__get_cursor_begin_events() 194 195 for line in map(str.lstrip, islice(self.__dot_lines, cursor, None)): 196 if not line.startswith('"'): 197 break 198 199 # transitions have the format: 200 # "all_fired" -> "both_fired" [ label = "disable_irq" ]; 201 # ------------ event is here ------------^^^^^ 202 split_line = line.split() 203 if len(split_line) > 1 and split_line[1] == "->": 204 event = "".join(split_line[split_line.index("label") + 2:-1]).replace('"', '') 205 206 # when a transition has more than one label, they are like this 207 # "local_irq_enable\nhw_local_irq_enable_n" 208 # so split them. 209 210 for i in event.split("\\n"): 211 # if the event contains a constraint (hybrid automata), 212 # it will be separated by a ";": 213 # "sched_switch;x<1000;reset(x)" 214 ev, *constr = i.split(";") 215 if constr: 216 if len(constr) > 2: 217 raise AutomataError("Only 1 constraint and 1 reset are supported") 218 envs += self.__extract_env_var(constr) 219 events.append(ev) 220 else: 221 # state labels have the format: 222 # "enable_fired" [label = "enable_fired\ncondition"]; 223 # ----- label is here -----^^^^^ 224 # label and node name must be the same, condition is optional 225 state = line.split("label")[1].split('"')[1] 226 _, *constr = state.split("\\n") 227 if constr: 228 if len(constr) > 1: 229 raise AutomataError("Only 1 constraint is supported in the state") 230 envs += self.__extract_env_var([constr[0].replace(" ", "")]) 231 232 return sorted(set(events)), sorted(set(envs)) 233 234 def _split_constraint_expr(self, constr: list[str]) -> Iterator[tuple[str, 235 str | None]]: 236 """ 237 Get a list of strings of the type constr1 && constr2 and returns a list of 238 constraints and separators: [[constr1,"&&"],[constr2,None]] 239 """ 240 exprs = [] 241 seps = [] 242 for c in constr: 243 while "&&" in c or "||" in c: 244 a = c.find("&&") 245 o = c.find("||") 246 pos = a if o < 0 or 0 < a < o else o 247 exprs.append(c[:pos].replace(" ", "")) 248 seps.append(c[pos:pos + 2].replace(" ", "")) 249 c = c[pos + 2:].replace(" ", "") 250 exprs.append(c) 251 seps.append(None) 252 return zip(exprs, seps) 253 254 def __extract_env_var(self, constraint: list[str]) -> list[str]: 255 env = [] 256 for c, _ in self._split_constraint_expr(constraint): 257 rule = self.constraint_rule.search(c) 258 reset = self.constraint_reset.search(c) 259 if rule: 260 env.append(rule["env"]) 261 if rule.groupdict().get("unit"): 262 self.env_types[rule["env"]] = rule["unit"] 263 if rule["val"][0].isalpha(): 264 self.constraint_vars.add(rule["val"]) 265 # try to infer unit from constants or parameters 266 val_for_unit = rule["val"].lower().replace("()", "") 267 if val_for_unit.endswith("_ns"): 268 self.env_types[rule["env"]] = "ns" 269 if val_for_unit.endswith("_jiffies"): 270 self.env_types[rule["env"]] = "j" 271 if reset: 272 env.append(reset["env"]) 273 # environment variables that are reset need a storage 274 self.env_stored.add(reset["env"]) 275 return env 276 277 def __create_matrix(self) -> tuple[list[list[str]], dict[_ConstraintKey, list[str]]]: 278 # transform the array into a dictionary 279 events = self.events 280 states = self.states 281 events_dict = {} 282 states_dict = {} 283 nr_event = 0 284 for event in events: 285 events_dict[event] = nr_event 286 nr_event += 1 287 288 nr_state = 0 289 for state in states: 290 states_dict[state] = nr_state 291 nr_state += 1 292 293 # declare the matrix.... 294 matrix = [[self.invalid_state_str for _ in range(nr_event)] for _ in range(nr_state)] 295 constraints: dict[_ConstraintKey, list[str]] = {} 296 297 # and we are back! Let's fill the matrix 298 cursor = self.__get_cursor_begin_events() 299 300 for line in map(str.lstrip, 301 islice(self.__dot_lines, cursor, None)): 302 303 if not line or line[0] != '"': 304 break 305 306 split_line = line.split() 307 308 if len(split_line) > 2 and split_line[1] == "->": 309 origin_state = split_line[0].replace('"', '').replace(',', '_') 310 dest_state = split_line[2].replace('"', '').replace(',', '_') 311 possible_events = "".join(split_line[split_line.index("label") + 2:-1]).replace('"', '') 312 for event in possible_events.split("\\n"): 313 event, *constr = event.split(";") 314 if constr: 315 key = _EventConstraintKey(states_dict[origin_state], events_dict[event]) 316 constraints[key] = constr 317 # those events reset also on self loops 318 if origin_state == dest_state and "reset" in "".join(constr): 319 self.self_loop_reset_events.add(event) 320 matrix[states_dict[origin_state]][events_dict[event]] = dest_state 321 else: 322 state = line.split("label")[1].split('"')[1] 323 state, *constr = state.replace(" ", "").split("\\n") 324 if constr: 325 constraints[_StateConstraintKey(states_dict[state])] = constr 326 327 return matrix, constraints 328 329 def __store_init_events(self) -> tuple[list[bool], list[bool]]: 330 events_start = [False] * len(self.events) 331 events_start_run = [False] * len(self.events) 332 for i in range(len(self.events)): 333 curr_event_will_init = 0 334 curr_event_from_init = False 335 curr_event_used = 0 336 for j in range(len(self.states)): 337 if self.function[j][i] != self.invalid_state_str: 338 curr_event_used += 1 339 if self.function[j][i] == self.initial_state: 340 curr_event_will_init += 1 341 if self.function[0][i] != self.invalid_state_str: 342 curr_event_from_init = True 343 # this event always leads to init 344 if curr_event_will_init and curr_event_used == curr_event_will_init: 345 events_start[i] = True 346 # this event is only called from init 347 if curr_event_from_init and curr_event_used == 1: 348 events_start_run[i] = True 349 return events_start, events_start_run 350 351 def is_start_event(self, event: str) -> bool: 352 return self.events_start[self.events.index(event)] 353 354 def is_start_run_event(self, event: str) -> bool: 355 # prefer handle_start_event if there 356 if any(self.events_start): 357 return False 358 return self.events_start_run[self.events.index(event)] 359 360 def is_hybrid_automata(self) -> bool: 361 return bool(self.envs) 362 363 def is_event_constraint(self, key: _ConstraintKey) -> bool: 364 """ 365 Given the key in self.constraints return true if it is an event 366 constraint, false if it is a state constraint 367 """ 368 return isinstance(key, _EventConstraintKey) 369