/*
 * net/tipc/group.c: TIPC group messaging code
 *
 * Copyright (c) 2017, Ericsson AB
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "topsrv.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_JOINING, 53 MBR_PUBLISHED, 54 MBR_JOINED, 55 MBR_PENDING, 56 MBR_ACTIVE, 57 MBR_RECLAIMING, 58 MBR_REMITTED, 59 MBR_LEAVING 60 }; 61 62 struct tipc_member { 63 struct rb_node tree_node; 64 struct list_head list; 65 struct list_head small_win; 66 struct sk_buff_head deferredq; 67 struct tipc_group *group; 68 u32 node; 69 u32 port; 70 u32 instance; 71 enum mbr_state state; 72 u16 advertised; 73 u16 window; 74 u16 bc_rcv_nxt; 75 u16 bc_syncpt; 76 u16 bc_acked; 77 }; 78 79 struct tipc_group { 80 struct rb_root members; 81 struct list_head small_win; 82 struct list_head pending; 83 struct list_head active; 84 struct tipc_nlist dests; 85 struct net *net; 86 int subid; 87 u32 type; 88 u32 instance; 89 u32 scope; 90 u32 portid; 91 u16 member_cnt; 92 u16 active_cnt; 93 u16 max_active; 94 u16 bc_snd_nxt; 95 u16 bc_ackers; 96 bool *open; 97 bool loopback; 98 bool events; 99 }; 100 101 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 102 int mtyp, struct sk_buff_head *xmitq); 103 104 static void tipc_group_open(struct tipc_member *m, bool *wakeup) 105 { 106 *wakeup = false; 107 if (list_empty(&m->small_win)) 108 return; 109 list_del_init(&m->small_win); 110 *m->group->open = true; 111 *wakeup = true; 112 } 113 114 static void tipc_group_decr_active(struct tipc_group *grp, 115 struct tipc_member *m) 116 { 117 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || 118 m->state == MBR_REMITTED) 119 grp->active_cnt--; 120 } 121 122 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 123 { 124 int max_active, active_pool, idle_pool; 125 int 
mcnt = grp->member_cnt + 1; 126 127 /* Limit simultaneous reception from other members */ 128 max_active = min(mcnt / 8, 64); 129 max_active = max(max_active, 16); 130 grp->max_active = max_active; 131 132 /* Reserve blocks for active and idle members */ 133 active_pool = max_active * ADV_ACTIVE; 134 idle_pool = (mcnt - max_active) * ADV_IDLE; 135 136 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 137 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 138 } 139 140 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 141 { 142 return grp->bc_snd_nxt; 143 } 144 145 static bool tipc_group_is_receiver(struct tipc_member *m) 146 { 147 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING; 148 } 149 150 static bool tipc_group_is_sender(struct tipc_member *m) 151 { 152 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED; 153 } 154 155 u32 tipc_group_exclude(struct tipc_group *grp) 156 { 157 if (!grp->loopback) 158 return grp->portid; 159 return 0; 160 } 161 162 int tipc_group_size(struct tipc_group *grp) 163 { 164 return grp->member_cnt; 165 } 166 167 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 168 struct tipc_group_req *mreq, 169 bool *group_is_open) 170 { 171 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; 172 bool global = mreq->scope != TIPC_NODE_SCOPE; 173 struct tipc_group *grp; 174 u32 type = mreq->type; 175 176 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 177 if (!grp) 178 return NULL; 179 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 180 INIT_LIST_HEAD(&grp->small_win); 181 INIT_LIST_HEAD(&grp->active); 182 INIT_LIST_HEAD(&grp->pending); 183 grp->members = RB_ROOT; 184 grp->net = net; 185 grp->portid = portid; 186 grp->type = type; 187 grp->instance = mreq->instance; 188 grp->scope = mreq->scope; 189 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 190 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 191 grp->open = group_is_open; 192 *grp->open = false; 193 filter |= global ? 
TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; 194 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, 195 filter, &grp->subid)) 196 return grp; 197 kfree(grp); 198 return NULL; 199 } 200 201 void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) 202 { 203 struct rb_root *tree = &grp->members; 204 struct tipc_member *m, *tmp; 205 struct sk_buff_head xmitq; 206 207 skb_queue_head_init(&xmitq); 208 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 209 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); 210 tipc_group_update_member(m, 0); 211 } 212 tipc_node_distr_xmit(net, &xmitq); 213 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 214 } 215 216 void tipc_group_delete(struct net *net, struct tipc_group *grp) 217 { 218 struct rb_root *tree = &grp->members; 219 struct tipc_member *m, *tmp; 220 struct sk_buff_head xmitq; 221 222 __skb_queue_head_init(&xmitq); 223 224 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 225 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 226 list_del(&m->list); 227 kfree(m); 228 } 229 tipc_node_distr_xmit(net, &xmitq); 230 tipc_nlist_purge(&grp->dests); 231 tipc_topsrv_kern_unsubscr(net, grp->subid); 232 kfree(grp); 233 } 234 235 struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 236 u32 node, u32 port) 237 { 238 struct rb_node *n = grp->members.rb_node; 239 u64 nkey, key = (u64)node << 32 | port; 240 struct tipc_member *m; 241 242 while (n) { 243 m = container_of(n, struct tipc_member, tree_node); 244 nkey = (u64)m->node << 32 | m->port; 245 if (key < nkey) 246 n = n->rb_left; 247 else if (key > nkey) 248 n = n->rb_right; 249 else 250 return m; 251 } 252 return NULL; 253 } 254 255 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 256 u32 node, u32 port) 257 { 258 struct tipc_member *m; 259 260 m = tipc_group_find_member(grp, node, port); 261 if (m && tipc_group_is_receiver(m)) 262 return m; 263 return NULL; 264 } 265 266 static struct tipc_member 
*tipc_group_find_node(struct tipc_group *grp, 267 u32 node) 268 { 269 struct tipc_member *m; 270 struct rb_node *n; 271 272 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 273 m = container_of(n, struct tipc_member, tree_node); 274 if (m->node == node) 275 return m; 276 } 277 return NULL; 278 } 279 280 static void tipc_group_add_to_tree(struct tipc_group *grp, 281 struct tipc_member *m) 282 { 283 u64 nkey, key = (u64)m->node << 32 | m->port; 284 struct rb_node **n, *parent = NULL; 285 struct tipc_member *tmp; 286 287 n = &grp->members.rb_node; 288 while (*n) { 289 tmp = container_of(*n, struct tipc_member, tree_node); 290 parent = *n; 291 tmp = container_of(parent, struct tipc_member, tree_node); 292 nkey = (u64)tmp->node << 32 | tmp->port; 293 if (key < nkey) 294 n = &(*n)->rb_left; 295 else if (key > nkey) 296 n = &(*n)->rb_right; 297 else 298 return; 299 } 300 rb_link_node(&m->tree_node, parent, n); 301 rb_insert_color(&m->tree_node, &grp->members); 302 } 303 304 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 305 u32 node, u32 port, 306 u32 instance, int state) 307 { 308 struct tipc_member *m; 309 310 m = kzalloc(sizeof(*m), GFP_ATOMIC); 311 if (!m) 312 return NULL; 313 INIT_LIST_HEAD(&m->list); 314 INIT_LIST_HEAD(&m->small_win); 315 __skb_queue_head_init(&m->deferredq); 316 m->group = grp; 317 m->node = node; 318 m->port = port; 319 m->instance = instance; 320 m->bc_acked = grp->bc_snd_nxt - 1; 321 grp->member_cnt++; 322 tipc_group_add_to_tree(grp, m); 323 tipc_nlist_add(&grp->dests, m->node); 324 m->state = state; 325 return m; 326 } 327 328 void tipc_group_add_member(struct tipc_group *grp, u32 node, 329 u32 port, u32 instance) 330 { 331 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED); 332 } 333 334 static void tipc_group_delete_member(struct tipc_group *grp, 335 struct tipc_member *m) 336 { 337 rb_erase(&m->tree_node, &grp->members); 338 grp->member_cnt--; 339 340 /* Check if we were waiting for 
replicast ack from this member */ 341 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 342 grp->bc_ackers--; 343 344 list_del_init(&m->list); 345 list_del_init(&m->small_win); 346 tipc_group_decr_active(grp, m); 347 348 /* If last member on a node, remove node from dest list */ 349 if (!tipc_group_find_node(grp, m->node)) 350 tipc_nlist_del(&grp->dests, m->node); 351 352 kfree(m); 353 } 354 355 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 356 { 357 return &grp->dests; 358 } 359 360 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 361 int *scope) 362 { 363 seq->type = grp->type; 364 seq->lower = grp->instance; 365 seq->upper = grp->instance; 366 *scope = grp->scope; 367 } 368 369 void tipc_group_update_member(struct tipc_member *m, int len) 370 { 371 struct tipc_group *grp = m->group; 372 struct tipc_member *_m, *tmp; 373 374 if (!tipc_group_is_receiver(m)) 375 return; 376 377 m->window -= len; 378 379 if (m->window >= ADV_IDLE) 380 return; 381 382 list_del_init(&m->small_win); 383 384 /* Sort member into small_window members' list */ 385 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { 386 if (_m->window > m->window) 387 break; 388 } 389 list_add_tail(&m->small_win, &_m->small_win); 390 } 391 392 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 393 { 394 u16 prev = grp->bc_snd_nxt - 1; 395 struct tipc_member *m; 396 struct rb_node *n; 397 u16 ackers = 0; 398 399 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 400 m = container_of(n, struct tipc_member, tree_node); 401 if (tipc_group_is_receiver(m)) { 402 tipc_group_update_member(m, len); 403 m->bc_acked = prev; 404 ackers++; 405 } 406 } 407 408 /* Mark number of acknowledges to expect, if any */ 409 if (ack) 410 grp->bc_ackers = ackers; 411 grp->bc_snd_nxt++; 412 } 413 414 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 415 int len, struct tipc_member **mbr) 416 { 417 struct sk_buff_head xmitq; 
418 struct tipc_member *m; 419 int adv, state; 420 421 m = tipc_group_find_dest(grp, dnode, dport); 422 if (!tipc_group_is_receiver(m)) { 423 *mbr = NULL; 424 return false; 425 } 426 *mbr = m; 427 428 if (m->window >= len) 429 return false; 430 431 *grp->open = false; 432 433 /* If not fully advertised, do it now to prevent mutual blocking */ 434 adv = m->advertised; 435 state = m->state; 436 if (state == MBR_JOINED && adv == ADV_IDLE) 437 return true; 438 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 439 return true; 440 if (state == MBR_PENDING && adv == ADV_IDLE) 441 return true; 442 skb_queue_head_init(&xmitq); 443 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 444 tipc_node_distr_xmit(grp->net, &xmitq); 445 return true; 446 } 447 448 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 449 { 450 struct tipc_member *m = NULL; 451 452 /* If prev bcast was replicast, reject until all receivers have acked */ 453 if (grp->bc_ackers) { 454 *grp->open = false; 455 return true; 456 } 457 if (list_empty(&grp->small_win)) 458 return false; 459 460 m = list_first_entry(&grp->small_win, struct tipc_member, small_win); 461 if (m->window >= len) 462 return false; 463 464 return tipc_group_cong(grp, m->node, m->port, len, &m); 465 } 466 467 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 468 */ 469 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 470 { 471 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 472 u16 bc_seqno = msg_grp_bc_seqno(hdr); 473 struct sk_buff *_skb, *tmp; 474 int mtyp = msg_type(hdr); 475 476 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 477 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 478 skb_queue_walk_safe(defq, _skb, tmp) { 479 _hdr = buf_msg(_skb); 480 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 481 continue; 482 __skb_queue_before(defq, _skb, skb); 483 return; 484 } 485 /* Bcast was not bypassed, - add to tail */ 486 } 487 /* Unicasts are never 
bypassed, - always add to tail */ 488 __skb_queue_tail(defq, skb); 489 } 490 491 /* tipc_group_filter_msg() - determine if we should accept arriving message 492 */ 493 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 494 struct sk_buff_head *xmitq) 495 { 496 struct sk_buff *skb = __skb_dequeue(inputq); 497 bool ack, deliver, update, leave = false; 498 struct sk_buff_head *defq; 499 struct tipc_member *m; 500 struct tipc_msg *hdr; 501 u32 node, port; 502 int mtyp, blks; 503 504 if (!skb) 505 return; 506 507 hdr = buf_msg(skb); 508 node = msg_orignode(hdr); 509 port = msg_origport(hdr); 510 511 if (!msg_in_group(hdr)) 512 goto drop; 513 514 m = tipc_group_find_member(grp, node, port); 515 if (!tipc_group_is_sender(m)) 516 goto drop; 517 518 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 519 goto drop; 520 521 TIPC_SKB_CB(skb)->orig_member = m->instance; 522 defq = &m->deferredq; 523 tipc_group_sort_msg(skb, defq); 524 525 while ((skb = skb_peek(defq))) { 526 hdr = buf_msg(skb); 527 mtyp = msg_type(hdr); 528 blks = msg_blocks(hdr); 529 deliver = true; 530 ack = false; 531 update = false; 532 533 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 534 break; 535 536 /* Decide what to do with message */ 537 switch (mtyp) { 538 case TIPC_GRP_MCAST_MSG: 539 if (msg_nameinst(hdr) != grp->instance) { 540 update = true; 541 deliver = false; 542 } 543 /* Fall thru */ 544 case TIPC_GRP_BCAST_MSG: 545 m->bc_rcv_nxt++; 546 ack = msg_grp_bc_ack_req(hdr); 547 break; 548 case TIPC_GRP_UCAST_MSG: 549 break; 550 case TIPC_GRP_MEMBER_EVT: 551 if (m->state == MBR_LEAVING) 552 leave = true; 553 if (!grp->events) 554 deliver = false; 555 break; 556 default: 557 break; 558 } 559 560 /* Execute decisions */ 561 __skb_dequeue(defq); 562 if (deliver) 563 __skb_queue_tail(inputq, skb); 564 else 565 kfree_skb(skb); 566 567 if (ack) 568 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 569 570 if (leave) { 571 __skb_queue_purge(defq); 572 
tipc_group_delete_member(grp, m); 573 break; 574 } 575 if (!update) 576 continue; 577 578 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 579 } 580 return; 581 drop: 582 kfree_skb(skb); 583 } 584 585 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 586 u32 port, struct sk_buff_head *xmitq) 587 { 588 struct list_head *active = &grp->active; 589 int max_active = grp->max_active; 590 int reclaim_limit = max_active * 3 / 4; 591 int active_cnt = grp->active_cnt; 592 struct tipc_member *m, *rm, *pm; 593 594 m = tipc_group_find_member(grp, node, port); 595 if (!m) 596 return; 597 598 m->advertised -= blks; 599 600 switch (m->state) { 601 case MBR_JOINED: 602 /* First, decide if member can go active */ 603 if (active_cnt <= max_active) { 604 m->state = MBR_ACTIVE; 605 list_add_tail(&m->list, active); 606 grp->active_cnt++; 607 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 608 } else { 609 m->state = MBR_PENDING; 610 list_add_tail(&m->list, &grp->pending); 611 } 612 613 if (active_cnt < reclaim_limit) 614 break; 615 616 /* Reclaim from oldest active member, if possible */ 617 if (!list_empty(active)) { 618 rm = list_first_entry(active, struct tipc_member, list); 619 rm->state = MBR_RECLAIMING; 620 list_del_init(&rm->list); 621 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 622 break; 623 } 624 /* Nobody to reclaim from; - revert oldest pending to JOINED */ 625 pm = list_first_entry(&grp->pending, struct tipc_member, list); 626 list_del_init(&pm->list); 627 pm->state = MBR_JOINED; 628 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 629 break; 630 case MBR_ACTIVE: 631 if (!list_is_last(&m->list, &grp->active)) 632 list_move_tail(&m->list, &grp->active); 633 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 634 break; 635 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 636 break; 637 case MBR_REMITTED: 638 if (m->advertised > ADV_IDLE) 639 break; 640 m->state = MBR_JOINED; 641 grp->active_cnt--; 642 if (m->advertised < ADV_IDLE) 
{ 643 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 644 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 645 } 646 647 if (list_empty(&grp->pending)) 648 return; 649 650 /* Set oldest pending member to active and advertise */ 651 pm = list_first_entry(&grp->pending, struct tipc_member, list); 652 pm->state = MBR_ACTIVE; 653 list_move_tail(&pm->list, &grp->active); 654 grp->active_cnt++; 655 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 656 break; 657 case MBR_RECLAIMING: 658 case MBR_JOINING: 659 case MBR_LEAVING: 660 default: 661 break; 662 } 663 } 664 665 static void tipc_group_create_event(struct tipc_group *grp, 666 struct tipc_member *m, 667 u32 event, u16 seqno, 668 struct sk_buff_head *inputq) 669 { u32 dnode = tipc_own_addr(grp->net); 670 struct tipc_event evt; 671 struct sk_buff *skb; 672 struct tipc_msg *hdr; 673 674 evt.event = event; 675 evt.found_lower = m->instance; 676 evt.found_upper = m->instance; 677 evt.port.ref = m->port; 678 evt.port.node = m->node; 679 evt.s.seq.type = grp->type; 680 evt.s.seq.lower = m->instance; 681 evt.s.seq.upper = m->instance; 682 683 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, 684 GROUP_H_SIZE, sizeof(evt), dnode, m->node, 685 grp->portid, m->port, 0); 686 if (!skb) 687 return; 688 689 hdr = buf_msg(skb); 690 msg_set_nametype(hdr, grp->type); 691 msg_set_grp_evt(hdr, event); 692 msg_set_dest_droppable(hdr, true); 693 msg_set_grp_bc_seqno(hdr, seqno); 694 memcpy(msg_data(hdr), &evt, sizeof(evt)); 695 TIPC_SKB_CB(skb)->orig_member = m->instance; 696 __skb_queue_tail(inputq, skb); 697 } 698 699 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 700 int mtyp, struct sk_buff_head *xmitq) 701 { 702 struct tipc_msg *hdr; 703 struct sk_buff *skb; 704 int adv = 0; 705 706 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 707 m->node, tipc_own_addr(grp->net), 708 m->port, grp->portid, 0); 709 if (!skb) 710 return; 711 712 if (m->state == MBR_ACTIVE) 
713 adv = ADV_ACTIVE - m->advertised; 714 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 715 adv = ADV_IDLE - m->advertised; 716 717 hdr = buf_msg(skb); 718 719 if (mtyp == GRP_JOIN_MSG) { 720 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 721 msg_set_adv_win(hdr, adv); 722 m->advertised += adv; 723 } else if (mtyp == GRP_LEAVE_MSG) { 724 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 725 } else if (mtyp == GRP_ADV_MSG) { 726 msg_set_adv_win(hdr, adv); 727 m->advertised += adv; 728 } else if (mtyp == GRP_ACK_MSG) { 729 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 730 } else if (mtyp == GRP_REMIT_MSG) { 731 msg_set_grp_remitted(hdr, m->window); 732 } 733 msg_set_dest_droppable(hdr, true); 734 __skb_queue_tail(xmitq, skb); 735 } 736 737 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 738 struct tipc_msg *hdr, struct sk_buff_head *inputq, 739 struct sk_buff_head *xmitq) 740 { 741 u32 node = msg_orignode(hdr); 742 u32 port = msg_origport(hdr); 743 struct tipc_member *m, *pm; 744 u16 remitted, in_flight; 745 746 if (!grp) 747 return; 748 749 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) 750 return; 751 752 m = tipc_group_find_member(grp, node, port); 753 754 switch (msg_type(hdr)) { 755 case GRP_JOIN_MSG: 756 if (!m) 757 m = tipc_group_create_member(grp, node, port, 758 0, MBR_JOINING); 759 if (!m) 760 return; 761 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 762 m->bc_rcv_nxt = m->bc_syncpt; 763 m->window += msg_adv_win(hdr); 764 765 /* Wait until PUBLISH event is received if necessary */ 766 if (m->state != MBR_PUBLISHED) 767 return; 768 769 /* Member can be taken into service */ 770 m->state = MBR_JOINED; 771 tipc_group_open(m, usr_wakeup); 772 tipc_group_update_member(m, 0); 773 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 774 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 775 m->bc_syncpt, inputq); 776 return; 777 case GRP_LEAVE_MSG: 778 if (!m) 779 return; 780 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 781 
list_del_init(&m->list); 782 tipc_group_open(m, usr_wakeup); 783 tipc_group_decr_active(grp, m); 784 m->state = MBR_LEAVING; 785 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 786 m->bc_syncpt, inputq); 787 return; 788 case GRP_ADV_MSG: 789 if (!m) 790 return; 791 m->window += msg_adv_win(hdr); 792 tipc_group_open(m, usr_wakeup); 793 return; 794 case GRP_ACK_MSG: 795 if (!m) 796 return; 797 m->bc_acked = msg_grp_bc_acked(hdr); 798 if (--grp->bc_ackers) 799 return; 800 list_del_init(&m->small_win); 801 *m->group->open = true; 802 *usr_wakeup = true; 803 tipc_group_update_member(m, 0); 804 return; 805 case GRP_RECLAIM_MSG: 806 if (!m) 807 return; 808 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 809 m->window = ADV_IDLE; 810 tipc_group_open(m, usr_wakeup); 811 return; 812 case GRP_REMIT_MSG: 813 if (!m || m->state != MBR_RECLAIMING) 814 return; 815 816 remitted = msg_grp_remitted(hdr); 817 818 /* Messages preceding the REMIT still in receive queue */ 819 if (m->advertised > remitted) { 820 m->state = MBR_REMITTED; 821 in_flight = m->advertised - remitted; 822 m->advertised = ADV_IDLE + in_flight; 823 return; 824 } 825 /* This should never happen */ 826 if (m->advertised < remitted) 827 pr_warn_ratelimited("Unexpected REMIT msg\n"); 828 829 /* All messages preceding the REMIT have been read */ 830 m->state = MBR_JOINED; 831 grp->active_cnt--; 832 m->advertised = ADV_IDLE; 833 834 /* Set oldest pending member to active and advertise */ 835 if (list_empty(&grp->pending)) 836 return; 837 pm = list_first_entry(&grp->pending, struct tipc_member, list); 838 pm->state = MBR_ACTIVE; 839 list_move_tail(&pm->list, &grp->active); 840 grp->active_cnt++; 841 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 842 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 843 return; 844 default: 845 pr_warn("Received unknown GROUP_PROTO message\n"); 846 } 847 } 848 849 /* tipc_group_member_evt() - receive and handle a member up/down event 850 */ 851 void tipc_group_member_evt(struct 
tipc_group *grp, 852 bool *usr_wakeup, 853 int *sk_rcvbuf, 854 struct tipc_msg *hdr, 855 struct sk_buff_head *inputq, 856 struct sk_buff_head *xmitq) 857 { 858 struct tipc_event *evt = (void *)msg_data(hdr); 859 u32 instance = evt->found_lower; 860 u32 node = evt->port.node; 861 u32 port = evt->port.ref; 862 int event = evt->event; 863 struct tipc_member *m; 864 struct net *net; 865 u32 self; 866 867 if (!grp) 868 return; 869 870 net = grp->net; 871 self = tipc_own_addr(net); 872 if (!grp->loopback && node == self && port == grp->portid) 873 return; 874 875 m = tipc_group_find_member(grp, node, port); 876 877 switch (event) { 878 case TIPC_PUBLISHED: 879 /* Send and wait for arrival of JOIN message if necessary */ 880 if (!m) { 881 m = tipc_group_create_member(grp, node, port, instance, 882 MBR_PUBLISHED); 883 if (!m) 884 break; 885 tipc_group_update_member(m, 0); 886 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 887 break; 888 } 889 890 if (m->state != MBR_JOINING) 891 break; 892 893 /* Member can be taken into service */ 894 m->instance = instance; 895 m->state = MBR_JOINED; 896 tipc_group_open(m, usr_wakeup); 897 tipc_group_update_member(m, 0); 898 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 899 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 900 m->bc_syncpt, inputq); 901 break; 902 case TIPC_WITHDRAWN: 903 if (!m) 904 break; 905 906 tipc_group_decr_active(grp, m); 907 m->state = MBR_LEAVING; 908 list_del_init(&m->list); 909 tipc_group_open(m, usr_wakeup); 910 911 /* Only send event if no LEAVE message can be expected */ 912 if (!tipc_node_is_up(net, node)) 913 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 914 m->bc_rcv_nxt, inputq); 915 break; 916 default: 917 break; 918 } 919 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 920 } 921