/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "link.h"
#include "port.h"
#include "bcast.h"

#define MAX_PKT_DEFAULT_MCAST 1500	/* bcast link max packet size (fixed) */

#define BCLINK_WIN_DEFAULT 20		/* bcast link window size (default) */

/*
 * Loss rate for incoming broadcast frames; used to test retransmission code.
 * Set to N to cause every N'th frame to be discarded; 0 => don't discard any.
 */

#define TIPC_BCAST_LOSS_RATE 0

/**
 * struct bcbearer_pair - a pair of bearers used by broadcast link
 * @primary: pointer to primary bearer
 * @secondary: pointer to secondary bearer
 *
 * Bearers must have same priority and same set of reachable destinations
 * to be paired.
 */

struct bcbearer_pair {
	struct tipc_bearer *primary;
	struct tipc_bearer *secondary;
};

/**
 * struct bcbearer - bearer used by broadcast link
 * @bearer: (non-standard) broadcast bearer structure
 * @media: (non-standard) broadcast media structure
 * @bpairs: array of bearer pairs
 * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
 * @remains: temporary node map used by tipc_bcbearer_send()
 * @remains_new: temporary node map used by tipc_bcbearer_send()
 *
 * Note: The fields labelled "temporary" are incorporated into the bearer
 * to avoid consuming potentially limited stack space through the use of
 * large local variables within multicast routines. Concurrent access is
 * prevented through use of the spinlock "bc_lock".
 */

struct bcbearer {
	struct tipc_bearer bearer;
	struct media media;
	struct bcbearer_pair bpairs[MAX_BEARERS];
	struct bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
	struct tipc_node_map remains;
	struct tipc_node_map remains_new;
};

/**
 * struct bclink - link used for broadcast messages
 * @link: (non-standard) broadcast link structure
 * @node: (non-standard) node structure representing b'cast link's peer node
 * @retransmit_to: node that most recently requested a retransmit
 *
 * Handles sequence numbering, fragmentation, bundling, etc.
 */

struct bclink {
	struct link link;
	struct tipc_node node;
	struct tipc_node *retransmit_to;
};


static struct bcbearer *bcbearer;
static struct bclink *bclink;
static struct link *bcl;
static DEFINE_SPINLOCK(bc_lock);

/* broadcast-capable node map */
struct tipc_node_map tipc_bcast_nmap;

const char tipc_bclink_name[] = "broadcast-link";

static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff);

static u32 buf_seqno(struct sk_buff *buf)
{
	return msg_seqno(buf_msg(buf));
}

/*
 * The broadcast code reuses the skb control block's "handle" field to hold
 * the number of nodes that have yet to acknowledge a queued buffer.
 */

static u32 bcbuf_acks(struct sk_buff *buf)
{
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
}

static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
{
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
}

static void bcbuf_decr_acks(struct sk_buff *buf)
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}


/*
 * Note: bcl->fsm_msg_cnt is reused to record the sequence number of the
 * last broadcast packet sent (see tipc_bclink_get_last_sent()).
 */

static void bclink_set_last_sent(void)
{
	if (bcl->next_out)
		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
	else
		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
}

u32 tipc_bclink_get_last_sent(void)
{
	return bcl->fsm_msg_cnt;
}

/**
 * bclink_set_gap - set gap according to contents of current deferred pkt queue
 *
 * Called with 'node' locked, bc_lock unlocked
 */

static void bclink_set_gap(struct tipc_node *n_ptr)
{
	struct sk_buff *buf = n_ptr->bclink.deferred_head;

	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
		mod(n_ptr->bclink.last_in);
	if (unlikely(buf != NULL))
		n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
}

/**
 * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
 *
 * This mechanism endeavours to prevent all nodes in the network from trying
 * to ACK or NACK at the same time.
 *
 * Note: TIPC uses a different trigger to distribute ACKs than it does to
 * distribute NACKs, but tries to use the same spacing (divide by 16).
 */

static int bclink_ack_allowed(u32 n)
{
	return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
}


/**
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
 * Called with bc_lock locked
 */

struct tipc_node *tipc_bclink_retransmit_to(void)
{
	return bclink->retransmit_to;
}

/**
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
 *
 * Called with bc_lock locked
 */

