1 /* 2 * net/tipc/subscr.c: TIPC network topology service 3 * 4 * Copyright (c) 2000-2006, Ericsson AB 5 * Copyright (c) 2005-2007, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37 #include "core.h" 38 #include "dbg.h" 39 #include "name_table.h" 40 #include "port.h" 41 #include "ref.h" 42 #include "subscr.h" 43 44 /** 45 * struct subscriber - TIPC network topology subscriber 46 * @port_ref: object reference to server port connecting to subscriber 47 * @lock: pointer to spinlock controlling access to subscriber's server port 48 * @subscriber_list: adjacent subscribers in top. server's list of subscribers 49 * @subscription_list: list of subscription objects for this subscriber 50 */ 51 52 struct subscriber { 53 u32 port_ref; 54 spinlock_t *lock; 55 struct list_head subscriber_list; 56 struct list_head subscription_list; 57 }; 58 59 /** 60 * struct top_srv - TIPC network topology subscription service 61 * @user_ref: TIPC userid of subscription service 62 * @setup_port: reference to TIPC port that handles subscription requests 63 * @subscription_count: number of active subscriptions (not subscribers!) 64 * @subscriber_list: list of ports subscribing to service 65 * @lock: spinlock govering access to subscriber list 66 */ 67 68 struct top_srv { 69 u32 user_ref; 70 u32 setup_port; 71 atomic_t subscription_count; 72 struct list_head subscriber_list; 73 spinlock_t lock; 74 }; 75 76 static struct top_srv topsrv = { 0 }; 77 78 /** 79 * subscr_send_event - send a message containing a tipc_event to the subscriber 80 * 81 * Note: Must not hold subscriber's server port lock, since tipc_send() will 82 * try to take the lock if the message is rejected and returned! 83 */ 84 85 static void subscr_send_event(struct subscription *sub, 86 u32 found_lower, 87 u32 found_upper, 88 u32 event, 89 u32 port_ref, 90 u32 node) 91 { 92 struct iovec msg_sect; 93 94 msg_sect.iov_base = (void *)&sub->evt; 95 msg_sect.iov_len = sizeof(struct tipc_event); 96 97 sub->evt.event = htonl(event); 98 sub->evt.found_lower = htonl(found_lower); 99 sub->evt.found_upper = htonl(found_upper); 100 sub->evt.port.ref = htonl(port_ref); 101 sub->evt.port.node = htonl(node); 102 tipc_send(sub->server_ref, 1, &msg_sect); 103 } 104 105 /** 106 * tipc_subscr_overlap - test for subscription overlap with the given values 107 * 108 * Returns 1 if there is overlap, otherwise 0. 109 */ 110 111 int tipc_subscr_overlap(struct subscription *sub, 112 u32 found_lower, 113 u32 found_upper) 114 115 { 116 if (found_lower < sub->seq.lower) 117 found_lower = sub->seq.lower; 118 if (found_upper > sub->seq.upper) 119 found_upper = sub->seq.upper; 120 if (found_lower > found_upper) 121 return 0; 122 return 1; 123 } 124 125 /** 126 * tipc_subscr_report_overlap - issue event if there is subscription overlap 127 * 128 * Protected by nameseq.lock in name_table.c 129 */ 130 131 void tipc_subscr_report_overlap(struct subscription *sub, 132 u32 found_lower, 133 u32 found_upper, 134 u32 event, 135 u32 port_ref, 136 u32 node, 137 int must) 138 { 139 if (!tipc_subscr_overlap(sub, found_lower, found_upper)) 140 return; 141 if (!must && !(sub->filter & TIPC_SUB_PORTS)) 142 return; 143 144 sub->event_cb(sub, found_lower, found_upper, event, port_ref, node); 145 } 146 147 /** 148 * subscr_timeout - subscription timeout has occurred 149 */ 150 151 static void subscr_timeout(struct subscription *sub) 152 { 153 struct port *server_port; 154 155 /* Validate server port reference (in case subscriber is terminating) */ 156 157 server_port = tipc_port_lock(sub->server_ref); 158 if (server_port == NULL) 159 return; 160 161 /* Validate timeout (in case subscription is being cancelled) */ 162 163 if (sub->timeout == TIPC_WAIT_FOREVER) { 164 tipc_port_unlock(server_port); 165 return; 166 } 167 168 /* Unlink subscription from name table */ 169 170 tipc_nametbl_unsubscribe(sub); 171 172 /* Unlink subscription from subscriber */ 173 174 list_del(&sub->subscription_list); 175 176 /* Release subscriber's server port */ 177 178 tipc_port_unlock(server_port); 179 180 /* Notify subscriber of timeout */ 181 182 subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, 183 TIPC_SUBSCR_TIMEOUT, 0, 0); 184 185 /* Now destroy subscription */ 186 187 k_term_timer(&sub->timer); 188 kfree(sub); 189 atomic_dec(&topsrv.subscription_count); 190 } 191 192 /** 193 * subscr_del - delete a subscription within a subscription list 194 * 195 * Called with subscriber port locked. 196 */ 197 198 static void subscr_del(struct subscription *sub) 199 { 200 tipc_nametbl_unsubscribe(sub); 201 list_del(&sub->subscription_list); 202 kfree(sub); 203 atomic_dec(&topsrv.subscription_count); 204 } 205 206 /** 207 * subscr_terminate - terminate communication with a subscriber 208 * 209 * Called with subscriber port locked. Routine must temporarily release lock 210 * to enable subscription timeout routine(s) to finish without deadlocking; 211 * the lock is then reclaimed to allow caller to release it upon return. 212 * (This should work even in the unlikely event some other thread creates 213 * a new object reference in the interim that uses this lock; this routine will 214 * simply wait for it to be released, then claim it.) 215 */ 216 217 static void subscr_terminate(struct subscriber *subscriber) 218 { 219 u32 port_ref; 220 struct subscription *sub; 221 struct subscription *sub_temp; 222 223 /* Invalidate subscriber reference */ 224 225 port_ref = subscriber->port_ref; 226 subscriber->port_ref = 0; 227 spin_unlock_bh(subscriber->lock); 228 229 /* Sever connection to subscriber */ 230 231 tipc_shutdown(port_ref); 232 tipc_deleteport(port_ref); 233 234 /* Destroy any existing subscriptions for subscriber */ 235 236 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 237 subscription_list) { 238 if (sub->timeout != TIPC_WAIT_FOREVER) { 239 k_cancel_timer(&sub->timer); 240 k_term_timer(&sub->timer); 241 } 242 dbg("Term: Removing sub %u,%u,%u from subscriber %x list\n", 243 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); 244 subscr_del(sub); 245 } 246 247 /* Remove subscriber from topology server's subscriber list */ 248 249 spin_lock_bh(&topsrv.lock); 250 list_del(&subscriber->subscriber_list); 251 spin_unlock_bh(&topsrv.lock); 252 253 /* Reclaim subscriber lock */ 254 255 spin_lock_bh(subscriber->lock); 256 257 /* Now destroy subscriber */ 258 259 kfree(subscriber); 260 } 261 262 /** 263 * subscr_cancel - handle subscription cancellation request 264 * 265 * Called with subscriber port locked. Routine must temporarily release lock 266 * to enable the subscription timeout routine to finish without deadlocking; 267 * the lock is then reclaimed to allow caller to release it upon return. 268 * 269 * Note that fields of 's' use subscriber's endianness! 270 */ 271 272 static void subscr_cancel(struct tipc_subscr *s, 273 struct subscriber *subscriber) 274 { 275 struct subscription *sub; 276 struct subscription *sub_temp; 277 __u32 type, lower, upper, timeout, filter; 278 int found = 0; 279 280 /* Find first matching subscription, exit if not found */ 281 282 type = ntohl(s->seq.type); 283 lower = ntohl(s->seq.lower); 284 upper = ntohl(s->seq.upper); 285 timeout = ntohl(s->timeout); 286 filter = ntohl(s->filter) & ~TIPC_SUB_CANCEL; 287 288 list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, 289 subscription_list) { 290 if ((type == sub->seq.type) && 291 (lower == sub->seq.lower) && 292 (upper == sub->seq.upper) && 293 (timeout == sub->timeout) && 294 (filter == sub->filter) && 295 !memcmp(s->usr_handle,sub->evt.s.usr_handle, 296 sizeof(s->usr_handle)) ){ 297 found = 1; 298 break; 299 } 300 } 301 if (!found) 302 return; 303 304 /* Cancel subscription timer (if used), then delete subscription */ 305 306 if (sub->timeout != TIPC_WAIT_FOREVER) { 307 sub->timeout = TIPC_WAIT_FOREVER; 308 spin_unlock_bh(subscriber->lock); 309 k_cancel_timer(&sub->timer); 310 k_term_timer(&sub->timer); 311 spin_lock_bh(subscriber->lock); 312 } 313 dbg("Cancel: removing sub %u,%u,%u from subscriber %p list\n", 314 sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); 315 subscr_del(sub); 316 } 317 318 /** 319 * subscr_subscribe - create subscription for subscriber 320 * 321 * Called with subscriber port locked. 322 */ 323 324 static struct subscription *subscr_subscribe(struct tipc_subscr *s, 325 struct subscriber *subscriber) 326 { 327 struct subscription *sub; 328 329 /* Detect & process a subscription cancellation request */ 330 331 if (ntohl(s->filter) & TIPC_SUB_CANCEL) { 332 subscr_cancel(s, subscriber); 333 return NULL; 334 } 335 336 /* Refuse subscription if global limit exceeded */ 337 338 if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) { 339 warn("Subscription rejected, subscription limit reached (%u)\n", 340 tipc_max_subscriptions); 341 subscr_terminate(subscriber); 342 return NULL; 343 } 344 345 /* Allocate subscription object */ 346 347 sub = kmalloc(sizeof(*sub), GFP_ATOMIC); 348 if (!sub) { 349 warn("Subscription rejected, no memory\n"); 350 subscr_terminate(subscriber); 351 return NULL; 352 } 353 354 /* Initialize subscription object */ 355 356 sub->seq.type = ntohl(s->seq.type); 357 sub->seq.lower = ntohl(s->seq.lower); 358 sub->seq.upper = ntohl(s->seq.upper); 359 sub->timeout = ntohl(s->timeout); 360 sub->filter = ntohl(s->filter); 361 if ((sub->filter && (sub->filter != TIPC_SUB_PORTS)) || 362 (sub->seq.lower > sub->seq.upper)) { 363 warn("Subscription rejected, illegal request\n"); 364 kfree(sub); 365 subscr_terminate(subscriber); 366 return NULL; 367 } 368 sub->event_cb = subscr_send_event; 369 INIT_LIST_HEAD(&sub->nameseq_list); 370 list_add(&sub->subscription_list, &subscriber->subscription_list); 371 sub->server_ref = subscriber->port_ref; 372 memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); 373 atomic_inc(&topsrv.subscription_count); 374 if (sub->timeout != TIPC_WAIT_FOREVER) { 375 k_init_timer(&sub->timer, 376 (Handler)subscr_timeout, (unsigned long)sub); 377 k_start_timer(&sub->timer, sub->timeout); 378 } 379 380 return sub; 381 } 382 383 /** 384 * subscr_conn_shutdown_event - handle termination request from subscriber 385 * 386 * Called with subscriber's server port unlocked. 387 */ 388 389 static void subscr_conn_shutdown_event(void *usr_handle, 390 u32 port_ref, 391 struct sk_buff **buf, 392 unsigned char const *data, 393 unsigned int size, 394 int reason) 395 { 396 struct subscriber *subscriber = usr_handle; 397 spinlock_t *subscriber_lock; 398 399 if (tipc_port_lock(port_ref) == NULL) 400 return; 401 402 subscriber_lock = subscriber->lock; 403 subscr_terminate(subscriber); 404 spin_unlock_bh(subscriber_lock); 405 } 406 407 /** 408 * subscr_conn_msg_event - handle new subscription request from subscriber 409 * 410 * Called with subscriber's server port unlocked. 411 */ 412 413 static void subscr_conn_msg_event(void *usr_handle, 414 u32 port_ref, 415 struct sk_buff **buf, 416 const unchar *data, 417 u32 size) 418 { 419 struct subscriber *subscriber = usr_handle; 420 spinlock_t *subscriber_lock; 421 struct subscription *sub; 422 423 /* 424 * Lock subscriber's server port (& make a local copy of lock pointer, 425 * in case subscriber is deleted while processing subscription request) 426 */ 427 428 if (tipc_port_lock(port_ref) == NULL) 429 return; 430 431 subscriber_lock = subscriber->lock; 432 433 if (size != sizeof(struct tipc_subscr)) { 434 subscr_terminate(subscriber); 435 spin_unlock_bh(subscriber_lock); 436 } else { 437 sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); 438 spin_unlock_bh(subscriber_lock); 439 if (sub != NULL) { 440 441 /* 442 * We must release the server port lock before adding a 443 * subscription to the name table since TIPC needs to be 444 * able to (re)acquire the port lock if an event message 445 * issued by the subscription process is rejected and 446 * returned. The subscription cannot be deleted while 447 * it is being added to the name table because: 448 * a) the single-threading of the native API port code 449 * ensures the subscription cannot be cancelled and 450 * the subscriber connection cannot be broken, and 451 * b) the name table lock ensures the subscription 452 * timeout code cannot delete the subscription, 453 * so the subscription object is still protected. 454 */ 455 456 tipc_nametbl_subscribe(sub); 457 } 458 } 459 } 460 461 /** 462 * subscr_named_msg_event - handle request to establish a new subscriber 463 */ 464 465 static void subscr_named_msg_event(void *usr_handle, 466 u32 port_ref, 467 struct sk_buff **buf, 468 const unchar *data, 469 u32 size, 470 u32 importance, 471 struct tipc_portid const *orig, 472 struct tipc_name_seq const *dest) 473 { 474 static struct iovec msg_sect = {NULL, 0}; 475 476 struct subscriber *subscriber; 477 u32 server_port_ref; 478 479 /* Create subscriber object */ 480 481 subscriber = kzalloc(sizeof(struct subscriber), GFP_ATOMIC); 482 if (subscriber == NULL) { 483 warn("Subscriber rejected, no memory\n"); 484 return; 485 } 486 INIT_LIST_HEAD(&subscriber->subscription_list); 487 INIT_LIST_HEAD(&subscriber->subscriber_list); 488 489 /* Create server port & establish connection to subscriber */ 490 491 tipc_createport(topsrv.user_ref, 492 subscriber, 493 importance, 494 NULL, 495 NULL, 496 subscr_conn_shutdown_event, 497 NULL, 498 NULL, 499 subscr_conn_msg_event, 500 NULL, 501 &subscriber->port_ref); 502 if (subscriber->port_ref == 0) { 503 warn("Subscriber rejected, unable to create port\n"); 504 kfree(subscriber); 505 return; 506 } 507 tipc_connect2port(subscriber->port_ref, orig); 508 509 /* Lock server port (& save lock address for future use) */ 510 511 subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock; 512 513 /* Add subscriber to topology server's subscriber list */ 514 515 spin_lock_bh(&topsrv.lock); 516 list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); 517 spin_unlock_bh(&topsrv.lock); 518 519 /* Unlock server port */ 520 521 server_port_ref = subscriber->port_ref; 522 spin_unlock_bh(subscriber->lock); 523 524 /* Send an ACK- to complete connection handshaking */ 525 526 tipc_send(server_port_ref, 1, &msg_sect); 527 528 /* Handle optional subscription request */ 529 530 if (size != 0) { 531 subscr_conn_msg_event(subscriber, server_port_ref, 532 buf, data, size); 533 } 534 } 535 536 int tipc_subscr_start(void) 537 { 538 struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; 539 int res = -1; 540 541 memset(&topsrv, 0, sizeof (topsrv)); 542 spin_lock_init(&topsrv.lock); 543 INIT_LIST_HEAD(&topsrv.subscriber_list); 544 545 spin_lock_bh(&topsrv.lock); 546 res = tipc_attach(&topsrv.user_ref, NULL, NULL); 547 if (res) { 548 spin_unlock_bh(&topsrv.lock); 549 return res; 550 } 551 552 res = tipc_createport(topsrv.user_ref, 553 NULL, 554 TIPC_CRITICAL_IMPORTANCE, 555 NULL, 556 NULL, 557 NULL, 558 NULL, 559 subscr_named_msg_event, 560 NULL, 561 NULL, 562 &topsrv.setup_port); 563 if (res) 564 goto failed; 565 566 res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); 567 if (res) 568 goto failed; 569 570 spin_unlock_bh(&topsrv.lock); 571 return 0; 572 573 failed: 574 err("Failed to create subscription service\n"); 575 tipc_detach(topsrv.user_ref); 576 topsrv.user_ref = 0; 577 spin_unlock_bh(&topsrv.lock); 578 return res; 579 } 580 581 void tipc_subscr_stop(void) 582 { 583 struct subscriber *subscriber; 584 struct subscriber *subscriber_temp; 585 spinlock_t *subscriber_lock; 586 587 if (topsrv.user_ref) { 588 tipc_deleteport(topsrv.setup_port); 589 list_for_each_entry_safe(subscriber, subscriber_temp, 590 &topsrv.subscriber_list, 591 subscriber_list) { 592 subscriber_lock = subscriber->lock; 593 spin_lock_bh(subscriber_lock); 594 subscr_terminate(subscriber); 595 spin_unlock_bh(subscriber_lock); 596 } 597 tipc_detach(topsrv.user_ref); 598 topsrv.user_ref = 0; 599 } 600 } 601 602 603 int tipc_ispublished(struct tipc_name const *name) 604 { 605 u32 domain = 0; 606 607 return(tipc_nametbl_translate(name->type, name->instance,&domain) != 0); 608 } 609 610