1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0-only 3 4from pathlib import Path 5from . import generator 6from . import ltl2ba 7 8COLUMN_LIMIT = 100 9 10def line_len(line: str) -> int: 11 tabs = line.count('\t') 12 return tabs * 7 + len(line) 13 14def break_long_line(line: str, indent='') -> list[str]: 15 result = [] 16 while line_len(line) > COLUMN_LIMIT: 17 i = line[:COLUMN_LIMIT - line_len(line)].rfind(' ') 18 result.append(line[:i]) 19 line = indent + line[i + 1:] 20 if line: 21 result.append(line) 22 return result 23 24def build_condition_string(node: ltl2ba.GraphNode): 25 if not node.labels: 26 return "(true)" 27 28 result = "(" 29 30 first = True 31 for label in sorted(node.labels): 32 if not first: 33 result += " && " 34 result += label 35 first = False 36 37 result += ")" 38 39 return result 40 41def abbreviate_atoms(atoms: list[str]) -> list[str]: 42 def shorten(s: str) -> str: 43 skip = ["is", "by", "or", "and"] 44 return '_'.join([x[:2] for x in s.lower().split('_') if x not in skip]) 45 46 abbrs = [] 47 for atom in atoms: 48 for i in range(len(atom), -1, -1): 49 if sum(a.startswith(atom[:i]) for a in atoms) > 1: 50 break 51 share = atom[:i] 52 unique = atom[i:] 53 abbrs.append((shorten(share) + shorten(unique))) 54 return abbrs 55 56class ltl2k(generator.Monitor): 57 template_dir = "ltl2k" 58 59 def __init__(self, file_path, MonitorType, extra_params={}): 60 if MonitorType != "per_task": 61 raise NotImplementedError("Only per_task monitor is supported for LTL") 62 super().__init__(extra_params) 63 with open(file_path) as f: 64 self.atoms, self.ba, self.ltl = ltl2ba.create_graph(f.read()) 65 self.atoms_abbr = abbreviate_atoms(self.atoms) 66 self.name = extra_params.get("model_name") 67 if not self.name: 68 self.name = Path(file_path).stem 69 70 def _fill_states(self) -> str: 71 buf = [ 72 "enum ltl_buchi_state {", 73 ] 74 75 for node in self.ba: 76 buf.append("\tS%i," % node.id) 77 buf.append("\tRV_NUM_BA_STATES") 78 buf.append("};") 79 buf.append("static_assert(RV_NUM_BA_STATES <= RV_MAX_BA_STATES);") 80 return buf 81 82 def _fill_atoms(self): 83 buf = ["enum ltl_atom {"] 84 for a in sorted(self.atoms): 85 buf.append("\tLTL_%s," % a) 86 buf.append("\tLTL_NUM_ATOM") 87 buf.append("};") 88 buf.append("static_assert(LTL_NUM_ATOM <= RV_MAX_LTL_ATOM);") 89 return buf 90 91 def _fill_atoms_to_string(self): 92 buf = [ 93 "static const char *ltl_atom_str(enum ltl_atom atom)", 94 "{", 95 "\tstatic const char *const names[] = {" 96 ] 97 98 for name in self.atoms_abbr: 99 buf.append("\t\t\"%s\"," % name) 100 101 buf.extend([ 102 "\t};", 103 "", 104 "\treturn names[atom];", 105 "}" 106 ]) 107 return buf 108 109 def _fill_atom_values(self, required_values): 110 buf = [] 111 for node in self.ltl: 112 if str(node) not in required_values: 113 continue 114 115 if isinstance(node.op, ltl2ba.AndOp): 116 buf.append("\tbool %s = %s && %s;" % (node, node.op.left, node.op.right)) 117 required_values |= {str(node.op.left), str(node.op.right)} 118 elif isinstance(node.op, ltl2ba.OrOp): 119 buf.append("\tbool %s = %s || %s;" % (node, node.op.left, node.op.right)) 120 required_values |= {str(node.op.left), str(node.op.right)} 121 elif isinstance(node.op, ltl2ba.NotOp): 122 buf.append("\tbool %s = !%s;" % (node, node.op.child)) 123 required_values.add(str(node.op.child)) 124 125 for atom in self.atoms: 126 if atom.lower() not in required_values: 127 continue 128 buf.append("\tbool %s = test_bit(LTL_%s, mon->atoms);" % (atom.lower(), atom)) 129 130 buf.reverse() 131 132 buf2 = [] 133 for line in buf: 134 buf2.extend(break_long_line(line, "\t ")) 135 return buf2 136 137 def _fill_transitions(self): 138 buf = [ 139 "static void", 140 "ltl_possible_next_states(struct ltl_monitor *mon, unsigned int state, unsigned long *next)", 141 "{" 142 ] 143 144 required_values = set() 145 for node in self.ba: 146 for o in sorted(node.outgoing): 147 required_values |= o.labels 148 149 buf.extend(self._fill_atom_values(required_values)) 150 buf.extend([ 151 "", 152 "\tswitch (state) {" 153 ]) 154 155 for node in self.ba: 156 buf.append("\tcase S%i:" % node.id) 157 158 for o in sorted(node.outgoing): 159 line = "\t\tif " 160 indent = "\t\t " 161 162 line += build_condition_string(o) 163 lines = break_long_line(line, indent) 164 buf.extend(lines) 165 166 buf.append("\t\t\t__set_bit(S%i, next);" % o.id) 167 buf.append("\t\tbreak;") 168 buf.extend([ 169 "\t}", 170 "}" 171 ]) 172 173 return buf 174 175 def _fill_start(self): 176 buf = [ 177 "static void ltl_start(struct task_struct *task, struct ltl_monitor *mon)", 178 "{" 179 ] 180 181 required_values = set() 182 for node in self.ba: 183 if node.init: 184 required_values |= node.labels 185 186 buf.extend(self._fill_atom_values(required_values)) 187 buf.append("") 188 189 for node in self.ba: 190 if not node.init: 191 continue 192 193 line = "\tif " 194 indent = "\t " 195 196 line += build_condition_string(node) 197 lines = break_long_line(line, indent) 198 buf.extend(lines) 199 200 buf.append("\t\t__set_bit(S%i, mon->states);" % node.id) 201 buf.append("}") 202 return buf 203 204 def fill_tracepoint_handlers_skel(self): 205 buff = [] 206 buff.append("static void handle_example_event(void *data, /* XXX: fill header */)") 207 buff.append("{") 208 buff.append("\tltl_atom_update(task, LTL_%s, true/false);" % self.atoms[0]) 209 buff.append("}") 210 buff.append("") 211 return '\n'.join(buff) 212 213 def fill_tracepoint_attach_probe(self): 214 return "\trv_attach_trace_probe(\"%s\", /* XXX: tracepoint */, handle_example_event);" \ 215 % self.name 216 217 def fill_tracepoint_detach_helper(self): 218 return "\trv_detach_trace_probe(\"%s\", /* XXX: tracepoint */, handle_sample_event);" \ 219 % self.name 220 221 def fill_atoms_init(self): 222 buff = [] 223 for a in self.atoms: 224 buff.append("\tltl_atom_set(mon, LTL_%s, true/false);" % a) 225 return '\n'.join(buff) 226 227 def fill_model_h(self): 228 buf = [ 229 "/* SPDX-License-Identifier: GPL-2.0 */", 230 "", 231 "/*", 232 " * C implementation of Buchi automaton, automatically generated by", 233 " * tools/verification/rvgen from the linear temporal logic specification.", 234 " * For further information, see kernel documentation:", 235 " * Documentation/trace/rv/linear_temporal_logic.rst", 236 " */", 237 "", 238 "#include <linux/rv.h>", 239 "", 240 "#define MONITOR_NAME " + self.name, 241 "" 242 ] 243 244 buf.extend(self._fill_atoms()) 245 buf.append('') 246 247 buf.extend(self._fill_atoms_to_string()) 248 buf.append('') 249 250 buf.extend(self._fill_states()) 251 buf.append('') 252 253 buf.extend(self._fill_start()) 254 buf.append('') 255 256 buf.extend(self._fill_transitions()) 257 buf.append('') 258 259 return '\n'.join(buf) 260 261 def fill_monitor_class_type(self): 262 return "LTL_MON_EVENTS_ID" 263 264 def fill_monitor_class(self): 265 return "ltl_monitor_id" 266 267 def fill_main_c(self): 268 main_c = super().fill_main_c() 269 main_c = main_c.replace("%%ATOMS_INIT%%", self.fill_atoms_init()) 270 271 return main_c 272