#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-only
#
# Implementation based on
# Gerth, R., Peled, D., Vardi, M.Y., Wolper, P. (1996).
# Simple On-the-fly Automatic Verification of Linear Temporal Logic.
# https://doi.org/10.1007/978-0-387-34892-6_1
# With extra optimizations

from ply.lex import lex
from ply.yacc import yacc
from .automata import AutomataError

# Grammar:
#     ltl ::= opd | ( ltl ) | ltl binop ltl | unop ltl
#
# Operands (opd):
#     true, false, user-defined names
#
# Unary Operators (unop):
#     always
#     eventually
#     next
#     not
#
# Binary Operators (binop):
#     until
#     and
#     or
#     imply
#     equivalent   (listed here, but this file defines no token or
#                   grammar rule for it)

# Token names consumed by PLY's lex() and yacc().
tokens = (
    'AND',
    'OR',
    'IMPLY',
    'UNTIL',
    'ALWAYS',
    'EVENTUALLY',
    'NEXT',
    'VARIABLE',
    'LITERAL',
    'NOT',
    'LPAREN',
    'RPAREN',
    'ASSIGN',
)

# Token regular expressions. Keywords are lower case while user-defined
# names are restricted to upper case letters, digits and underscores.
t_AND = r'and'
t_OR = r'or'
t_IMPLY = r'imply'
t_UNTIL = r'until'
t_ALWAYS = r'always'
t_NEXT = r'next'
t_EVENTUALLY = r'eventually'
t_VARIABLE = r'[A-Z_0-9]+'
t_LITERAL = r'true|false'
t_NOT = r'not'
t_LPAREN = r'\('
t_RPAREN = r'\)'
t_ASSIGN = r'='
t_ignore_COMMENT = r'\#.*'
t_ignore = ' \t\n'

def t_error(t):
    # Lexer error hook: abort on the first character matching no token rule.
    raise AutomataError(f"Illegal character '{t.value[0]}'")

lexer = lex()

class GraphNode:
    """A node of the graph built by the tableau expansion.

    Attributes:
        init: set by create_graph() when the node is entered from the
            initial pseudo-node.
        outgoing: successor nodes, filled in by create_graph().
        labels: string form of the non-temporal formulas in `old`,
            filled in by create_graph().
        incoming: nodes having an edge to this node.
        new: formulas still to be processed for this node.
        old: formulas already processed for this node.
        next: formulas handed over to the successor node.
        id: creation-order identifier; create_graph() renumbers it.
    """
    uid = 0

    def __init__(self, incoming: set['GraphNode'], new, old, _next):
        self.init = False
        self.outgoing = set()
        self.labels = set()
        # Copy every set so that sibling nodes created from the same
        # parent never share mutable state.
        self.incoming = incoming.copy()
        self.new = new.copy()
        self.old = old.copy()
        self.next = _next.copy()
        self.id = GraphNode.uid
        GraphNode.uid += 1

    def expand(self, node_set):
        """Process the pending formulas of this node.

        Returns the set of fully expanded nodes, i.e. `node_set`
        possibly extended with this node and the nodes reachable from it.
        """
        if not self.new:
            # Nothing left to process. If an equivalent node (same `old`
            # and `next`) already exists, merge the incoming edges into
            # it rather than adding a duplicate.
            for nd in node_set:
                if nd.old == self.old and nd.next == self.next:
                    nd.incoming |= self.incoming
                    return node_set

            # Otherwise keep this node and start expanding its successor,
            # which has to satisfy everything in `next`.
            new_current_node = GraphNode({self}, self.next, set(), set())
            return new_current_node.expand({self} | node_set)
        n = self.new.pop()
        return n.expand(self, node_set)

    def __lt__(self, other):
        # Order by id so that nodes can be sorted deterministically.
        return self.id < other.id

class ASTNode:
    """Wrapper around an operator or operand object (`op`).

    The wrapper lets the operator of a sub-formula be replaced in place
    (see normalize(), negate() and parse_ltl()). Equality is identity.
    """
    uid = 1

    def __init__(self, op):
        self.op = op
        self.id = ASTNode.uid
        ASTNode.uid += 1

    def __hash__(self):
        return hash(self.op)

    def __eq__(self, other):
        return self is other

    def __iter__(self):
        # Pre-order traversal: this node first, then whatever `op` yields.
        yield self
        yield from self.op

    def negate(self):
        """Replace `op` by its negation and return self."""
        self.op = self.op.negate()
        return self

    def expand(self, node, node_set):
        """Delegate the expansion of `node` to the operator."""
        return self.op.expand(self, node, node_set)

    def __str__(self):
        if isinstance(self.op, Literal):
            return str(self.op.value)
        if isinstance(self.op, Variable):
            return self.op.name.lower()
        return "val" + str(self.id)

    def normalize(self):
        # Get rid of:
        #   - ALWAYS
        #   - EVENTUALLY
        #   - IMPLY
        # And move all the NOT to be inside
        self.op = self.op.normalize()
        return self

