#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-only
"""Generate C source for a per-task LTL monitor from an LTL specification."""

from pathlib import Path
from . import generator
from . import ltl2ba
from .automata import AutomataError

# Maximum display width allowed for a generated line before it is wrapped.
COLUMN_LIMIT = 100

# Words dropped from an atom name when building its abbreviation.
_ABBREVIATION_SKIP = frozenset(("is", "by", "or", "and"))


def line_len(line: str) -> int:
    """Return the width of *line*, counting every tab as 8 columns.

    Each tab already contributes 1 to ``len(line)``, hence the extra 7.
    """
    tabs = line.count('\t')
    return tabs * 7 + len(line)


def break_long_line(line: str, indent: str = '') -> list[str]:
    """Split *line* at spaces so each piece fits within ``COLUMN_LIMIT``.

    Args:
        line: the line to wrap.
        indent: prefix put in front of every continuation piece.

    Returns:
        The list of resulting lines; empty if *line* is empty.

    If no space is available to break at (or the only candidates lie inside
    the leading whitespace, where breaking would make no progress), the
    remaining text is emitted as a single over-long line instead of looping
    forever.
    """
    result = []
    while line_len(line) > COLUMN_LIMIT:
        # Drop the overflowing tail, then break at the last space that fits.
        overflow = line_len(line) - COLUMN_LIMIT
        cut = line[:-overflow].rfind(' ')
        if cut < 0 or not line[:cut].strip():
            # No usable break point: give up wrapping rather than spin.
            break
        result.append(line[:cut])
        line = indent + line[cut + 1:]
    if line:
        result.append(line)
    return result


def build_condition_string(node: ltl2ba.GraphNode):
    """Return the C condition for *node*: its sorted labels joined by ``&&``.

    A node without labels yields ``(true)``.
    """
    if not node.labels:
        return "(true)"

    return "(" + " && ".join(sorted(node.labels)) + ")"


def abbreviate_atoms(atoms: list[str]) -> list[str]:
    """Return a short name for each atom, in the same order as *atoms*.

    Each atom is split into the longest prefix it shares with at least one
    other atom and the remaining unique suffix; both parts are shortened
    separately and concatenated.
    """
    def shorten(s: str) -> str:
        # Keep the first two letters of every '_'-separated word, skipping
        # the filler words.
        parts = s.lower().split('_')
        return '_'.join(x[:2] for x in parts if x not in _ABBREVIATION_SKIP)

    def find_share_length(atom: str) -> int:
        # Longest prefix of `atom` that more than one atom starts with
        # (the atom itself always counts as one match).
        for i in range(len(atom), -1, -1):
            if sum(a.startswith(atom[:i]) for a in atoms) > 1:
                return i
        return 0

    abbrs = []
    for atom in atoms:
        share_len = find_share_length(atom)
        share = atom[:share_len]
        unique = atom[share_len:]
        abbrs.append(shorten(share) + shorten(unique))
    return abbrs