static void bclink_retransmit_pkt(u32 after, u32 to)
{
	struct sk_buff *buf;

	buf = bcl->first_out;
	while (buf && less_eq(buf_seqno(buf), after))
		buf = buf->next;
	tipc_link_retransmit(bcl, buf, mod(to - after));
}

/**
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
 *
 * Node is locked, bc_lock unlocked.
 */

void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
	struct sk_buff *crs;
	struct sk_buff *next;
	unsigned int released = 0;

	if (less_eq(acked, n_ptr->bclink.acked))
		return;

	spin_lock_bh(&bc_lock);

	/* Skip over packets that node has previously acknowledged */

	crs = bcl->first_out;
	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
		crs = crs->next;

	/* Update packets that node is now acknowledging */

	while (crs && less_eq(buf_seqno(crs), acked)) {
		next = crs->next;
		bcbuf_decr_acks(crs);
		if (bcbuf_acks(crs) == 0) {
			bcl->first_out = next;
			bcl->out_queue_size--;
			buf_discard(crs);
			released = 1;
		}
		crs = next;
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */

	if (unlikely(bcl->next_out)) {
		tipc_link_push_queue(bcl);
		bclink_set_last_sent();
	}
	if (unlikely(released && !list_empty(&bcl->waiting_ports)))
		tipc_link_wakeup_ports(bcl, 0);
	spin_unlock_bh(&bc_lock);
}

/**
 * bclink_send_ack - unicast an ACK msg
 *
 * tipc_net_lock and node lock set
 */

static void bclink_send_ack(struct tipc_node *n_ptr)
{
	struct link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];

	if (l_ptr != NULL)
		tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}

/**
 * bclink_send_nack - broadcast a NACK msg
 *
 * tipc_net_lock and node lock set
 */

static void bclink_send_nack(struct tipc_node *n_ptr)
{
	struct sk_buff *buf;
	struct tipc_msg *msg;

	if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
		return;

	buf = tipc_buf_acquire(INT_H_SIZE);
	if (buf) {
		msg = buf_msg(buf);
		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
			      INT_H_SIZE, n_ptr->addr);
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tipc_net_id);
		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
		msg_set_bcast_tag(msg, tipc_own_tag);

		if (tipc_bearer_send(&bcbearer->bearer, buf, NULL)) {
			bcl->stats.sent_nacks++;
			buf_discard(buf);
		} else {
			tipc_bearer_schedule(bcl->b_ptr, bcl);
			bcl->proto_msg_queue = buf;
			bcl->stats.bearer_congs++;
		}

		/*
		 * Ensure we don't send another NACK msg to the node
		 * until 16 more deferred messages arrive from it
		 * (i.e. helps prevent all nodes from NACK'ing at the same time)
		 */

		n_ptr->bclink.nack_sync = tipc_own_tag;
	}
}

/**
 * tipc_bclink_check_gap - send a NACK if a sequence gap exists
 *
 * tipc_net_lock and node lock set
 */

void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
{
	if (!n_ptr->bclink.supported ||
	    less_eq(last_sent, mod(n_ptr->bclink.last_in)))
		return;

	bclink_set_gap(n_ptr);
	if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
		n_ptr->bclink.gap_to = last_sent;
	bclink_send_nack(n_ptr);
}

/**
 * tipc_bclink_peek_nack - process a NACK msg meant for another node
 *
 * Only tipc_net_lock set.
 */

static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
{
	struct tipc_node *n_ptr = tipc_node_find(dest);
	u32 my_after, my_to;

	if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
		return;
	tipc_node_lock(n_ptr);
	/*
	 * Modify gap to suppress unnecessary NACKs from this node
	 */
	my_after = n_ptr->bclink.gap_after;
	my_to = n_ptr->bclink.gap_to;

	if (less_eq(gap_after, my_after)) {
		if (less(my_after, gap_to) && less(gap_to, my_to))
			n_ptr->bclink.gap_after = gap_to;
		else if (less_eq(my_to, gap_to))
			n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
	} else if (less_eq(gap_after, my_to)) {
		if (less_eq(my_to, gap_to))
			n_ptr->bclink.gap_to = gap_after;
	} else {
		/*
		 * Expand gap if missing bufs not in deferred queue:
		 */
		struct sk_buff *buf = n_ptr->bclink.deferred_head;
		u32 prev = n_ptr->bclink.gap_to;

		for (; buf; buf = buf->next) {
			u32 seqno = buf_seqno(buf);

			if (mod(seqno - prev) != 1) {
				buf = NULL;
				break;
			}
			if (seqno == gap_after)
				break;
			prev = seqno;
		}
		if (buf == NULL)
			n_ptr->bclink.gap_to = gap_after;
	}
	/*
	 * Some nodes may send a complementary NACK now:
	 */
	if (bclink_ack_allowed(sender_tag + 1)) {
		if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
			bclink_send_nack(n_ptr);
			bclink_set_gap(n_ptr);
		}
	}
	tipc_node_unlock(n_ptr);
}

