/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "name_distr.h"
#include "core.h"

#define MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
#define BCLINK_WIN_DEFAULT	50	/* bcast link window size (default) */
#define BCLINK_WIN_MIN		32	/* bcast minimum link window size */

const char tipc_bclink_name[] = "broadcast-link";

static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff);
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);

static void tipc_bclink_lock(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	spin_lock_bh(&tn->bclink->lock);
}

static void tipc_bclink_unlock(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	spin_unlock_bh(&tn->bclink->lock);
}

void tipc_bclink_input(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
}

uint tipc_bclink_get_mtu(void)
{
	return MAX_PKT_DEFAULT_MCAST;
}

static u32 bcbuf_acks(struct sk_buff *buf)
{
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
}

static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
{
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
}

static void bcbuf_decr_acks(struct sk_buff *buf)
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}
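
/* Note on the helpers above: each buffer on the broadcast link's transmit
 * queue reuses the TIPC_SKB_CB()->handle field as a plain counter of
 * outstanding acknowledgements. It is set to the number of nodes in the
 * broadcast map when the buffer is first sent (see tipc_bcbearer_send()),
 * decremented once per acknowledging node in tipc_bclink_acknowledge(),
 * and the buffer is released when the counter reaches zero.
 */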

void tipc_bclink_add_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
}

void tipc_bclink_remove_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);

	/* Last node? => reset backlog queue */
	if (!tn->bclink->bcast_nodes.count)
		tipc_link_purge_backlog(&tn->bclink->link);

	tipc_bclink_unlock(net);
}

static void bclink_set_last_sent(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
}

u32 tipc_bclink_get_last_sent(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bcl->silent_intv_cnt;
}

static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
				 seqno : node->bclink.last_sent;
}

/**
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
 * Called with bclink_lock locked
 */
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bclink->retransmit_to;
}

/**
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
 *
 * Called with bclink_lock locked
 */
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
{
	struct sk_buff *skb;
	struct tipc_link *bcl = tn->bcl;

	skb_queue_walk(&bcl->transmq, skb) {
		if (more(buf_seqno(skb), after)) {
			tipc_link_retransmit(bcl, skb, mod(to - after));
			break;
		}
	}
}
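
/* Example of the (after, to] convention used by bclink_retransmit_pkt():
 * a NACK carrying bcgap_after == 10 and bcgap_to == 13 makes the walk stop
 * at the first queued buffer with seqno > 10 and retransmit
 * mod(13 - 10) == 3 buffers from that point, i.e. seqnos 11, 12 and 13.
 */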

/**
 * bclink_prepare_wakeup - prepare users for wakeup after congestion
 * @bcl: broadcast link
 * @resultq: queue for users which can be woken up
 * Move a number of waiting users, as permitted by available space in
 * the send queue, from link wait queue to specified queue for wakeup
 */
static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
{
	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
	int imp, lim;
	struct sk_buff *skb, *tmp;

	skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
		imp = TIPC_SKB_CB(skb)->chain_imp;
		lim = bcl->window + bcl->backlog[imp].limit;
		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
		if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
			continue;
		skb_unlink(skb, &bcl->wakeupq);
		skb_queue_tail(resultq, skb);
	}
}

/**
 * tipc_bclink_wakeup_users - wake up pending users
 *
 * Called with no locks taken
 */
void tipc_bclink_wakeup_users(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct sk_buff_head resultq;

	skb_queue_head_init(&resultq);
	bclink_prepare_wakeup(bcl, &resultq);
	tipc_sk_rcv(net, &resultq);
}

/**
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
 *
 * Node is locked, bclink_lock unlocked.
 */
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
	struct sk_buff *skb, *tmp;
	unsigned int released = 0;
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	if (unlikely(!n_ptr->bclink.recv_permitted))
		return;

	tipc_bclink_lock(net);

	/* Bail out if tx queue is empty (no clean up is required) */
	skb = skb_peek(&tn->bcl->transmq);
	if (!skb)
		goto exit;

	/* Determine which messages need to be acknowledged */
	if (acked == INVALID_LINK_SEQ) {
		/*
		 * Contact with specified node has been lost, so need to
		 * acknowledge sent messages only (if other nodes still exist)
		 * or both sent and unsent messages (otherwise)
		 */
		if (tn->bclink->bcast_nodes.count)
			acked = tn->bcl->silent_intv_cnt;
		else
			acked = tn->bcl->snd_nxt;
	} else {
		/*
		 * Bail out if specified sequence number does not correspond
		 * to a message that has been sent and not yet acknowledged
		 */
		if (less(acked, buf_seqno(skb)) ||
		    less(tn->bcl->silent_intv_cnt, acked) ||
		    less_eq(acked, n_ptr->bclink.acked))
			goto exit;
	}

	/* Skip over packets that node has previously acknowledged */
	skb_queue_walk(&tn->bcl->transmq, skb) {
		if (more(buf_seqno(skb), n_ptr->bclink.acked))
			break;
	}

	/* Update packets that node is now acknowledging */
	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
		if (more(buf_seqno(skb), acked))
			break;
		bcbuf_decr_acks(skb);
		bclink_set_last_sent(net);
		if (bcbuf_acks(skb) == 0) {
			__skb_unlink(skb, &tn->bcl->transmq);
			kfree_skb(skb);
			released = 1;
		}
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */
	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
		tipc_link_push_packets(tn->bcl);
		bclink_set_last_sent(net);
	}
	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
exit:
	tipc_bclink_unlock(net);
}
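
/* In short: tipc_bclink_acknowledge() walks the transmit queue once,
 * skipping buffers this node has already acknowledged, decrementing the
 * ack counter on the newly acknowledged ones, and freeing any buffer whose
 * counter drops to zero (i.e. every node in the broadcast map has acked
 * it). Freed space may unblock the backlog queue and allows congested
 * senders to be woken up via TIPC_WAKEUP_BCAST_USERS.
 */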

/**
 * tipc_bclink_update_link_state - update broadcast link state
 *
 * RCU and node lock set
 */
void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
				   u32 last_sent)
{
	struct sk_buff *buf;
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	/* Ignore "stale" link state info */
	if (less_eq(last_sent, n_ptr->bclink.last_in))
		return;

	/* Update link synchronization state; quit if in sync */
	bclink_update_last_sent(n_ptr, last_sent);

	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
		return;

	/* Update out-of-sync state; quit if loss is still unconfirmed */
	if ((++n_ptr->bclink.oos_state) == 1) {
		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return;
		n_ptr->bclink.oos_state++;
	}

	/* Don't NACK if one has been recently sent (or seen) */
	if (n_ptr->bclink.oos_state & 0x1)
		return;

	/* Send NACK */
	buf = tipc_buf_acquire(INT_H_SIZE);
	if (buf) {
		struct tipc_msg *msg = buf_msg(buf);
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;

		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
			      INT_H_SIZE, n_ptr->addr);
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tn->net_id);
		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_to(msg, to);

		tipc_bclink_lock(net);
		tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
		tn->bcl->stats.sent_nacks++;
		tipc_bclink_unlock(net);
		kfree_skb(buf);

		n_ptr->bclink.oos_state++;
	}
}
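
/* The oos_state counter above doubles as a NACK throttle: it is bumped on
 * every out-of-sync observation, a NACK goes out only when the resulting
 * value is even, and sending one makes it odd again. bclink_peek_nack()
 * sets it to 2 when another node is seen requesting the same missing
 * packet, which roughly pushes this node's own NACK one round further out.
 */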

void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
{
	u16 last = msg_last_bcast(hdr);
	int mtyp = msg_type(hdr);

	if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
		return;
	if (mtyp == STATE_MSG) {
		tipc_bclink_update_link_state(n, last);
		return;
	}
	/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
	 * and transfer synch info in LINK_PROTOCOL messages.
	 */
	if (tipc_node_is_up(n))
		return;
	if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
		return;
	n->bclink.last_sent = last;
	n->bclink.last_in = last;
	n->bclink.oos_state = 0;
}

/**
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
 *
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
 */
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
{
	struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));

	if (unlikely(!n_ptr))
		return;

	tipc_node_lock(n_ptr);
	if (n_ptr->bclink.recv_permitted &&
	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
		n_ptr->bclink.oos_state = 2;
	tipc_node_unlock(n_ptr);
	tipc_node_put(n_ptr);
}

/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
 *                    and to identified node local sockets
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_bclink *bclink = tn->bclink;
	int rc = 0;
	int bc = 0;
	struct sk_buff *skb;
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;

	/* Prepare clone of message for local node */
	skb = tipc_msg_reassemble(list);
	if (unlikely(!skb))
		return -EHOSTUNREACH;

	/* Broadcast to all nodes */
	if (likely(bclink)) {
		tipc_bclink_lock(net);
		if (likely(bclink->bcast_nodes.count)) {
			rc = __tipc_link_xmit(net, bcl, list);
			if (likely(!rc)) {
				u32 len = skb_queue_len(&bcl->transmq);

				bclink_set_last_sent(net);
				bcl->stats.queue_sz_counts++;
				bcl->stats.accu_queue_sz += len;
			}
			bc = 1;
		}
		tipc_bclink_unlock(net);
	}

	if (unlikely(!bc))
		__skb_queue_purge(list);

	if (unlikely(rc)) {
		kfree_skb(skb);
		return rc;
	}
	/* Deliver message clone */
	__skb_queue_head_init(&arrvq);
	skb_queue_head_init(&inputq);
	__skb_queue_tail(&arrvq, skb);
	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
	return rc;
}

/**
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 *
 * Called with both sending node's lock and bclink_lock taken.
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
	struct tipc_net *tn = net_generic(node->net, tipc_net_id);

	bclink_update_last_sent(node, seqno);
	node->bclink.last_in = seqno;
	node->bclink.oos_state = 0;
	tn->bcl->stats.recv_info++;

	/*
	 * Unicast an ACK periodically, ensuring that
	 * all nodes in the cluster don't ACK at the same time
	 */
	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
		tipc_link_proto_xmit(node_active_link(node, node->addr),
				     STATE_MSG, 0, 0, 0, 0);
		tn->bcl->stats.sent_acks++;
	}
}
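
/* Example of the ACK staggering above, assuming TIPC_MIN_LINK_WIN is 16
 * (defined elsewhere): a node whose own address satisfies
 * own_addr % 16 == 3 acknowledges broadcast seqnos 3, 19, 35, ..., while a
 * node with own_addr % 16 == 7 acknowledges 7, 23, 39, ...  Every node
 * still acks once per window of received packets, but the acks from
 * different nodes are spread over different sequence numbers instead of
 * arriving in a burst.
 */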

/**
 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
 *
 * RCU is locked, no other locks set
 */
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_msg *msg = buf_msg(buf);
	struct tipc_node *node;
	u32 next_in;
	u32 seqno;
	int deferred = 0;
	int pos = 0;
	struct sk_buff *iskb;
	struct sk_buff_head *arrvq, *inputq;

	/* Screen out unwanted broadcast messages */
	if (msg_mc_netid(msg) != tn->net_id)
		goto exit;

	node = tipc_node_find(net, msg_prevnode(msg));
	if (unlikely(!node))
		goto exit;

	tipc_node_lock(node);
	if (unlikely(!node->bclink.recv_permitted))
		goto unlock;

	/* Handle broadcast protocol message */
	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
		if (msg_type(msg) != STATE_MSG)
			goto unlock;
		if (msg_destnode(msg) == tn->own_addr) {
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
			tipc_bclink_lock(net);
			bcl->stats.recv_nacks++;
			tn->bclink->retransmit_to = node;
			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
					      msg_bcgap_to(msg));
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else {
			tipc_node_unlock(node);
			bclink_peek_nack(net, msg);
		}
		tipc_node_put(node);
		goto exit;
	}

	/* Handle in-sequence broadcast message */
	seqno = msg_seqno(msg);
	next_in = mod(node->bclink.last_in + 1);
	arrvq = &tn->bclink->arrvq;
	inputq = &tn->bclink->inputq;

	if (likely(seqno == next_in)) {
receive:
		/* Deliver message to destination */
		if (likely(msg_isdata(msg))) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			spin_lock_bh(&inputq->lock);
			__skb_queue_tail(arrvq, buf);
			spin_unlock_bh(&inputq->lock);
			node->action_flags |= TIPC_BCAST_MSG_EVT;
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else if (msg_user(msg) == MSG_BUNDLER) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
			pos = 0;
			while (tipc_msg_extract(buf, &iskb, &pos)) {
				spin_lock_bh(&inputq->lock);
				__skb_queue_tail(arrvq, iskb);
				spin_unlock_bh(&inputq->lock);
			}
			node->action_flags |= TIPC_BCAST_MSG_EVT;
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			tipc_buf_append(&node->bclink.reasm_buf, &buf);
			if (unlikely(!buf && !node->bclink.reasm_buf)) {
				tipc_bclink_unlock(net);
				goto unlock;
			}
			bcl->stats.recv_fragments++;
			if (buf) {
				bcl->stats.recv_fragmented++;
				msg = buf_msg(buf);
				tipc_bclink_unlock(net);
				goto receive;
			}
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
			kfree_skb(buf);
		}
		buf = NULL;

		/* Determine new synchronization state */
		tipc_node_lock(node);
		if (unlikely(!tipc_node_is_up(node)))
			goto unlock;

		if (node->bclink.last_in == node->bclink.last_sent)
			goto unlock;

		if (skb_queue_empty(&node->bclink.deferdq)) {
			node->bclink.oos_state = 1;
			goto unlock;
		}

		msg = buf_msg(skb_peek(&node->bclink.deferdq));
		seqno = msg_seqno(msg);
		next_in = mod(next_in + 1);
		if (seqno != next_in)
			goto unlock;

		/* Take in-sequence message from deferred queue & deliver it */
		buf = __skb_dequeue(&node->bclink.deferdq);
		goto receive;
	}

	/* Handle out-of-sequence broadcast message */
	if (less(next_in, seqno)) {
		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
					       buf);
		bclink_update_last_sent(node, seqno);
		buf = NULL;
	}

	tipc_bclink_lock(net);

	if (deferred)
		bcl->stats.deferred_recv++;
	else
		bcl->stats.duplicates++;

	tipc_bclink_unlock(net);

unlock:
	tipc_node_unlock(node);
	tipc_node_put(node);
exit:
	kfree_skb(buf);
}
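
/* Receive flow in brief: an in-sequence packet is accepted and delivered
 * (data straight to the arrival queue, bundles unpacked, fragments
 * reassembled), after which the deferred queue is checked in case the
 * following packets already arrived out of order. A packet that arrives
 * ahead of sequence is parked in deferdq; anything else is counted as a
 * duplicate and dropped.
 */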

u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
	return (n_ptr->bclink.recv_permitted &&
		(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
}


/**
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
 *
 * Send packet over as many bearers as necessary to reach all nodes
 * that have joined the broadcast link.
 *
 * Returns 0 (packet sent successfully) under all circumstances,
 * since the broadcast link's pseudo-bearer never blocks
 */
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
			      struct tipc_bearer *unused1,
			      struct tipc_media_addr *unused2)
{
	int bp_index;
	struct tipc_msg *msg = buf_msg(buf);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bclink *bclink = tn->bclink;

	/* Prepare broadcast link message for reliable transmission,
	 * if first time trying to send it;
	 * preparation is skipped for broadcast link protocol messages
	 * since they are sent in an unreliable manner and don't need it
	 */
	if (likely(!msg_non_seq(buf_msg(buf)))) {
		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tn->net_id);
		tn->bcl->stats.sent_info++;
		if (WARN_ON(!bclink->bcast_nodes.count)) {
			dump_stack();
			return 0;
		}
	}

	/* Send buffer over bearers until all targets reached */
	bcbearer->remains = bclink->bcast_nodes;

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
		struct tipc_bearer *bp[2] = {p, s};
		struct tipc_bearer *b = bp[msg_link_selector(msg)];
		struct sk_buff *tbuf;

		if (!p)
			break; /* No more bearers to try */
		if (!b)
			b = p;
		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
			       &bcbearer->remains_new);
		if (bcbearer->remains_new.count == bcbearer->remains.count)
			continue; /* Nothing added by bearer pair */

		if (bp_index == 0) {
			/* Use original buffer for first bearer */
			tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
		} else {
			/* Avoid concurrent buffer access */
			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
			if (!tbuf)
				break;
			tipc_bearer_send(net, b->identity, tbuf,
					 &b->bcast_addr);
			kfree_skb(tbuf); /* Bearer keeps a clone */
		}
		if (bcbearer->remains_new.count == 0)
			break; /* All targets reached */

		bcbearer->remains = bcbearer->remains_new;
	}

	return 0;
}
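
/* The loop above narrows the "remains" node map one bearer pair at a time:
 * tipc_nmap_diff() removes the nodes reachable via the chosen bearer, and
 * the buffer is only sent on a bearer that covers at least one node still
 * unreached. For instance, with one bearer covering nodes {A, B, C} and a
 * second covering {C, D}, the packet goes out on both, and the loop stops
 * as soon as remains_new is empty.
 */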

/**
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
 */
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
			u32 node, bool action)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct tipc_bcbearer_pair *bp_curr;
	struct tipc_bearer *b;
	int b_index;
	int pri;

	tipc_bclink_lock(net);

	if (action)
		tipc_nmap_add(nm_ptr, node);
	else
		tipc_nmap_remove(nm_ptr, node);

	/* Group bearers by priority (can assume max of two per priority) */
	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

	rcu_read_lock();
	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
		b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
		if (!b || !b->nodes.count)
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}
	rcu_read_unlock();

	/* Create array of bearer pairs for broadcasting */
	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

	tipc_bclink_unlock(net);
}
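
/* Pairing rule used above: bearers are grouped two per priority level and
 * scanned from highest to lowest priority. Two same-priority bearers that
 * reach exactly the same set of nodes become one primary/secondary pair
 * (traffic can then be spread over them via the message link selector in
 * tipc_bcbearer_send()); if their node sets differ, each becomes its own
 * single-bearer pair so that neither set of nodes is missed.
 */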

static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
				      struct tipc_stats *stats)
{
	int i;
	struct nlattr *nest;

	struct nla_map {
		__u32 key;
		__u32 val;
	};

	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
	};

	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!nest)
		return -EMSGSIZE;

	for (i = 0; i < ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, nest);

	return 0;
msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}

int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct nlattr *prop;
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	if (!bcl)
		return 0;

	tipc_bclink_lock(net);

	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
			  NLM_F_MULTI, TIPC_NL_LINK_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;

	/* The broadcast link is always up */
	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
		goto attr_msg_full;

	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
		goto attr_msg_full;
	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
		goto attr_msg_full;

	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
	if (!prop)
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
		goto prop_msg_full;
	nla_nest_end(msg->skb, prop);

	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
	if (err)
		goto attr_msg_full;

	tipc_bclink_unlock(net);
	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);

	return 0;

prop_msg_full:
	nla_nest_cancel(msg->skb, prop);
attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
	tipc_bclink_unlock(net);
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}

int tipc_bclink_reset_stats(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	if (!bcl)
		return -ENOPROTOOPT;

	tipc_bclink_lock(net);
	memset(&bcl->stats, 0, sizeof(bcl->stats));
	tipc_bclink_unlock(net);
	return 0;
}

int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	if (!bcl)
		return -ENOPROTOOPT;
	if (limit < BCLINK_WIN_MIN)
		limit = BCLINK_WIN_MIN;
	if (limit > TIPC_MAX_LINK_WIN)
		return -EINVAL;
	tipc_bclink_lock(net);
	tipc_link_set_queue_limits(bcl, limit);
	tipc_bclink_unlock(net);
	return 0;
}

int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
	int err;
	u32 win;
	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];

	if (!attrs[TIPC_NLA_LINK_PROP])
		return -EINVAL;

	err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
	if (err)
		return err;

	if (!props[TIPC_NLA_PROP_WIN])
		return -EOPNOTSUPP;

	win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);

	return tipc_bclink_set_queue_limits(net, win);
}

int tipc_bclink_init(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer;
	struct tipc_bclink *bclink;
	struct tipc_link *bcl;

	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	if (!bcbearer)
		return -ENOMEM;

	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bclink) {
		kfree(bcbearer);
		return -ENOMEM;
	}

	bcl = &bclink->link;
	bcbearer->bearer.media = &bcbearer->media;
	bcbearer->media.send_msg = tipc_bcbearer_send;
	sprintf(bcbearer->media.name, "tipc-broadcast");

	spin_lock_init(&bclink->lock);
	__skb_queue_head_init(&bcl->transmq);
	__skb_queue_head_init(&bcl->backlogq);
	__skb_queue_head_init(&bcl->deferdq);
	skb_queue_head_init(&bcl->wakeupq);
	bcl->snd_nxt = 1;
	spin_lock_init(&bclink->node.lock);
	__skb_queue_head_init(&bclink->arrvq);
	skb_queue_head_init(&bclink->inputq);
	bcl->owner = &bclink->node;
	bcl->owner->net = net;
	bcl->mtu = MAX_PKT_DEFAULT_MCAST;
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
	bcl->bearer_id = MAX_BEARERS;
	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
	msg_set_prevnode(bcl->pmsg, tn->own_addr);
	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
	tn->bcbearer = bcbearer;
	tn->bclink = bclink;
	tn->bcl = bcl;
	return 0;
}

void tipc_bclink_stop(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_link_purge_queues(tn->bcl);
	tipc_bclink_unlock(net);

	RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
	synchronize_net();
	kfree(tn->bcbearer);
	kfree(tn->bclink);
}

/**
 * tipc_nmap_add - add a node to a node map
 */
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}
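
/* The node map is a simple bitmap indexed by the node identifier part of
 * the address: bit (n % WSIZE) of word (n / WSIZE) represents node n.
 * Assuming WSIZE is 32 (defined in bcast.h), node number 37 maps to
 * word 1, bit 5, i.e. map[1] |= (1 << 5), and "count" tracks how many bits
 * are currently set.
 */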

/**
 * tipc_nmap_remove - remove a node from a node map
 */
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0 ; b < WSIZE; b++) {
				if (map & (1 << b))
					nm_diff->count++;
			}
		}
	}
}