xref: /linux/tools/verification/rvgen/rvgen/ltl2k.py (revision 4ff261e725d7376c12e745fdbe8a33cd6dbd5a83)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0-only
3
4from pathlib import Path
5from . import generator
6from . import ltl2ba
7
8COLUMN_LIMIT = 100
9
10def line_len(line: str) -> int:
11    tabs = line.count('\t')
12    return tabs * 7 + len(line)
13
14def break_long_line(line: str, indent='') -> list[str]:
15    result = []
16    while line_len(line) > COLUMN_LIMIT:
17        i = line[:COLUMN_LIMIT - line_len(line)].rfind(' ')
18        result.append(line[:i])
19        line = indent + line[i + 1:]
20    if line:
21        result.append(line)
22    return result
23
24def build_condition_string(node: ltl2ba.GraphNode):
25    if not node.labels:
26        return "(true)"
27
28    result = "("
29
30    first = True
31    for label in sorted(node.labels):
32        if not first:
33            result += " && "
34        result += label
35        first = False
36
37    result += ")"
38
39    return result
40
41def abbreviate_atoms(atoms: list[str]) -> list[str]:
42    def shorten(s: str) -> str:
43        skip = ["is", "by", "or", "and"]
44        return '_'.join([x[:2] for x in s.lower().split('_') if x not in skip])
45
46    abbrs = []
47    for atom in atoms:
48        for i in range(len(atom), -1, -1):
49            if sum(a.startswith(atom[:i]) for a in atoms) > 1:
50                break
51        share = atom[:i]
52        unique = atom[i:]
53        abbrs.append((shorten(share) + shorten(unique)))
54    return abbrs
55
56class ltl2k(generator.Monitor):
57    template_dir = "ltl2k"
58
59    def __init__(self, file_path, MonitorType, extra_params={}):
60        if MonitorType != "per_task":
61            raise NotImplementedError("Only per_task monitor is supported for LTL")
62        super().__init__(extra_params)
63        with open(file_path) as f:
64            self.atoms, self.ba, self.ltl = ltl2ba.create_graph(f.read())
65        self.atoms_abbr = abbreviate_atoms(self.atoms)
66        self.name = extra_params.get("model_name")
67        if not self.name:
68            self.name = Path(file_path).stem
69
70    def _fill_states(self) -> str:
71        buf = [
72            "enum ltl_buchi_state {",
73        ]
74
75        for node in self.ba:
76            buf.append("\tS%i," % node.id)
77        buf.append("\tRV_NUM_BA_STATES")
78        buf.append("};")
79        buf.append("static_assert(RV_NUM_BA_STATES <= RV_MAX_BA_STATES);")
80        return buf
81
82    def _fill_atoms(self):
83        buf = ["enum ltl_atom {"]
84        for a in sorted(self.atoms):
85            buf.append("\tLTL_%s," % a)
86        buf.append("\tLTL_NUM_ATOM")
87        buf.append("};")
88        buf.append("static_assert(LTL_NUM_ATOM <= RV_MAX_LTL_ATOM);")
89        return buf
90
91    def _fill_atoms_to_string(self):
92        buf = [
93            "static const char *ltl_atom_str(enum ltl_atom atom)",
94            "{",
95            "\tstatic const char *const names[] = {"
96        ]
97
98        for name in self.atoms_abbr:
99            buf.append("\t\t\"%s\"," % name)
100
101        buf.extend([
102            "\t};",
103            "",
104            "\treturn names[atom];",
105            "}"
106        ])
107        return buf
108
109    def _fill_atom_values(self, required_values):
110        buf = []
111        for node in self.ltl:
112            if str(node) not in required_values:
113                continue
114
115            if isinstance(node.op, ltl2ba.AndOp):
116                buf.append("\tbool %s = %s && %s;" % (node, node.op.left, node.op.right))
117                required_values |= {str(node.op.left), str(node.op.right)}
118            elif isinstance(node.op, ltl2ba.OrOp):
119                buf.append("\tbool %s = %s || %s;" % (node, node.op.left, node.op.right))
120                required_values |= {str(node.op.left), str(node.op.right)}
121            elif isinstance(node.op, ltl2ba.NotOp):
122                buf.append("\tbool %s = !%s;" % (node, node.op.child))
123                required_values.add(str(node.op.child))
124
125        for atom in self.atoms:
126            if atom.lower() not in required_values:
127                continue
128            buf.append("\tbool %s = test_bit(LTL_%s, mon->atoms);" % (atom.lower(), atom))
129
130        buf.reverse()
131
132        buf2 = []
133        for line in buf:
134            buf2.extend(break_long_line(line, "\t     "))
135        return buf2
136
137    def _fill_transitions(self):
138        buf = [
139            "static void",
140            "ltl_possible_next_states(struct ltl_monitor *mon, unsigned int state, unsigned long *next)",
141            "{"
142        ]
143
144        required_values = set()
145        for node in self.ba:
146            for o in sorted(node.outgoing):
147                required_values |= o.labels
148
149        buf.extend(self._fill_atom_values(required_values))
150        buf.extend([
151            "",
152            "\tswitch (state) {"
153        ])
154
155        for node in self.ba:
156            buf.append("\tcase S%i:" % node.id)
157
158            for o in sorted(node.outgoing):
159                line   = "\t\tif "
160                indent = "\t\t   "
161
162                line += build_condition_string(o)
163                lines = break_long_line(line, indent)
164                buf.extend(lines)
165
166                buf.append("\t\t\t__set_bit(S%i, next);" % o.id)
167            buf.append("\t\tbreak;")
168        buf.extend([
169            "\t}",
170            "}"
171        ])
172
173        return buf
174
175    def _fill_start(self):
176        buf = [
177            "static void ltl_start(struct task_struct *task, struct ltl_monitor *mon)",
178            "{"
179        ]
180
181        required_values = set()
182        for node in self.ba:
183            if node.init:
184                required_values |= node.labels
185
186        buf.extend(self._fill_atom_values(required_values))
187        buf.append("")
188
189        for node in self.ba:
190            if not node.init:
191                continue
192
193            line   = "\tif "
194            indent = "\t   "
195
196            line += build_condition_string(node)
197            lines = break_long_line(line, indent)
198            buf.extend(lines)
199
200            buf.append("\t\t__set_bit(S%i, mon->states);" % node.id)
201        buf.append("}")
202        return buf
203
204    def fill_tracepoint_handlers_skel(self):
205        buff = []
206        buff.append("static void handle_example_event(void *data, /* XXX: fill header */)")
207        buff.append("{")
208        buff.append("\tltl_atom_update(task, LTL_%s, true/false);" % self.atoms[0])
209        buff.append("}")
210        buff.append("")
211        return '\n'.join(buff)
212
213    def fill_tracepoint_attach_probe(self):
214        return "\trv_attach_trace_probe(\"%s\", /* XXX: tracepoint */, handle_example_event);" \
215                % self.name
216
217    def fill_tracepoint_detach_helper(self):
218        return "\trv_detach_trace_probe(\"%s\", /* XXX: tracepoint */, handle_sample_event);" \
219                % self.name
220
221    def fill_atoms_init(self):
222        buff = []
223        for a in self.atoms:
224            buff.append("\tltl_atom_set(mon, LTL_%s, true/false);" % a)
225        return '\n'.join(buff)
226
227    def fill_model_h(self):
228        buf = [
229            "/* SPDX-License-Identifier: GPL-2.0 */",
230            "",
231            "/*",
232            " * C implementation of Buchi automaton, automatically generated by",
233            " * tools/verification/rvgen from the linear temporal logic specification.",
234            " * For further information, see kernel documentation:",
235            " *   Documentation/trace/rv/linear_temporal_logic.rst",
236            " */",
237            "",
238            "#include <linux/rv.h>",
239            "",
240            "#define MONITOR_NAME " + self.name,
241            ""
242        ]
243
244        buf.extend(self._fill_atoms())
245        buf.append('')
246
247        buf.extend(self._fill_atoms_to_string())
248        buf.append('')
249
250        buf.extend(self._fill_states())
251        buf.append('')
252
253        buf.extend(self._fill_start())
254        buf.append('')
255
256        buf.extend(self._fill_transitions())
257        buf.append('')
258
259        return '\n'.join(buf)
260
261    def fill_monitor_class_type(self):
262        return "LTL_MON_EVENTS_ID"
263
264    def fill_monitor_class(self):
265        return "ltl_monitor_id"
266
267    def fill_main_c(self):
268        main_c = super().fill_main_c()
269        main_c = main_c.replace("%%ATOMS_INIT%%", self.fill_atoms_init())
270
271        return main_c
272