/**
 * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
 */

int tipc_bclink_send_msg(struct sk_buff *buf)
{
	int res;

	spin_lock_bh(&bc_lock);

	res = tipc_link_send_buf(bcl, buf);
	if (unlikely(res == -ELINKCONG))
		buf_discard(buf);
	else
		bclink_set_last_sent();

	bcl->stats.queue_sz_counts++;
	bcl->stats.accu_queue_sz += bcl->out_queue_size;

	spin_unlock_bh(&bc_lock);
	return res;
}

/**
 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
 *
 * tipc_net_lock is read_locked, no other locks set
 */

void tipc_bclink_recv_pkt(struct sk_buff *buf)
{
#if (TIPC_BCAST_LOSS_RATE)
	static int rx_count;
#endif
	struct tipc_msg *msg = buf_msg(buf);
	struct tipc_node *node = tipc_node_find(msg_prevnode(msg));
	u32 next_in;
	u32 seqno;
	struct sk_buff *deferred;

	if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported ||
		     (msg_mc_netid(msg) != tipc_net_id))) {
		buf_discard(buf);
		return;
	}

	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
		if (msg_destnode(msg) == tipc_own_addr) {
			tipc_node_lock(node);
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
			tipc_node_unlock(node);
			spin_lock_bh(&bc_lock);
			bcl->stats.recv_nacks++;
			bclink->retransmit_to = node;
			bclink_retransmit_pkt(msg_bcgap_after(msg),
					      msg_bcgap_to(msg));
			spin_unlock_bh(&bc_lock);
		} else {
			tipc_bclink_peek_nack(msg_destnode(msg),
					      msg_bcast_tag(msg),
					      msg_bcgap_after(msg),
					      msg_bcgap_to(msg));
		}
		buf_discard(buf);
		return;
	}

#if (TIPC_BCAST_LOSS_RATE)
	if (++rx_count == TIPC_BCAST_LOSS_RATE) {
		rx_count = 0;
		buf_discard(buf);
		return;
	}
#endif

	tipc_node_lock(node);
receive:
	deferred = node->bclink.deferred_head;
	next_in = mod(node->bclink.last_in + 1);
	seqno = msg_seqno(msg);

	if (likely(seqno == next_in)) {
		bcl->stats.recv_info++;
		node->bclink.last_in++;
		bclink_set_gap(node);
		if (unlikely(bclink_ack_allowed(seqno))) {
			bclink_send_ack(node);
			bcl->stats.sent_acks++;
		}
		if (likely(msg_isdata(msg))) {
			tipc_node_unlock(node);
			tipc_port_recv_mcast(buf, NULL);
		} else if (msg_user(msg) == MSG_BUNDLER) {
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
			tipc_node_unlock(node);
			tipc_link_recv_bundle(buf);
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
			bcl->stats.recv_fragments++;
			if (tipc_link_recv_fragment(&node->bclink.defragm,
						    &buf, &msg))
				bcl->stats.recv_fragmented++;
			tipc_node_unlock(node);
			tipc_net_route_msg(buf);
		} else {
			tipc_node_unlock(node);
			tipc_net_route_msg(buf);
		}
		if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
			tipc_node_lock(node);
			buf = deferred;
			msg = buf_msg(buf);
			node->bclink.deferred_head = deferred->next;
			goto receive;
		}
		return;
	} else if (less(next_in, seqno)) {
		u32 gap_after = node->bclink.gap_after;
		u32 gap_to = node->bclink.gap_to;

		if (tipc_link_defer_pkt(&node->bclink.deferred_head,
					&node->bclink.deferred_tail,
					buf)) {
			node->bclink.nack_sync++;
			bcl->stats.deferred_recv++;
			if (seqno == mod(gap_after + 1))
				node->bclink.gap_after = seqno;
			else if (less(gap_after, seqno) && less(seqno, gap_to))
				node->bclink.gap_to = seqno;
		}
		if (bclink_ack_allowed(node->bclink.nack_sync)) {
			if (gap_to != gap_after)
				bclink_send_nack(node);
			bclink_set_gap(node);
		}
	} else {
		bcl->stats.duplicates++;
		buf_discard(buf);
	}
	tipc_node_unlock(node);
}

