xref: /linux/tools/verification/rvgen/rvgen/ltl2k.py (revision 0fc8f6200d2313278fbf4539bbab74677c685531)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0-only
3
4from pathlib import Path
5from . import generator
6from . import ltl2ba
7from .automata import AutomataError
8
# Maximum display width, in columns, of a generated line before it is wrapped.
COLUMN_LIMIT = 100

def line_len(line: str) -> int:
    """Return the display width of *line*, counting every tab as 8 columns.

    Each tab already contributes 1 to len(line); the remaining 7 columns are
    added on top.
    """
    tabs = line.count('\t')
    return tabs * 7 + len(line)

def break_long_line(line: str, indent='') -> list[str]:
    """Split *line* at spaces so that each piece fits within COLUMN_LIMIT.

    The line is broken at the last space that still keeps the piece within the
    limit. The space itself is dropped and the remainder is prefixed with
    *indent*.

    Leading whitespace (the original indentation on the first piece, *indent*
    on the following ones) is never used as a break point: breaking there
    makes no progress and used to loop forever. If no space fits within the
    limit, the line is broken at the first space beyond it; if there is no
    space left at all, the remainder is emitted as-is even though it is too
    long.

    Returns the list of resulting lines; an empty *line* yields an empty list.
    """
    result = []
    # Index before which spaces are indentation and must not be break points.
    protected = len(line) - len(line.lstrip())
    while line_len(line) > COLUMN_LIMIT:
        # Dropping the excess columns from the end leaves a prefix that fits.
        window_end = max(len(line) - (line_len(line) - COLUMN_LIMIT), protected)
        i = line.rfind(' ', protected, window_end)
        if i < 0:
            # Nothing fits: settle for the shortest possible over-long piece.
            i = line.find(' ', window_end)
            if i < 0:
                break
        result.append(line[:i])
        line = indent + line[i + 1:]
        protected = len(indent)
    if line:
        result.append(line)
    return result
24
def build_condition_string(node: ltl2ba.GraphNode):
    """Return the parenthesized C condition for the labels of *node*.

    The labels are sorted and joined with " && ". A node without labels
    yields the always-true condition "(true)".
    """
    if node.labels:
        return "(" + " && ".join(sorted(node.labels)) + ")"
    return "(true)"
41
def abbreviate_atoms(atoms: list[str]) -> list[str]:
    """Return a short lower-case abbreviation for every atom in *atoms*.

    Each atom is split into the longest prefix it shares with at least one
    other atom and the remaining, distinguishing suffix. Both parts are
    shortened separately and concatenated. The result has the same order as
    *atoms*.
    """
    ignored_words = ("is", "by", "or", "and")

    def shorten(s: str) -> str:
        # Keep the first two letters of each '_'-separated word, dropping
        # the filler words.
        pieces = []
        for word in s.lower().split('_'):
            if word in ignored_words:
                continue
            pieces.append(word[:2])
        return '_'.join(pieces)

    def find_share_length(atom: str) -> int:
        # Longest prefix of atom that more than one atom starts with.
        length = len(atom)
        while length >= 0:
            prefix = atom[:length]
            matching = [other for other in atoms if other.startswith(prefix)]
            if len(matching) > 1:
                return length
            length -= 1
        return 0

    def abbreviate(atom: str) -> str:
        split_at = find_share_length(atom)
        return shorten(atom[:split_at]) + shorten(atom[split_at:])

    return [abbreviate(atom) for atom in atoms]