class ltl2k(generator.Monitor):
    """Monitor generator that emits C code from an LTL specification file."""

    template_dir = "ltl2k"

    def __init__(self, file_path, MonitorType, extra_params=None):
        """Parse the specification in *file_path* and set up the generator.

        Args:
            file_path: path of the LTL specification to read.
            MonitorType: must be ``"per_task"``.
            extra_params: optional dict of extra parameters; ``"model_name"``
                overrides the monitor name, which otherwise defaults to the
                stem of *file_path*.

        Raises:
            NotImplementedError: if *MonitorType* is not ``"per_task"``.
            AutomataError: if the specification file cannot be read.
        """
        if MonitorType != "per_task":
            raise NotImplementedError("Only per_task monitor is supported for LTL")
        # Avoid a shared mutable default argument.
        if extra_params is None:
            extra_params = {}
        super().__init__(extra_params)
        try:
            with open(file_path) as f:
                self.atoms, self.ba, self.ltl = ltl2ba.create_graph(f.read())
        except OSError as exc:
            # strerror is None for OSErrors raised without an errno.
            raise AutomataError(exc.strerror or str(exc)) from exc
        self.atoms_abbr = abbreviate_atoms(self.atoms)
        self.name = extra_params.get("model_name")
        if not self.name:
            self.name = Path(file_path).stem

    def _fill_states(self) -> list[str]:
        """Return the lines of the ``ltl_buchi_state`` enum, one per node."""
        buf = [
            "enum ltl_buchi_state {",
        ]

        for node in self.ba:
            buf.append(f"\tS{node.id},")
        buf.append("\tRV_NUM_BA_STATES")
        buf.append("};")
        buf.append("static_assert(RV_NUM_BA_STATES <= RV_MAX_BA_STATES);")
        return buf

    def _fill_atoms(self):
        """Return the lines of the ``ltl_atom`` enum, atoms in sorted order."""
        buf = ["enum ltl_atom {"]
        for a in sorted(self.atoms):
            buf.append(f"\tLTL_{a},")
        buf.append("\tLTL_NUM_ATOM")
        buf.append("};")
        buf.append("static_assert(LTL_NUM_ATOM <= RV_MAX_LTL_ATOM);")
        return buf

    def _fill_atoms_to_string(self):
        """Return the lines of ``ltl_atom_str()``, mapping atoms to short names."""
        buf = [
            "static const char *ltl_atom_str(enum ltl_atom atom)",
            "{",
            "\tstatic const char *const names[] = {"
        ]

        # names[] is indexed by enum ltl_atom, which _fill_atoms() emits in
        # sorted atom order, so emit the abbreviations in that same order.
        for _, name in sorted(zip(self.atoms, self.atoms_abbr)):
            buf.append(f"\t\t\"{name}\",")

        buf.extend([
            "\t};",
            "",
            "\treturn names[atom];",
            "}"
        ])
        return buf

    def _fill_atom_values(self, required_values):
        """Return C declarations of the boolean values in *required_values*.

        Walks ``self.ltl``; every required and/or/not node gets a ``bool``
        declaration and makes its operands required too. Required atoms are
        then read from ``mon->atoms``. The collected lines are reversed
        before being returned, so atom declarations come first, and long
        lines are wrapped.

        Args:
            required_values: names of the values the caller needs; it is not
                modified.
        """
        # Work on a copy so the caller's set is left untouched.
        required_values = set(required_values)

        buf = []
        for node in self.ltl:
            if str(node) not in required_values:
                continue

            if isinstance(node.op, ltl2ba.AndOp):
                buf.append(f"\tbool {node} = {node.op.left} && {node.op.right};")
                required_values |= {str(node.op.left), str(node.op.right)}
            elif isinstance(node.op, ltl2ba.OrOp):
                buf.append(f"\tbool {node} = {node.op.left} || {node.op.right};")
                required_values |= {str(node.op.left), str(node.op.right)}
            elif isinstance(node.op, ltl2ba.NotOp):
                buf.append(f"\tbool {node} = !{node.op.child};")
                required_values.add(str(node.op.child))

        for atom in self.atoms:
            if atom.lower() not in required_values:
                continue
            buf.append(f"\tbool {atom.lower()} = test_bit(LTL_{atom}, mon->atoms);")

        # Lines were collected users-first; reverse them for the C output.
        buf.reverse()

        buf2 = []
        for line in buf:
            buf2.extend(break_long_line(line, "\t "))
        return buf2

    def _fill_transitions(self):
        """Return the lines of ``ltl_possible_next_states()``.

        The function has one ``case`` per node; each outgoing node whose
        condition holds gets its bit set in ``next``.
        """
        buf = [
            "static void",
            "ltl_possible_next_states(struct ltl_monitor *mon, unsigned int state, unsigned long *next)",
            "{"
        ]

        required_values = set()
        for node in self.ba:
            for o in node.outgoing:
                required_values |= o.labels

        buf.extend(self._fill_atom_values(required_values))
        buf.extend([
            "",
            "\tswitch (state) {"
        ])

        for node in self.ba:
            buf.append(f"\tcase S{node.id}:")

            for o in sorted(node.outgoing):
                line = "\t\tif "
                indent = "\t\t "

                line += build_condition_string(o)
                lines = break_long_line(line, indent)
                buf.extend(lines)

                buf.append(f"\t\t\t__set_bit(S{o.id}, next);")
            buf.append("\t\tbreak;")
        buf.extend([
            "\t}",
            "}"
        ])

        return buf

    def _fill_start(self):
        """Return the lines of ``ltl_start()``.

        Every node flagged ``init`` whose condition holds gets its bit set
        in ``mon->states``.
        """
        buf = [
            "static void ltl_start(struct task_struct *task, struct ltl_monitor *mon)",
            "{"
        ]

        required_values = set()
        for node in self.ba:
            if node.init:
                required_values |= node.labels

        buf.extend(self._fill_atom_values(required_values))
        buf.append("")

        for node in self.ba:
            if not node.init:
                continue

            line = "\tif "
            indent = "\t "

            line += build_condition_string(node)
            lines = break_long_line(line, indent)
            buf.extend(lines)

            buf.append(f"\t\t__set_bit(S{node.id}, mon->states);")
        buf.append("}")
        return buf

    def fill_tracepoint_handlers_skel(self):
        """Return an example tracepoint handler for the user to complete."""
        buff = []
        buff.append("static void handle_example_event(void *data, /* XXX: fill header */)")
        buff.append("{")
        buff.append(f"\tltl_atom_update(task, LTL_{self.atoms[0]}, true/false);")
        buff.append("}")
        buff.append("")
        return '\n'.join(buff)

    def fill_tracepoint_attach_probe(self):
        """Return the call attaching the example handler to a tracepoint."""
        return f"\trv_attach_trace_probe(\"{self.name}\", /* XXX: tracepoint */, handle_example_event);"

    def fill_tracepoint_detach_helper(self):
        """Return the call detaching the example handler from a tracepoint."""
        # Must name the same handler as the skeleton and the attach call.
        return f"\trv_detach_trace_probe(\"{self.name}\", /* XXX: tracepoint */, handle_example_event);"

    def fill_atoms_init(self):
        """Return one placeholder ``ltl_atom_set()`` call per atom."""
        buff = []
        for a in self.atoms:
            buff.append(f"\tltl_atom_set(mon, LTL_{a}, true/false);")
        return '\n'.join(buff)

    def fill_model_h(self):
        """Return the full text of the generated model header."""
        buf = [
            "/* SPDX-License-Identifier: GPL-2.0 */",
            "",
            "/*",
            " * C implementation of Buchi automaton, automatically generated by",
            " * tools/verification/rvgen from the linear temporal logic specification.",
            " * For further information, see kernel documentation:",
            " * Documentation/trace/rv/linear_temporal_logic.rst",
            " */",
            "",
            "#include <linux/rv.h>",
            "",
            "#define MONITOR_NAME " + self.name,
            ""
        ]

        buf.extend(self._fill_atoms())
        buf.append('')

        buf.extend(self._fill_atoms_to_string())
        buf.append('')

        buf.extend(self._fill_states())
        buf.append('')

        buf.extend(self._fill_start())
        buf.append('')

        buf.extend(self._fill_transitions())
        buf.append('')

        return '\n'.join(buf)

    def fill_monitor_class_type(self):
        """Return the monitor class type identifier used by the templates."""
        return "LTL_MON_EVENTS_ID"

    def fill_monitor_class(self):
        """Return the monitor class name used by the templates."""
        return "ltl_monitor_id"

    def fill_main_c(self):
        """Return the main C file with ``%%ATOMS_INIT%%`` substituted."""
        main_c = super().fill_main_c()
        main_c = main_c.replace("%%ATOMS_INIT%%", self.fill_atoms_init())

        return main_c