1 /* 2 * net/tipc/subscr.c: TIPC network topology service 3 * 4 * Copyright (c) 2000-2006, Ericsson AB 5 * Copyright (c) 2005-2007, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37 #include "core.h" 38 #include "dbg.h" 39 #include "name_table.h" 40 #include "port.h" 41 #include "ref.h" 42 #include "subscr.h" 43 44 /** 45 * struct subscriber - TIPC network topology subscriber 46 * @port_ref: object reference to server port connecting to subscriber 47 * @lock: pointer to spinlock controlling access to subscriber's server port 48 * @subscriber_list: adjacent subscribers in top. server's list of subscribers 49 * @subscription_list: list of subscription objects for this subscriber 50 */ 51 52 struct subscriber { 53 u32 port_ref; 54 spinlock_t *lock; 55 struct list_head subscriber_list; 56 struct list_head subscription_list; 57 }; 58 59 /** 60 * struct top_srv - TIPC network topology subscription service 61 * @user_ref: TIPC userid of subscription service 62 * @setup_port: reference to TIPC port that handles subscription requests 63 * @subscription_count: number of active subscriptions (not subscribers!) 64 * @subscriber_list: list of ports subscribing to service 65 * @lock: spinlock govering access to subscriber list 66 */ 67 68 struct top_srv { 69 u32 user_ref; 70 u32 setup_port; 71 atomic_t subscription_count; 72 struct list_head subscriber_list; 73 spinlock_t lock; 74 }; 75 76 static struct top_srv topsrv = { 0 }; 77 78 /** 79 * subscr_send_event - send a message containing a tipc_event to the subscriber 80 * 81 * Note: Must not hold subscriber's server port lock, since tipc_send() will 82 * try to take the lock if the message is rejected and returned! 83 */ 84 85 static void subscr_send_event(struct subscription *sub, 86 u32 found_lower, 87 u32 found_upper, 88 u32 event, 89 u32 port_ref, 90 u32 node) 91 { 92 struct iovec msg_sect; 93 94 msg_sect.iov_base = (void *)&sub->evt; 95 msg_sect.iov_len = sizeof(struct tipc_event); 96 97 sub->evt.event = htonl(event); 98 sub->evt.found_lower = htonl(found_lower); 99 sub->evt.found_upper = htonl(found_upper); 100 sub->evt.port.ref = htonl(port_ref); 101 sub->evt.port.node = htonl(node); 102 tipc_send(sub->server_ref, 1, &msg_sect); 103 } 104 105 /** 106 * tipc_subscr_overlap - test for subscription overlap with the given values 107 * 108 * Returns 1 if there is overlap, otherwise 0. 109 */ 110 111 int tipc_subscr_overlap(struct subscription *sub, 112 u32 found_lower, 113 u32 found_upper) 114 115 { 116 if (found_lower < sub->seq.lower) 117 found_lower = sub->seq.lower; 118 if (found_upper > sub->seq.upper) 119 found_upper = sub->seq.upper; 120 if (found_lower > found_upper) 121 return 0; 122 return 1; 123 } 124 125 /** 126 * tipc_subscr_report_overlap - issue event if there is subscription overlap 127 * 128 * Protected by nameseq.lock in name_table.c 129 */ 130 131 void tipc_subscr_report_overlap(struct subscription *sub, 132 u32 found_lower, 133 u32 found_upper, 134 u32 event, 135 u32 port_ref, 136 u32 node, 137 int must) 138 { 139 if (!tipc_subscr_overlap(sub, found_lower, found_upper)) 140 return; 141 if (!must && !(sub->filter & TIPC_SUB_PORTS)) 142 return; 143 144 sub->event_cb(sub, found_lower, found_upper, event, port_ref, node); 145 } 146 147 /** 148 * subscr_timeout - subscription timeout has occurred 149 */ 150 151 static void subscr_timeout(struct subscription *sub) 152 { 153 struct port *server_port; 154 155 /* Validate server port reference (in case subscriber is terminating) */ 156 157 server_port = tipc_port_lock(sub->server_ref); 158 if (server_port == NULL) 159 return; 160 161 /* Validate timeout (in case subscription is being cancelled) */ 162 163 if (sub->timeout == TIPC_WAIT_FOREVER) { 164 tipc_port_unlock(server_port); 165 return; 166 } 167 168 /* Unlink subscription from name table */ 169 170 tipc_nametbl_unsubscribe(sub); 171 172 /* Unlink subscription from subscriber */ 173 174 list_del(&sub->subscription_list); 175 176 /* Release subscriber's server port */ 177 178 tipc_port_unlock(server_port); 179 180 /* Notify subscriber of timeout */ 181 182 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 183 TIPC_SUBSCR_TIMEOUT, 0, 0); 184 185 /* Now destroy subscription */ 186 187 k_term_timer(&sub->timer); 188 kfree(sub); 189 atomic_dec(&topsrv.subscription_count); 190 } 191 192 /** 193 * subscr_del - delete a subscription within a subscription list 194 * 195 * Called with subscriber port locked. 196 */ 197 198 static void subscr_del(struct subscription *sub) 199 { 200 tipc_nametbl_unsubscribe(sub); 201 list_del(&sub->subscription_list); 202 kfree(sub); 203 atomic_dec(&topsrv.subscription_count); 204 } 205 206 /** 207 * subscr_terminate - terminate communication with a subscriber 208 * 209 * Called with subscriber port locked. Routine must temporarily release lock 210 * to enable subscription timeout routine(s) to finish without deadlocking; 211 * the lock is then reclaimed to allow caller to release it upon return. 212 * (This should work even in the unlikely event some other thread creates 213 * a new object reference in the interim that uses this lock; this routine will 214 * simply wait for it to be released, then claim it.) 215 */ 216 217 static void subscr_terminate(struct subscriber *subscriber) 218 { 219 u32 port_ref; 220 struct subscription *sub; 221 struct subscription *sub_temp; 222 223 /* Invalidate subscriber reference */ 224 225 port_ref = subscriber->port_ref; 226 subscriber->port_ref = 0; 227 spin_unlock_bh(subscriber->lock); 228 229 /* Sever connection to subscriber */ 230 231 tipc_shutdown(port_ref); 232 tipc_deleteport(port_ref); 233 234 /* Destroy any existing subscriptions for subscriber */ 235 236 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 237 subscription_list) { 238 if (sub->timeout != TIPC_WAIT_FOREVER) { 239 k_cancel_timer(&sub->timer); 240 k_term_timer(&sub->timer); 241 } 242 dbg("Term: Removing sub %u,%u,%u from subscriber %x list\n", 243 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); 244 subscr_del(sub); 245 } 246 247 /* Remove subscriber from topology server's subscriber list */ 248 249 spin_lock_bh(&topsrv.lock); 250 list_del(&subscriber->subscriber_list); 251 spin_unlock_bh(&topsrv.lock); 252 253 /* Reclaim subscriber lock */ 254 255 spin_lock_bh(subscriber->lock); 256 257 /* Now destroy subscriber */ 258 259 kfree(subscriber); 260 } 261 262 /** 263 * subscr_cancel - handle subscription cancellation request 264 * 265 * Called with subscriber port locked. Routine must temporarily release lock 266 * to enable the subscription timeout routine to finish without deadlocking; 267 * the lock is then reclaimed to allow caller to release it upon return. 268 * 269 * Note that fields of 's' use subscriber's endianness! 270 */ 271 272 static void subscr_cancel(struct tipc_subscr *s, 273 struct subscriber *subscriber) 274 { 275 struct subscription *sub; 276 struct subscription *sub_temp; 277 __u32 type, lower, upper; 278 int found = 0; 279 280 /* Find first matching subscription, exit if not found */ 281 282 type = ntohl(s->seq.type); 283 lower = ntohl(s->seq.lower); 284 upper = ntohl(s->seq.upper); 285 286 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 287 subscription_list) { 288 if ((type == sub->seq.type) && 289 (lower == sub->seq.lower) && 290 (upper == sub->seq.upper)) { 291 found = 1; 292 break; 293 } 294 } 295 if (!found) 296 return; 297 298 /* Cancel subscription timer (if used), then delete subscription */ 299 300 if (sub->timeout != TIPC_WAIT_FOREVER) { 301 sub->timeout = TIPC_WAIT_FOREVER; 302 spin_unlock_bh(subscriber->lock); 303 k_cancel_timer(&sub->timer); 304 k_term_timer(&sub->timer); 305 spin_lock_bh(subscriber->lock); 306 } 307 dbg("Cancel: removing sub %u,%u,%u from subscriber %x list\n", 308 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); 309 subscr_del(sub); 310 } 311 312 /** 313 * subscr_subscribe - create subscription for subscriber 314 * 315 * Called with subscriber port locked. 316 */ 317 318 static struct subscription *subscr_subscribe(struct tipc_subscr *s, 319 struct subscriber *subscriber) 320 { 321 struct subscription *sub; 322 323 /* Detect & process a subscription cancellation request */ 324 325 if (ntohl(s->filter) & TIPC_SUB_CANCEL) { 326 subscr_cancel(s, subscriber); 327 return NULL; 328 } 329 330 /* Refuse subscription if global limit exceeded */ 331 332 if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) { 333 warn("Subscription rejected, subscription limit reached (%u)\n", 334 tipc_max_subscriptions); 335 subscr_terminate(subscriber); 336 return NULL; 337 } 338 339 /* Allocate subscription object */ 340 341 sub = kmalloc(sizeof(*sub), GFP_ATOMIC); 342 if (!sub) { 343 warn("Subscription rejected, no memory\n"); 344 subscr_terminate(subscriber); 345 return NULL; 346 } 347 348 /* Initialize subscription object */ 349 350 sub->seq.type = ntohl(s->seq.type); 351 sub->seq.lower = ntohl(s->seq.lower); 352 sub->seq.upper = ntohl(s->seq.upper); 353 sub->timeout = ntohl(s->timeout); 354 sub->filter = ntohl(s->filter); 355 if ((!(sub->filter & TIPC_SUB_PORTS) == 356 !(sub->filter & TIPC_SUB_SERVICE)) || 357 (sub->seq.lower > sub->seq.upper)) { 358 warn("Subscription rejected, illegal request\n"); 359 kfree(sub); 360 subscr_terminate(subscriber); 361 return NULL; 362 } 363 sub->event_cb = subscr_send_event; 364 INIT_LIST_HEAD(&sub->nameseq_list); 365 list_add(&sub->subscription_list, &subscriber->subscription_list); 366 sub->server_ref = subscriber->port_ref; 367 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 368 atomic_inc(&topsrv.subscription_count); 369 if (sub->timeout != TIPC_WAIT_FOREVER) { 370 k_init_timer(&sub->timer, 371 (Handler)subscr_timeout, (unsigned long)sub); 372 k_start_timer(&sub->timer, sub->timeout); 373 } 374 375 return sub; 376 } 377 378 /** 379 * subscr_conn_shutdown_event - handle termination request from subscriber 380 * 381 * Called with subscriber's server port unlocked. 382 */ 383 384 static void subscr_conn_shutdown_event(void *usr_handle, 385 u32 port_ref, 386 struct sk_buff **buf, 387 unsigned char const *data, 388 unsigned int size, 389 int reason) 390 { 391 struct subscriber *subscriber = usr_handle; 392 spinlock_t *subscriber_lock; 393 394 if (tipc_port_lock(port_ref) == NULL) 395 return; 396 397 subscriber_lock = subscriber->lock; 398 subscr_terminate(subscriber); 399 spin_unlock_bh(subscriber_lock); 400 } 401 402 /** 403 * subscr_conn_msg_event - handle new subscription request from subscriber 404 * 405 * Called with subscriber's server port unlocked. 406 */ 407 408 static void subscr_conn_msg_event(void *usr_handle, 409 u32 port_ref, 410 struct sk_buff **buf, 411 const unchar *data, 412 u32 size) 413 { 414 struct subscriber *subscriber = usr_handle; 415 spinlock_t *subscriber_lock; 416 struct subscription *sub; 417 418 /* 419 * Lock subscriber's server port (& make a local copy of lock pointer, 420 * in case subscriber is deleted while processing subscription request) 421 */ 422 423 if (tipc_port_lock(port_ref) == NULL) 424 return; 425 426 subscriber_lock = subscriber->lock; 427 428 if (size != sizeof(struct tipc_subscr)) { 429 subscr_terminate(subscriber); 430 spin_unlock_bh(subscriber_lock); 431 } else { 432 sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); 433 spin_unlock_bh(subscriber_lock); 434 if (sub != NULL) { 435 436 /* 437 * We must release the server port lock before adding a 438 * subscription to the name table since TIPC needs to be 439 * able to (re)acquire the port lock if an event message 440 * issued by the subscription process is rejected and 441 * returned. The subscription cannot be deleted while 442 * it is being added to the name table because: 443 * a) the single-threading of the native API port code 444 * ensures the subscription cannot be cancelled and 445 * the subscriber connection cannot be broken, and 446 * b) the name table lock ensures the subscription 447 * timeout code cannot delete the subscription, 448 * so the subscription object is still protected. 449 */ 450 451 tipc_nametbl_subscribe(sub); 452 } 453 } 454 } 455 456 /** 457 * subscr_named_msg_event - handle request to establish a new subscriber 458 */ 459 460 static void subscr_named_msg_event(void *usr_handle, 461 u32 port_ref, 462 struct sk_buff **buf, 463 const unchar *data, 464 u32 size, 465 u32 importance, 466 struct tipc_portid const *orig, 467 struct tipc_name_seq const *dest) 468 { 469 static struct iovec msg_sect = {NULL, 0}; 470 471 struct subscriber *subscriber; 472 u32 server_port_ref; 473 474 /* Create subscriber object */ 475 476 subscriber = kzalloc(sizeof(struct subscriber), GFP_ATOMIC); 477 if (subscriber == NULL) { 478 warn("Subscriber rejected, no memory\n"); 479 return; 480 } 481 INIT_LIST_HEAD(&subscriber->subscription_list); 482 INIT_LIST_HEAD(&subscriber->subscriber_list); 483 484 /* Create server port & establish connection to subscriber */ 485 486 tipc_createport(topsrv.user_ref, 487 subscriber, 488 importance, 489 NULL, 490 NULL, 491 subscr_conn_shutdown_event, 492 NULL, 493 NULL, 494 subscr_conn_msg_event, 495 NULL, 496 &subscriber->port_ref); 497 if (subscriber->port_ref == 0) { 498 warn("Subscriber rejected, unable to create port\n"); 499 kfree(subscriber); 500 return; 501 } 502 tipc_connect2port(subscriber->port_ref, orig); 503 504 /* Lock server port (& save lock address for future use) */ 505 506 subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock; 507 508 /* Add subscriber to topology server's subscriber list */ 509 510 spin_lock_bh(&topsrv.lock); 511 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); 512 spin_unlock_bh(&topsrv.lock); 513 514 /* Unlock server port */ 515 516 server_port_ref = subscriber->port_ref; 517 spin_unlock_bh(subscriber->lock); 518 519 /* Send an ACK- to complete connection handshaking */ 520 521 tipc_send(server_port_ref, 1, &msg_sect); 522 523 /* Handle optional subscription request */ 524 525 if (size != 0) { 526 subscr_conn_msg_event(subscriber, server_port_ref, 527 buf, data, size); 528 } 529 } 530 531 int tipc_subscr_start(void) 532 { 533 struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; 534 int res = -1; 535 536 memset(&topsrv, 0, sizeof (topsrv)); 537 spin_lock_init(&topsrv.lock); 538 INIT_LIST_HEAD(&topsrv.subscriber_list); 539 540 spin_lock_bh(&topsrv.lock); 541 res = tipc_attach(&topsrv.user_ref, NULL, NULL); 542 if (res) { 543 spin_unlock_bh(&topsrv.lock); 544 return res; 545 } 546 547 res = tipc_createport(topsrv.user_ref, 548 NULL, 549 TIPC_CRITICAL_IMPORTANCE, 550 NULL, 551 NULL, 552 NULL, 553 NULL, 554 subscr_named_msg_event, 555 NULL, 556 NULL, 557 &topsrv.setup_port); 558 if (res) 559 goto failed; 560 561 res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); 562 if (res) 563 goto failed; 564 565 spin_unlock_bh(&topsrv.lock); 566 return 0; 567 568 failed: 569 err("Failed to create subscription service\n"); 570 tipc_detach(topsrv.user_ref); 571 topsrv.user_ref = 0; 572 spin_unlock_bh(&topsrv.lock); 573 return res; 574 } 575 576 void tipc_subscr_stop(void) 577 { 578 struct subscriber *subscriber; 579 struct subscriber *subscriber_temp; 580 spinlock_t *subscriber_lock; 581 582 if (topsrv.user_ref) { 583 tipc_deleteport(topsrv.setup_port); 584 list_for_each_entry_safe(subscriber, subscriber_temp, 585 &topsrv.subscriber_list, 586 subscriber_list) { 587 subscriber_lock = subscriber->lock; 588 spin_lock_bh(subscriber_lock); 589 subscr_terminate(subscriber); 590 spin_unlock_bh(subscriber_lock); 591 } 592 tipc_detach(topsrv.user_ref); 593 topsrv.user_ref = 0; 594 } 595 } 596 597 598 int tipc_ispublished(struct tipc_name const *name) 599 { 600 u32 domain = 0; 601 602 return(tipc_nametbl_translate(name->type, name->instance,&domain) != 0); 603 } 604 605