class BinaryOp:
    """Base class of the operators taking two sub-formulas."""
    op_str = "not_supported"

    def __init__(self, left: ASTNode, right: ASTNode):
        self.left = left
        self.right = right

    def __hash__(self):
        return hash((self.left, self.right))

    def __iter__(self):
        yield from self.left
        yield from self.right

    def normalize(self):
        raise NotImplementedError

    def negate(self):
        raise NotImplementedError

    def _is_temporal(self):
        raise NotImplementedError

    def is_temporal(self):
        """Return True if this operator or any operand is temporal."""
        if self.left.op.is_temporal():
            return True
        if self.right.op.is_temporal():
            return True
        return self._is_temporal()

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        raise NotImplementedError

class AndOp(BinaryOp):
    """Conjunction."""
    op_str = '&&'

    def normalize(self):
        return self

    def negate(self):
        # !(P & Q) === !P | !Q
        return OrOp(self.left.negate(), self.right.negate())

    def _is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        if not n.op.is_temporal():
            # Purely propositional: keep the whole formula as one entry
            # of `old` instead of splitting it.
            node.old.add(n)
            return node.expand(node_set)

        # Both operands have to hold in the same node.
        tmp = GraphNode(node.incoming,
                        node.new | ({n.op.left, n.op.right} - node.old),
                        node.old | {n},
                        node.next)
        return tmp.expand(node_set)

class OrOp(BinaryOp):
    """Disjunction."""
    op_str = '||'

    def normalize(self):
        return self

    def negate(self):
        # !(P | Q) === !P & !Q
        return AndOp(self.left.negate(), self.right.negate())

    def _is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        if not n.op.is_temporal():
            # Purely propositional: keep the whole formula as one entry
            # of `old` instead of splitting it.
            node.old |= {n}
            return node.expand(node_set)

        # Split the node in two, one per operand.
        node1 = GraphNode(node.incoming,
                          node.new | ({n.op.left} - node.old),
                          node.old | {n},
                          node.next)
        node2 = GraphNode(node.incoming,
                          node.new | ({n.op.right} - node.old),
                          node.old | {n},
                          node.next)
        return node2.expand(node1.expand(node_set))

class UntilOp(BinaryOp):
    """The "until" operator (left U right)."""

    def normalize(self):
        return self

    def negate(self):
        # !(P U Q) === !P V !Q
        return VOp(self.left.negate(), self.right.negate())

    def _is_temporal(self):
        return True

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # P U Q === Q | (P & next(P U Q))
        # node1: P holds now and the until is carried over to `next`.
        node1 = GraphNode(node.incoming,
                          node.new | ({n.op.left} - node.old),
                          node.old | {n},
                          node.next | {n})
        # node2: Q holds now.
        node2 = GraphNode(node.incoming,
                          node.new | ({n.op.right} - node.old),
                          node.old | {n},
                          node.next)
        return node2.expand(node1.expand(node_set))

class VOp(BinaryOp):
    """The "V" operator (left V right), used as the negation of until."""

    def normalize(self):
        return self

    def negate(self):
        # !(P V Q) === !P U !Q
        return UntilOp(self.left.negate(), self.right.negate())

    def _is_temporal(self):
        return True

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # P V Q === Q & (P | next(P V Q))
        # node1: Q holds now and the formula is carried over to `next`.
        node1 = GraphNode(node.incoming,
                          node.new | ({n.op.right} - node.old),
                          node.old | {n},
                          node.next | {n})
        # node2: both P and Q hold now.
        node2 = GraphNode(node.incoming,
                          node.new | ({n.op.left, n.op.right} - node.old),
                          node.old | {n},
                          node.next)
        return node2.expand(node1.expand(node_set))

