1 /* 2 * net/tipc/subscr.c: TIPC network topology service 3 * 4 * Copyright (c) 2000-2006, Ericsson AB 5 * Copyright (c) 2005-2007, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37 #include "core.h" 38 #include "dbg.h" 39 #include "name_table.h" 40 #include "port.h" 41 #include "ref.h" 42 #include "subscr.h" 43 44 /** 45 * struct subscriber - TIPC network topology subscriber 46 * @port_ref: object reference to server port connecting to subscriber 47 * @lock: pointer to spinlock controlling access to subscriber's server port 48 * @subscriber_list: adjacent subscribers in top. server's list of subscribers 49 * @subscription_list: list of subscription objects for this subscriber 50 */ 51 52 struct subscriber { 53 u32 port_ref; 54 spinlock_t *lock; 55 struct list_head subscriber_list; 56 struct list_head subscription_list; 57 }; 58 59 /** 60 * struct top_srv - TIPC network topology subscription service 61 * @user_ref: TIPC userid of subscription service 62 * @setup_port: reference to TIPC port that handles subscription requests 63 * @subscription_count: number of active subscriptions (not subscribers!) 64 * @subscriber_list: list of ports subscribing to service 65 * @lock: spinlock govering access to subscriber list 66 */ 67 68 struct top_srv { 69 u32 user_ref; 70 u32 setup_port; 71 atomic_t subscription_count; 72 struct list_head subscriber_list; 73 spinlock_t lock; 74 }; 75 76 static struct top_srv topsrv = { 0 }; 77 78 /** 79 * htohl - convert value to endianness used by destination 80 * @in: value to convert 81 * @swap: non-zero if endianness must be reversed 82 * 83 * Returns converted value 84 */ 85 86 static u32 htohl(u32 in, int swap) 87 { 88 return swap ? swab32(in) : in; 89 } 90 91 /** 92 * subscr_send_event - send a message containing a tipc_event to the subscriber 93 * 94 * Note: Must not hold subscriber's server port lock, since tipc_send() will 95 * try to take the lock if the message is rejected and returned! 96 */ 97 98 static void subscr_send_event(struct subscription *sub, 99 u32 found_lower, 100 u32 found_upper, 101 u32 event, 102 u32 port_ref, 103 u32 node) 104 { 105 struct iovec msg_sect; 106 107 msg_sect.iov_base = (void *)&sub->evt; 108 msg_sect.iov_len = sizeof(struct tipc_event); 109 110 sub->evt.event = htohl(event, sub->swap); 111 sub->evt.found_lower = htohl(found_lower, sub->swap); 112 sub->evt.found_upper = htohl(found_upper, sub->swap); 113 sub->evt.port.ref = htohl(port_ref, sub->swap); 114 sub->evt.port.node = htohl(node, sub->swap); 115 tipc_send(sub->server_ref, 1, &msg_sect); 116 } 117 118 /** 119 * tipc_subscr_overlap - test for subscription overlap with the given values 120 * 121 * Returns 1 if there is overlap, otherwise 0. 122 */ 123 124 int tipc_subscr_overlap(struct subscription *sub, 125 u32 found_lower, 126 u32 found_upper) 127 128 { 129 if (found_lower < sub->seq.lower) 130 found_lower = sub->seq.lower; 131 if (found_upper > sub->seq.upper) 132 found_upper = sub->seq.upper; 133 if (found_lower > found_upper) 134 return 0; 135 return 1; 136 } 137 138 /** 139 * tipc_subscr_report_overlap - issue event if there is subscription overlap 140 * 141 * Protected by nameseq.lock in name_table.c 142 */ 143 144 void tipc_subscr_report_overlap(struct subscription *sub, 145 u32 found_lower, 146 u32 found_upper, 147 u32 event, 148 u32 port_ref, 149 u32 node, 150 int must) 151 { 152 if (!tipc_subscr_overlap(sub, found_lower, found_upper)) 153 return; 154 if (!must && !(sub->filter & TIPC_SUB_PORTS)) 155 return; 156 157 sub->event_cb(sub, found_lower, found_upper, event, port_ref, node); 158 } 159 160 /** 161 * subscr_timeout - subscription timeout has occurred 162 */ 163 164 static void subscr_timeout(struct subscription *sub) 165 { 166 struct port *server_port; 167 168 /* Validate server port reference (in case subscriber is terminating) */ 169 170 server_port = tipc_port_lock(sub->server_ref); 171 if (server_port == NULL) 172 return; 173 174 /* Validate timeout (in case subscription is being cancelled) */ 175 176 if (sub->timeout == TIPC_WAIT_FOREVER) { 177 tipc_port_unlock(server_port); 178 return; 179 } 180 181 /* Unlink subscription from name table */ 182 183 tipc_nametbl_unsubscribe(sub); 184 185 /* Unlink subscription from subscriber */ 186 187 list_del(&sub->subscription_list); 188 189 /* Release subscriber's server port */ 190 191 tipc_port_unlock(server_port); 192 193 /* Notify subscriber of timeout */ 194 195 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 196 TIPC_SUBSCR_TIMEOUT, 0, 0); 197 198 /* Now destroy subscription */ 199 200 k_term_timer(&sub->timer); 201 kfree(sub); 202 atomic_dec(&topsrv.subscription_count); 203 } 204 205 /** 206 * subscr_del - delete a subscription within a subscription list 207 * 208 * Called with subscriber port locked. 209 */ 210 211 static void subscr_del(struct subscription *sub) 212 { 213 tipc_nametbl_unsubscribe(sub); 214 list_del(&sub->subscription_list); 215 kfree(sub); 216 atomic_dec(&topsrv.subscription_count); 217 } 218 219 /** 220 * subscr_terminate - terminate communication with a subscriber 221 * 222 * Called with subscriber port locked. Routine must temporarily release lock 223 * to enable subscription timeout routine(s) to finish without deadlocking; 224 * the lock is then reclaimed to allow caller to release it upon return. 225 * (This should work even in the unlikely event some other thread creates 226 * a new object reference in the interim that uses this lock; this routine will 227 * simply wait for it to be released, then claim it.) 228 */ 229 230 static void subscr_terminate(struct subscriber *subscriber) 231 { 232 u32 port_ref; 233 struct subscription *sub; 234 struct subscription *sub_temp; 235 236 /* Invalidate subscriber reference */ 237 238 port_ref = subscriber->port_ref; 239 subscriber->port_ref = 0; 240 spin_unlock_bh(subscriber->lock); 241 242 /* Sever connection to subscriber */ 243 244 tipc_shutdown(port_ref); 245 tipc_deleteport(port_ref); 246 247 /* Destroy any existing subscriptions for subscriber */ 248 249 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 250 subscription_list) { 251 if (sub->timeout != TIPC_WAIT_FOREVER) { 252 k_cancel_timer(&sub->timer); 253 k_term_timer(&sub->timer); 254 } 255 dbg("Term: Removing sub %u,%u,%u from subscriber %x list\n", 256 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); 257 subscr_del(sub); 258 } 259 260 /* Remove subscriber from topology server's subscriber list */ 261 262 spin_lock_bh(&topsrv.lock); 263 list_del(&subscriber->subscriber_list); 264 spin_unlock_bh(&topsrv.lock); 265 266 /* Reclaim subscriber lock */ 267 268 spin_lock_bh(subscriber->lock); 269 270 /* Now destroy subscriber */ 271 272 kfree(subscriber); 273 } 274 275 /** 276 * subscr_cancel - handle subscription cancellation request 277 * 278 * Called with subscriber port locked. Routine must temporarily release lock 279 * to enable the subscription timeout routine to finish without deadlocking; 280 * the lock is then reclaimed to allow caller to release it upon return. 281 * 282 * Note that fields of 's' use subscriber's endianness! 283 */ 284 285 static void subscr_cancel(struct tipc_subscr *s, 286 struct subscriber *subscriber) 287 { 288 struct subscription *sub; 289 struct subscription *sub_temp; 290 int found = 0; 291 292 /* Find first matching subscription, exit if not found */ 293 294 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 295 subscription_list) { 296 if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { 297 found = 1; 298 break; 299 } 300 } 301 if (!found) 302 return; 303 304 /* Cancel subscription timer (if used), then delete subscription */ 305 306 if (sub->timeout != TIPC_WAIT_FOREVER) { 307 sub->timeout = TIPC_WAIT_FOREVER; 308 spin_unlock_bh(subscriber->lock); 309 k_cancel_timer(&sub->timer); 310 k_term_timer(&sub->timer); 311 spin_lock_bh(subscriber->lock); 312 } 313 dbg("Cancel: removing sub %u,%u,%u from subscriber %x list\n", 314 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); 315 subscr_del(sub); 316 } 317 318 /** 319 * subscr_subscribe - create subscription for subscriber 320 * 321 * Called with subscriber port locked. 322 */ 323 324 static struct subscription *subscr_subscribe(struct tipc_subscr *s, 325 struct subscriber *subscriber) 326 { 327 struct subscription *sub; 328 int swap; 329 330 /* Determine subscriber's endianness */ 331 332 swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE)); 333 334 /* Detect & process a subscription cancellation request */ 335 336 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { 337 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); 338 subscr_cancel(s, subscriber); 339 return NULL; 340 } 341 342 /* Refuse subscription if global limit exceeded */ 343 344 if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) { 345 warn("Subscription rejected, subscription limit reached (%u)\n", 346 tipc_max_subscriptions); 347 subscr_terminate(subscriber); 348 return NULL; 349 } 350 351 /* Allocate subscription object */ 352 353 sub = kmalloc(sizeof(*sub), GFP_ATOMIC); 354 if (!sub) { 355 warn("Subscription rejected, no memory\n"); 356 subscr_terminate(subscriber); 357 return NULL; 358 } 359 360 /* Initialize subscription object */ 361 362 sub->seq.type = htohl(s->seq.type, swap); 363 sub->seq.lower = htohl(s->seq.lower, swap); 364 sub->seq.upper = htohl(s->seq.upper, swap); 365 sub->timeout = htohl(s->timeout, swap); 366 sub->filter = htohl(s->filter, swap); 367 if ((!(sub->filter & TIPC_SUB_PORTS) 368 == !(sub->filter & TIPC_SUB_SERVICE)) 369 || (sub->seq.lower > sub->seq.upper)) { 370 warn("Subscription rejected, illegal request\n"); 371 kfree(sub); 372 subscr_terminate(subscriber); 373 return NULL; 374 } 375 sub->event_cb = subscr_send_event; 376 INIT_LIST_HEAD(&sub->nameseq_list); 377 list_add(&sub->subscription_list, &subscriber->subscription_list); 378 sub->server_ref = subscriber->port_ref; 379 sub->swap = swap; 380 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 381 atomic_inc(&topsrv.subscription_count); 382 if (sub->timeout != TIPC_WAIT_FOREVER) { 383 k_init_timer(&sub->timer, 384 (Handler)subscr_timeout, (unsigned long)sub); 385 k_start_timer(&sub->timer, sub->timeout); 386 } 387 388 return sub; 389 } 390 391 /** 392 * subscr_conn_shutdown_event - handle termination request from subscriber 393 * 394 * Called with subscriber's server port unlocked. 395 */ 396 397 static void subscr_conn_shutdown_event(void *usr_handle, 398 u32 port_ref, 399 struct sk_buff **buf, 400 unsigned char const *data, 401 unsigned int size, 402 int reason) 403 { 404 struct subscriber *subscriber = usr_handle; 405 spinlock_t *subscriber_lock; 406 407 if (tipc_port_lock(port_ref) == NULL) 408 return; 409 410 subscriber_lock = subscriber->lock; 411 subscr_terminate(subscriber); 412 spin_unlock_bh(subscriber_lock); 413 } 414 415 /** 416 * subscr_conn_msg_event - handle new subscription request from subscriber 417 * 418 * Called with subscriber's server port unlocked. 419 */ 420 421 static void subscr_conn_msg_event(void *usr_handle, 422 u32 port_ref, 423 struct sk_buff **buf, 424 const unchar *data, 425 u32 size) 426 { 427 struct subscriber *subscriber = usr_handle; 428 spinlock_t *subscriber_lock; 429 struct subscription *sub; 430 431 /* 432 * Lock subscriber's server port (& make a local copy of lock pointer, 433 * in case subscriber is deleted while processing subscription request) 434 */ 435 436 if (tipc_port_lock(port_ref) == NULL) 437 return; 438 439 subscriber_lock = subscriber->lock; 440 441 if (size != sizeof(struct tipc_subscr)) { 442 subscr_terminate(subscriber); 443 spin_unlock_bh(subscriber_lock); 444 } else { 445 sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); 446 spin_unlock_bh(subscriber_lock); 447 if (sub != NULL) { 448 449 /* 450 * We must release the server port lock before adding a 451 * subscription to the name table since TIPC needs to be 452 * able to (re)acquire the port lock if an event message 453 * issued by the subscription process is rejected and 454 * returned. The subscription cannot be deleted while 455 * it is being added to the name table because: 456 * a) the single-threading of the native API port code 457 * ensures the subscription cannot be cancelled and 458 * the subscriber connection cannot be broken, and 459 * b) the name table lock ensures the subscription 460 * timeout code cannot delete the subscription, 461 * so the subscription object is still protected. 462 */ 463 464 tipc_nametbl_subscribe(sub); 465 } 466 } 467 } 468 469 /** 470 * subscr_named_msg_event - handle request to establish a new subscriber 471 */ 472 473 static void subscr_named_msg_event(void *usr_handle, 474 u32 port_ref, 475 struct sk_buff **buf, 476 const unchar *data, 477 u32 size, 478 u32 importance, 479 struct tipc_portid const *orig, 480 struct tipc_name_seq const *dest) 481 { 482 static struct iovec msg_sect = {NULL, 0}; 483 484 struct subscriber *subscriber; 485 u32 server_port_ref; 486 487 /* Create subscriber object */ 488 489 subscriber = kzalloc(sizeof(struct subscriber), GFP_ATOMIC); 490 if (subscriber == NULL) { 491 warn("Subscriber rejected, no memory\n"); 492 return; 493 } 494 INIT_LIST_HEAD(&subscriber->subscription_list); 495 INIT_LIST_HEAD(&subscriber->subscriber_list); 496 497 /* Create server port & establish connection to subscriber */ 498 499 tipc_createport(topsrv.user_ref, 500 subscriber, 501 importance, 502 NULL, 503 NULL, 504 subscr_conn_shutdown_event, 505 NULL, 506 NULL, 507 subscr_conn_msg_event, 508 NULL, 509 &subscriber->port_ref); 510 if (subscriber->port_ref == 0) { 511 warn("Subscriber rejected, unable to create port\n"); 512 kfree(subscriber); 513 return; 514 } 515 tipc_connect2port(subscriber->port_ref, orig); 516 517 /* Lock server port (& save lock address for future use) */ 518 519 subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock; 520 521 /* Add subscriber to topology server's subscriber list */ 522 523 spin_lock_bh(&topsrv.lock); 524 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); 525 spin_unlock_bh(&topsrv.lock); 526 527 /* Unlock server port */ 528 529 server_port_ref = subscriber->port_ref; 530 spin_unlock_bh(subscriber->lock); 531 532 /* Send an ACK- to complete connection handshaking */ 533 534 tipc_send(server_port_ref, 1, &msg_sect); 535 536 /* Handle optional subscription request */ 537 538 if (size != 0) { 539 subscr_conn_msg_event(subscriber, server_port_ref, 540 buf, data, size); 541 } 542 } 543 544 int tipc_subscr_start(void) 545 { 546 struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; 547 int res = -1; 548 549 memset(&topsrv, 0, sizeof (topsrv)); 550 spin_lock_init(&topsrv.lock); 551 INIT_LIST_HEAD(&topsrv.subscriber_list); 552 553 spin_lock_bh(&topsrv.lock); 554 res = tipc_attach(&topsrv.user_ref, NULL, NULL); 555 if (res) { 556 spin_unlock_bh(&topsrv.lock); 557 return res; 558 } 559 560 res = tipc_createport(topsrv.user_ref, 561 NULL, 562 TIPC_CRITICAL_IMPORTANCE, 563 NULL, 564 NULL, 565 NULL, 566 NULL, 567 subscr_named_msg_event, 568 NULL, 569 NULL, 570 &topsrv.setup_port); 571 if (res) 572 goto failed; 573 574 res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); 575 if (res) 576 goto failed; 577 578 spin_unlock_bh(&topsrv.lock); 579 return 0; 580 581 failed: 582 err("Failed to create subscription service\n"); 583 tipc_detach(topsrv.user_ref); 584 topsrv.user_ref = 0; 585 spin_unlock_bh(&topsrv.lock); 586 return res; 587 } 588 589 void tipc_subscr_stop(void) 590 { 591 struct subscriber *subscriber; 592 struct subscriber *subscriber_temp; 593 spinlock_t *subscriber_lock; 594 595 if (topsrv.user_ref) { 596 tipc_deleteport(topsrv.setup_port); 597 list_for_each_entry_safe(subscriber, subscriber_temp, 598 &topsrv.subscriber_list, 599 subscriber_list) { 600 subscriber_lock = subscriber->lock; 601 spin_lock_bh(subscriber_lock); 602 subscr_terminate(subscriber); 603 spin_unlock_bh(subscriber_lock); 604 } 605 tipc_detach(topsrv.user_ref); 606 topsrv.user_ref = 0; 607 } 608 } 609 610 611 int tipc_ispublished(struct tipc_name const *name) 612 { 613 u32 domain = 0; 614 615 return(tipc_nametbl_translate(name->type, name->instance,&domain) != 0); 616 } 617 618