u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
	return (n_ptr->bclink.supported &&
		(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
}


/**
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
 *
 * Send through as many bearers as necessary to reach all nodes
 * that support TIPC multicasting.
 *
 * Returns 0 if packet sent successfully, non-zero if not
 */

static int tipc_bcbearer_send(struct sk_buff *buf,
			      struct tipc_bearer *unused1,
			      struct tipc_media_addr *unused2)
{
	int bp_index;

	/* Prepare buffer for broadcasting (if first time trying to send it) */

	if (likely(!msg_non_seq(buf_msg(buf)))) {
		struct tipc_msg *msg;

		assert(tipc_bcast_nmap.count != 0);
		bcbuf_set_acks(buf, tipc_bcast_nmap.count);
		msg = buf_msg(buf);
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tipc_net_id);
		bcl->stats.sent_info++;
	}

	/* Send buffer over bearers until all targets reached */

	bcbearer->remains = tipc_bcast_nmap;

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;

		if (!p)
			break;	/* no more bearers to try */

		tipc_nmap_diff(&bcbearer->remains, &p->nodes, &bcbearer->remains_new);
		if (bcbearer->remains_new.count == bcbearer->remains.count)
			continue;	/* bearer pair doesn't add anything */

		if (p->blocked ||
		    p->media->send_msg(buf, p, &p->media->bcast_addr)) {
			/* unable to send on primary bearer */
			if (!s || s->blocked ||
			    s->media->send_msg(buf, s,
					       &s->media->bcast_addr)) {
				/* unable to send on either bearer */
				continue;
			}
		}

		if (s) {
			bcbearer->bpairs[bp_index].primary = s;
			bcbearer->bpairs[bp_index].secondary = p;
		}

		if (bcbearer->remains_new.count == 0)
			return 0;

		bcbearer->remains = bcbearer->remains_new;
	}

	/*
	 * Unable to reach all targets (indicate success, since currently
	 * there isn't code in place to properly block & unblock the
	 * pseudo-bearer used by the broadcast link)
	 */

	return TIPC_OK;
}

/**
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
 */

void tipc_bcbearer_sort(void)
{
	struct bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct bcbearer_pair *bp_curr;
	int b_index;
	int pri;

	spin_lock_bh(&bc_lock);

	/* Group bearers by priority (can assume max of two per priority) */

	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
		struct tipc_bearer *b = &tipc_bearers[b_index];

		if (!b->active || !b->nodes.count)
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}

	/* Create array of bearer pairs for broadcasting */

	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

	spin_unlock_bh(&bc_lock);
}

/**
 * tipc_bcbearer_push - resolve bearer congestion
 *
 * Forces bclink to push out any unsent packets, until all packets are gone
 * or congestion reoccurs.
 * No locks set when function called
 */

void tipc_bcbearer_push(void)
{
	struct tipc_bearer *b_ptr;

	spin_lock_bh(&bc_lock);
	b_ptr = &bcbearer->bearer;
	if (b_ptr->blocked) {
		b_ptr->blocked = 0;
		tipc_bearer_lock_push(b_ptr);
	}
	spin_unlock_bh(&bc_lock);
}