class ImplyOp(BinaryOp):
    """Implication; rewritten into OrOp/AndOp by normalize()/negate()."""

    def normalize(self):
        # P -> Q === !P | Q
        return OrOp(self.left.negate(), self.right)

    def _is_temporal(self):
        return False

    def negate(self):
        # !(P -> Q) === !(!P | Q) === P & !Q
        return AndOp(self.left, self.right.negate())

class UnaryOp:
    """Base class of the operators taking one sub-formula."""

    def __init__(self, child: ASTNode):
        self.child = child

    def __iter__(self):
        yield from self.child

    def __hash__(self):
        return hash(self.child)

    def normalize(self):
        raise NotImplementedError

    def _is_temporal(self):
        raise NotImplementedError

    def is_temporal(self):
        """Return True if this operator or its operand is temporal."""
        if self.child.op.is_temporal():
            return True
        return self._is_temporal()

    def negate(self):
        raise NotImplementedError

class EventuallyOp(UnaryOp):
    """The "eventually" operator; rewritten into UntilOp by normalize()."""

    def __str__(self):
        return "eventually " + str(self.child)

    def normalize(self):
        # <>F == true U F
        return UntilOp(ASTNode(Literal(True)), self.child)

    def _is_temporal(self):
        return True

    def negate(self):
        # !<>F == [](!F)
        return AlwaysOp(self.child.negate()).normalize()

class AlwaysOp(UnaryOp):
    """The "always" operator; rewritten into VOp by normalize()."""

    def normalize(self):
        # []F === !(true U !F) == false V F
        new = ASTNode(Literal(False))
        return VOp(new, self.child)

    def _is_temporal(self):
        return True

    def negate(self):
        # ![]F == <>(!F)
        return EventuallyOp(self.child.negate()).normalize()

class NextOp(UnaryOp):
    """The "next" operator."""

    def normalize(self):
        return self

    def _is_temporal(self):
        return True

    def negate(self):
        # not (next A) == next (not A)
        self.child = self.child.negate()
        return self

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # The operand only has to hold in the successor node.
        tmp = GraphNode(node.incoming,
                        node.new,
                        node.old | {n},
                        node.next | {n.op.child})
        return tmp.expand(node_set)

class NotOp(UnaryOp):
    """Negation."""

    def __str__(self):
        return "!" + str(self.child)

    def normalize(self):
        # Push the negation into the operand.
        return self.child.op.negate()

    def negate(self):
        # !!F == F
        return self.child.op

    def _is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # Contradiction: the operand itself was already processed for
        # this node, so the node is dropped.
        for f in node.old:
            if n.op.child is f:
                return node_set
        node.old |= {n}
        return node.expand(node_set)

class Variable:
    """A user-defined name used as an operand."""

    def __init__(self, name: str):
        self.name = name

    def __hash__(self):
        return hash(self.name)

    def __iter__(self):
        # Leaf: no sub-formulas.
        yield from ()

    def negate(self):
        new = ASTNode(self)
        return NotOp(new)

    def normalize(self):
        return self

    def is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        # Contradiction: the negation of this very AST node was already
        # processed for this node, so the node is dropped.
        for f in node.old:
            if isinstance(f.op, NotOp) and f.op.child is n:
                return node_set
        node.old |= {n}
        return node.expand(node_set)

class Literal:
    """The constant true or false."""

    def __init__(self, value: bool):
        self.value = value

    def __iter__(self):
        # Leaf: no sub-formulas.
        yield from ()

    def __hash__(self):
        return hash(self.value)

    def __str__(self):
        if self.value:
            return "true"
        return "false"

    def negate(self):
        # Flips the value in place and returns self.
        self.value = not self.value
        return self

    def normalize(self):
        return self

    def is_temporal(self):
        return False

    @staticmethod
    def expand(n: ASTNode, node: GraphNode, node_set) -> set[GraphNode]:
        if not n.op.value:
            # "false" can never hold: drop the node.
            return node_set
        node.old |= {n}
        return node.expand(node_set)

# NOTE: the docstrings of the p_*() functions below are the grammar
# productions read by PLY's yacc(); they must not contain anything else.

def p_spec(p):
    '''
    spec : assign
         | assign spec
    '''
    # Collect all assignments into one list of (name, ASTNode) pairs.
    if len(p) == 3:
        p[2].append(p[1])
        p[0] = p[2]
    else:
        p[0] = [p[1]]

def p_assign(p):
    '''
    assign : VARIABLE ASSIGN ltl
    '''
    p[0] = (p[1], p[3])