60
class ltl2k(generator.Monitor):
    """Monitor generator that turns an LTL specification into kernel C code.

    The specification file is parsed with ltl2ba.create_graph(), which yields
    the atoms, the Buchi automaton nodes and the LTL nodes used by the
    fill_*() methods below to produce the text of the generated sources.
    """
    template_dir = "ltl2k"

    def __init__(self, file_path, MonitorType, extra_params=None):
        """Parse the LTL specification at *file_path*.

        MonitorType must be "per_task"; anything else raises
        NotImplementedError. *extra_params* is an optional dict forwarded to
        the base class; its "model_name" entry, when set, overrides the
        monitor name, which otherwise is the file name without extension.

        Raises AutomataError if the file cannot be opened or read.
        """
        # Avoid a dict default shared between every instance.
        if extra_params is None:
            extra_params = {}
        if MonitorType != "per_task":
            raise NotImplementedError("Only per_task monitor is supported for LTL")
        super().__init__(extra_params)
        try:
            with open(file_path) as f:
                self.atoms, self.ba, self.ltl = ltl2ba.create_graph(f.read())
        except OSError as exc:
            raise AutomataError(exc.strerror) from exc
        self.atoms_abbr = abbreviate_atoms(self.atoms)
        self.name = extra_params.get("model_name")
        if not self.name:
            self.name = Path(file_path).stem

    def _fill_states(self) -> list[str]:
        """Return the lines of the enum listing one state per automaton node."""
        buf = [
            "enum ltl_buchi_state {",
        ]

        for node in self.ba:
            buf.append(f"\tS{node.id},")
        buf.append("\tRV_NUM_BA_STATES")
        buf.append("};")
        buf.append("static_assert(RV_NUM_BA_STATES <= RV_MAX_BA_STATES);")
        return buf

    def _fill_atoms(self):
        """Return the lines of the atom enum, with the atoms in sorted order."""
        buf = ["enum ltl_atom {"]
        for a in sorted(self.atoms):
            buf.append(f"\tLTL_{a},")
        buf.append("\tLTL_NUM_ATOM")
        buf.append("};")
        buf.append("static_assert(LTL_NUM_ATOM <= RV_MAX_LTL_ATOM);")
        return buf

    def _fill_atoms_to_string(self):
        """Return the lines of ltl_atom_str(), mapping an atom to its abbreviation.

        The generated names[] array is indexed by enum ltl_atom, so the
        abbreviations are emitted in the same sorted-atom order that
        _fill_atoms() uses for the enum.
        """
        buf = [
            "static const char *ltl_atom_str(enum ltl_atom atom)",
            "{",
            "\tstatic const char *const names[] = {"
        ]

        # atoms_abbr is parallel to self.atoms; sort the pairs by atom so the
        # array index always matches the enum value.
        pairs = sorted(zip(self.atoms, self.atoms_abbr), key=lambda pair: pair[0])
        for _, name in pairs:
            buf.append(f"\t\t\"{name}\",")

        buf.extend([
            "\t};",
            "",
            "\treturn names[atom];",
            "}"
        ])
        return buf

    def _fill_atom_values(self, required_values):
        """Return the C declarations of the boolean values in *required_values*.

        *required_values* is a set of value names. It is extended in place
        with the operands of every and/or/not expression that gets emitted,
        so that those operands are declared as well.

        The lines are collected in self.ltl order followed by the atoms, then
        reversed, so atoms and operands emitted later end up declared first.
        NOTE(review): this relies on self.ltl listing an expression before its
        operands; confirm against ltl2ba.
        """
        buf = []
        for node in self.ltl:
            if str(node) not in required_values:
                continue

            if isinstance(node.op, ltl2ba.AndOp):
                buf.append(f"\tbool {node} = {node.op.left} && {node.op.right};")
                required_values |= {str(node.op.left), str(node.op.right)}
            elif isinstance(node.op, ltl2ba.OrOp):
                buf.append(f"\tbool {node} = {node.op.left} || {node.op.right};")
                required_values |= {str(node.op.left), str(node.op.right)}
            elif isinstance(node.op, ltl2ba.NotOp):
                buf.append(f"\tbool {node} = !{node.op.child};")
                required_values.add(str(node.op.child))

        for atom in self.atoms:
            if atom.lower() not in required_values:
                continue
            buf.append(f"\tbool {atom.lower()} = test_bit(LTL_{atom}, mon->atoms);")

        buf.reverse()

        buf2 = []
        for line in buf:
            buf2.extend(break_long_line(line, "\t     "))
        return buf2

    def _fill_transitions(self):
        """Return the lines of ltl_possible_next_states().

        The generated function declares every value used by an outgoing
        transition, then switches on the current state and sets the bit of
        each successor whose condition holds.
        """
        buf = [
            "static void",
            "ltl_possible_next_states(struct ltl_monitor *mon, unsigned int state, unsigned long *next)",
            "{"
        ]

        # Every label used by any transition must be declared up front.
        required_values = set()
        for node in self.ba:
            for o in sorted(node.outgoing):
                required_values |= o.labels

        buf.extend(self._fill_atom_values(required_values))
        buf.extend([
            "",
            "\tswitch (state) {"
        ])

        for node in self.ba:
            buf.append(f"\tcase S{node.id}:")

            for o in sorted(node.outgoing):
                line   = "\t\tif "
                indent = "\t\t   "

                line += build_condition_string(o)
                lines = break_long_line(line, indent)
                buf.extend(lines)

                buf.append(f"\t\t\t__set_bit(S{o.id}, next);")
            buf.append("\t\tbreak;")
        buf.extend([
            "\t}",
            "}"
        ])

        return buf

    def _fill_start(self):
        """Return the lines of ltl_start().

        The generated function sets the bit of every initial automaton state
        whose condition holds.
        """
        buf = [
            "static void ltl_start(struct task_struct *task, struct ltl_monitor *mon)",
            "{"
        ]

        # Only the labels of the initial states are evaluated here.
        required_values = set()
        for node in self.ba:
            if node.init:
                required_values |= node.labels

        buf.extend(self._fill_atom_values(required_values))
        buf.append("")

        for node in self.ba:
            if not node.init:
                continue

            line   = "\tif "
            indent = "\t   "

            line += build_condition_string(node)
            lines = break_long_line(line, indent)
            buf.extend(lines)

            buf.append(f"\t\t__set_bit(S{node.id}, mon->states);")
        buf.append("}")
        return buf

    def fill_tracepoint_handlers_skel(self):
        """Return the text of a skeleton tracepoint handler.

        The first atom is used as the example argument of ltl_atom_update().
        """
        buff = []
        buff.append("static void handle_example_event(void *data, /* XXX: fill header */)")
        buff.append("{")
        buff.append(f"\tltl_atom_update(task, LTL_{self.atoms[0]}, true/false);")
        buff.append("}")
        buff.append("")
        return '\n'.join(buff)

    def fill_tracepoint_attach_probe(self):
        """Return the line attaching the skeleton handler to a tracepoint."""
        return f"\trv_attach_trace_probe(\"{self.name}\", /* XXX: tracepoint */, handle_example_event);"

    def fill_tracepoint_detach_helper(self):
        """Return the line detaching the skeleton handler from a tracepoint."""
        # Must name the same handler that the skeleton defines and the attach
        # line registers.
        return f"\trv_detach_trace_probe(\"{self.name}\", /* XXX: tracepoint */, handle_example_event);"

    def fill_atoms_init(self):
        """Return one placeholder ltl_atom_set() line per atom."""
        buff = []
        for a in self.atoms:
            buff.append(f"\tltl_atom_set(mon, LTL_{a}, true/false);")
        return '\n'.join(buff)

    def fill_model_h(self):
        """Return the full text of the generated model header.

        The header holds the atom enum, ltl_atom_str(), the state enum,
        ltl_start() and ltl_possible_next_states(), in that order.
        """
        buf = [
            "/* SPDX-License-Identifier: GPL-2.0 */",
            "",
            "/*",
            " * C implementation of Buchi automaton, automatically generated by",
            " * tools/verification/rvgen from the linear temporal logic specification.",
            " * For further information, see kernel documentation:",
            " *   Documentation/trace/rv/linear_temporal_logic.rst",
            " */",
            "",
            "#include <linux/rv.h>",
            "",
            "#define MONITOR_NAME " + self.name,
            ""
        ]

        buf.extend(self._fill_atoms())
        buf.append('')

        buf.extend(self._fill_atoms_to_string())
        buf.append('')

        buf.extend(self._fill_states())
        buf.append('')

        buf.extend(self._fill_start())
        buf.append('')

        buf.extend(self._fill_transitions())
        buf.append('')

        return '\n'.join(buf)

    def fill_monitor_class_type(self):
        """Return the monitor class type identifier used for LTL monitors."""
        return "LTL_MON_EVENTS_ID"

    def fill_monitor_class(self):
        """Return the monitor class name used for LTL monitors."""
        return "ltl_monitor_id"

    def fill_main_c(self):
        """Return the base class main C text with %%ATOMS_INIT%% substituted."""
        main_c = super().fill_main_c()
        main_c = main_c.replace("%%ATOMS_INIT%%", self.fill_atoms_init())

        return main_c
278