int tipc_bclink_stats(char *buf, const u32 buf_size)
{
	struct print_buf pb;

	if (!bcl)
		return 0;

	tipc_printbuf_init(&pb, buf, buf_size);

	spin_lock_bh(&bc_lock);

	tipc_printf(&pb, "Link <%s>\n"
			 "  Window:%u packets\n",
		    bcl->name, bcl->queue_limit[0]);
	tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    bcl->stats.recv_info,
		    bcl->stats.recv_fragments,
		    bcl->stats.recv_fragmented,
		    bcl->stats.recv_bundles,
		    bcl->stats.recv_bundled);
	tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    bcl->stats.sent_info,
		    bcl->stats.sent_fragments,
		    bcl->stats.sent_fragmented,
		    bcl->stats.sent_bundles,
		    bcl->stats.sent_bundled);
	tipc_printf(&pb, "  RX naks:%u defs:%u dups:%u\n",
		    bcl->stats.recv_nacks,
		    bcl->stats.deferred_recv,
		    bcl->stats.duplicates);
	tipc_printf(&pb, "  TX naks:%u acks:%u dups:%u\n",
		    bcl->stats.sent_nacks,
		    bcl->stats.sent_acks,
		    bcl->stats.retransmitted);
	tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
		    bcl->stats.bearer_congs,
		    bcl->stats.link_congs,
		    bcl->stats.max_queue_sz,
		    bcl->stats.queue_sz_counts
		    ? (bcl->stats.accu_queue_sz / bcl->stats.queue_sz_counts)
		    : 0);

	spin_unlock_bh(&bc_lock);
	return tipc_printbuf_validate(&pb);
}

int tipc_bclink_reset_stats(void)
{
	if (!bcl)
		return -ENOPROTOOPT;

	spin_lock_bh(&bc_lock);
	memset(&bcl->stats, 0, sizeof(bcl->stats));
	spin_unlock_bh(&bc_lock);
	return 0;
}

int tipc_bclink_set_queue_limits(u32 limit)
{
	if (!bcl)
		return -ENOPROTOOPT;
	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
		return -EINVAL;

	spin_lock_bh(&bc_lock);
	tipc_link_set_queue_limits(bcl, limit);
	spin_unlock_bh(&bc_lock);
	return 0;
}

int tipc_bclink_init(void)
{
	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bcbearer || !bclink) {
		warn("Multicast link creation failed, no memory\n");
		kfree(bcbearer);
		bcbearer = NULL;
		kfree(bclink);
		bclink = NULL;
		return -ENOMEM;
	}

	INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
	bcbearer->bearer.media = &bcbearer->media;
	bcbearer->media.send_msg = tipc_bcbearer_send;
	sprintf(bcbearer->media.name, "tipc-multicast");

	bcl = &bclink->link;
	INIT_LIST_HEAD(&bcl->waiting_ports);
	bcl->next_out_no = 1;
	spin_lock_init(&bclink->node.lock);
	bcl->owner = &bclink->node;
	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
	bcl->b_ptr = &bcbearer->bearer;
	bcl->state = WORKING_WORKING;
	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);

	return 0;
}

void tipc_bclink_stop(void)
{
	spin_lock_bh(&bc_lock);
	if (bcbearer) {
		tipc_link_stop(bcl);
		bcl = NULL;
		kfree(bclink);
		bclink = NULL;
		kfree(bcbearer);
		bcbearer = NULL;
	}
	spin_unlock_bh(&bc_lock);
}


/**
 * tipc_nmap_add - add a node to a node map
 */

void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}

/**
 * tipc_nmap_remove - remove a node from a node map
 */

void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */

static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0; b < WSIZE; b++) {
				if (map & (1 << b))
					nm_diff->count++;
			}
		}
	}
}

/**
 * tipc_port_list_add - add a port to a port list, ensuring no duplicates
 */

void tipc_port_list_add(struct port_list *pl_ptr, u32 port)
{
	struct port_list *item = pl_ptr;
	int i;
	int item_sz = PLSIZE;
	int cnt = pl_ptr->count;

	for (; ; cnt -= item_sz, item = item->next) {
		if (cnt < PLSIZE)
			item_sz = cnt;
		for (i = 0; i < item_sz; i++)
			if (item->ports[i] == port)
				return;
		if (i < PLSIZE) {
			item->ports[i] = port;
			pl_ptr->count++;
			return;
		}
		if (!item->next) {
			item->next = kmalloc(sizeof(*item), GFP_ATOMIC);
			if (!item->next) {
				warn("Incomplete multicast delivery, no memory\n");
				return;
			}
			item->next->next = NULL;
		}
	}
}

/**
 * tipc_port_list_free - free dynamically created entries in port_list chain
 */

void tipc_port_list_free(struct port_list *pl_ptr)
{
	struct port_list *item;
	struct port_list *next;

	for (item = pl_ptr->next; item; item = next) {
		next = item->next;
		kfree(item);
	}
}