def p_ltl(p):
    '''
    ltl : opd
        | binop
        | unop
    '''
    p[0] = p[1]

def p_opd(p):
    '''
    opd : VARIABLE
        | LITERAL
        | LPAREN ltl RPAREN
    '''
    if p[1] == "true":
        p[0] = ASTNode(Literal(True))
    elif p[1] == "false":
        p[0] = ASTNode(Literal(False))
    elif p[1] == '(':
        p[0] = p[2]
    else:
        p[0] = ASTNode(Variable(p[1]))

def p_unop(p):
    '''
    unop : ALWAYS ltl
         | EVENTUALLY ltl
         | NEXT ltl
         | NOT ltl
    '''
    if p[1] == "always":
        op = AlwaysOp(p[2])
    elif p[1] == "eventually":
        op = EventuallyOp(p[2])
    elif p[1] == "next":
        op = NextOp(p[2])
    elif p[1] == "not":
        op = NotOp(p[2])
    else:
        raise AutomataError(f"Invalid unary operator {p[1]}")

    p[0] = ASTNode(op)

def p_binop(p):
    '''
    binop : opd UNTIL ltl
          | opd AND ltl
          | opd OR ltl
          | opd IMPLY ltl
    '''
    if p[2] == "and":
        op = AndOp(p[1], p[3])
    elif p[2] == "until":
        op = UntilOp(p[1], p[3])
    elif p[2] == "or":
        op = OrOp(p[1], p[3])
    elif p[2] == "imply":
        op = ImplyOp(p[1], p[3])
    else:
        raise AutomataError(f"Invalid binary operator {p[2]}")

    p[0] = ASTNode(op)

def p_error(p):
    # Parser error hook. Without it PLY only prints a message to stderr
    # and then either attempts error recovery or returns None, so a
    # malformed specification would not be reported to the caller as an
    # AutomataError. PLY passes None when the input ends unexpectedly.
    if p is None:
        raise AutomataError("Syntax error: unexpected end of LTL specification")
    raise AutomataError(f"Syntax error near '{p.value}' in LTL specification")

parser = yacc()

def parse_ltl(s: str) -> ASTNode:
    """Parse an LTL specification and return the AST of its RULE.

    The specification is a sequence of "NAME = <ltl>" assignments. The
    one named RULE is the result; every other assignment defines a
    sub-expression that is substituted wherever its name appears as a
    variable in the rule.

    Raises:
        AutomataError: on a lexing or syntax error, or if RULE is not
            defined.
    """
    spec = parser.parse(s)
    if spec is None:
        # Defensive: never iterate over a failed parse result.
        raise AutomataError("Unable to parse the LTL specification")

    rule = None
    subexpr = {}

    for assign in spec:
        if assign[0] == "RULE":
            rule = assign[1]
        else:
            subexpr[assign[0]] = assign[1]

    if rule is None:
        raise AutomataError("Please define your specification in the \"RULE = <LTL spec>\" format")

    # The traversal is lazy: after a substitution it continues into the
    # substituted operator, so nested sub-expression names are resolved
    # as well.
    for node in rule:
        if not isinstance(node.op, Variable):
            continue
        replace = subexpr.get(node.op.name)
        if replace is not None:
            node.op = replace.op

    return rule

def create_graph(s: str):
    """Build the graph for the LTL specification `s`.

    Returns a tuple (atoms, graph, ltl):
        atoms: sorted names of the variables used by the rule.
        graph: list of GraphNode sorted by id, with ids renumbered from
            0 and `init`, `outgoing` and `labels` filled in.
        ltl: the normalized AST of the rule.

    Raises:
        AutomataError: if the specification cannot be parsed.
    """
    atoms = set()

    ltl = parse_ltl(s)
    for c in ltl:
        c.normalize()
        if isinstance(c.op, Variable):
            atoms.add(c.op.name)

    # `init` is a pseudo-node which only marks the initial nodes; it is
    # not part of the resulting graph.
    init = GraphNode(set(), set(), set(), set())
    head = GraphNode({init}, {ltl}, set(), set())
    graph = sorted(head.expand(set()))

    for i, node in enumerate(graph):
        # The id assignment during graph generation has gaps. Reassign them
        node.id = i

        for incoming in node.incoming:
            if incoming is init:
                node.init = True
            else:
                incoming.outgoing.add(node)
        for o in node.old:
            if not o.op.is_temporal():
                node.labels.add(str(o))

    return sorted(atoms), graph, ltl