1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "server.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_JOINING, 53 MBR_PUBLISHED, 54 MBR_JOINED, 55 MBR_PENDING, 56 MBR_ACTIVE, 57 MBR_RECLAIMING, 58 MBR_REMITTED, 59 MBR_LEAVING 60 }; 61 62 struct tipc_member { 63 struct rb_node tree_node; 64 struct list_head list; 65 struct list_head small_win; 66 struct sk_buff_head deferredq; 67 struct tipc_group *group; 68 u32 node; 69 u32 port; 70 u32 instance; 71 enum mbr_state state; 72 u16 advertised; 73 u16 window; 74 u16 bc_rcv_nxt; 75 u16 bc_syncpt; 76 u16 bc_acked; 77 }; 78 79 struct tipc_group { 80 struct rb_root members; 81 struct list_head small_win; 82 struct list_head pending; 83 struct list_head active; 84 struct tipc_nlist dests; 85 struct net *net; 86 int subid; 87 u32 type; 88 u32 instance; 89 u32 scope; 90 u32 portid; 91 u16 member_cnt; 92 u16 active_cnt; 93 u16 max_active; 94 u16 bc_snd_nxt; 95 u16 bc_ackers; 96 bool loopback; 97 bool events; 98 bool open; 99 }; 100 101 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 102 int mtyp, struct sk_buff_head *xmitq); 103 104 bool tipc_group_is_open(struct tipc_group *grp) 105 { 106 return grp->open; 107 } 108 109 static void tipc_group_open(struct tipc_member *m, bool *wakeup) 110 { 111 *wakeup = false; 112 if (list_empty(&m->small_win)) 113 return; 114 list_del_init(&m->small_win); 115 m->group->open = true; 116 *wakeup = true; 117 } 118 119 static void tipc_group_decr_active(struct tipc_group *grp, 120 struct tipc_member *m) 121 { 122 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || 123 m->state == MBR_REMITTED) 124 grp->active_cnt--; 125 } 126 127 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 128 { 129 int max_active, active_pool, idle_pool; 130 int mcnt = grp->member_cnt + 1; 131 132 /* Limit simultaneous reception from other members */ 133 max_active = min(mcnt / 8, 64); 134 max_active = max(max_active, 16); 135 grp->max_active = max_active; 136 137 /* Reserve blocks for active and idle members */ 138 active_pool = max_active * ADV_ACTIVE; 139 idle_pool = (mcnt - max_active) * ADV_IDLE; 140 141 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 142 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 143 } 144 145 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 146 { 147 return grp->bc_snd_nxt; 148 } 149 150 static bool tipc_group_is_receiver(struct tipc_member *m) 151 { 152 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING; 153 } 154 155 static bool tipc_group_is_sender(struct tipc_member *m) 156 { 157 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED; 158 } 159 160 u32 tipc_group_exclude(struct tipc_group *grp) 161 { 162 if (!grp->loopback) 163 return grp->portid; 164 return 0; 165 } 166 167 int tipc_group_size(struct tipc_group *grp) 168 { 169 return grp->member_cnt; 170 } 171 172 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 173 struct tipc_group_req *mreq) 174 { 175 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; 176 bool global = mreq->scope != TIPC_NODE_SCOPE; 177 struct tipc_group *grp; 178 u32 type = mreq->type; 179 180 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 181 if (!grp) 182 return NULL; 183 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 184 INIT_LIST_HEAD(&grp->small_win); 185 INIT_LIST_HEAD(&grp->active); 186 INIT_LIST_HEAD(&grp->pending); 187 grp->members = RB_ROOT; 188 grp->net = net; 189 grp->portid = portid; 190 grp->type = type; 191 grp->instance = mreq->instance; 192 grp->scope = mreq->scope; 193 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 194 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 195 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; 196 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, 197 filter, &grp->subid)) 198 return grp; 199 kfree(grp); 200 return NULL; 201 } 202 203 void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) 204 { 205 struct rb_root *tree = &grp->members; 206 struct tipc_member *m, *tmp; 207 struct sk_buff_head xmitq; 208 209 skb_queue_head_init(&xmitq); 210 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 211 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); 212 tipc_group_update_member(m, 0); 213 } 214 tipc_node_distr_xmit(net, &xmitq); 215 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 216 } 217 218 void tipc_group_delete(struct net *net, struct tipc_group *grp) 219 { 220 struct rb_root *tree = &grp->members; 221 struct tipc_member *m, *tmp; 222 struct sk_buff_head xmitq; 223 224 __skb_queue_head_init(&xmitq); 225 226 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 227 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 228 list_del(&m->list); 229 kfree(m); 230 } 231 tipc_node_distr_xmit(net, &xmitq); 232 tipc_nlist_purge(&grp->dests); 233 tipc_topsrv_kern_unsubscr(net, grp->subid); 234 kfree(grp); 235 } 236 237 struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 238 u32 node, u32 port) 239 { 240 struct rb_node *n = grp->members.rb_node; 241 u64 nkey, key = (u64)node << 32 | port; 242 struct tipc_member *m; 243 244 while (n) { 245 m = container_of(n, struct tipc_member, tree_node); 246 nkey = (u64)m->node << 32 | m->port; 247 if (key < nkey) 248 n = n->rb_left; 249 else if (key > nkey) 250 n = n->rb_right; 251 else 252 return m; 253 } 254 return NULL; 255 } 256 257 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 258 u32 node, u32 port) 259 { 260 struct tipc_member *m; 261 262 m = tipc_group_find_member(grp, node, port); 263 if (m && tipc_group_is_receiver(m)) 264 return m; 265 return NULL; 266 } 267 268 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 269 u32 node) 270 { 271 struct tipc_member *m; 272 struct rb_node *n; 273 274 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 275 m = container_of(n, struct tipc_member, tree_node); 276 if (m->node == node) 277 return m; 278 } 279 return NULL; 280 } 281 282 static void tipc_group_add_to_tree(struct tipc_group *grp, 283 struct tipc_member *m) 284 { 285 u64 nkey, key = (u64)m->node << 32 | m->port; 286 struct rb_node **n, *parent = NULL; 287 struct tipc_member *tmp; 288 289 n = &grp->members.rb_node; 290 while (*n) { 291 tmp = container_of(*n, struct tipc_member, tree_node); 292 parent = *n; 293 tmp = container_of(parent, struct tipc_member, tree_node); 294 nkey = (u64)tmp->node << 32 | tmp->port; 295 if (key < nkey) 296 n = &(*n)->rb_left; 297 else if (key > nkey) 298 n = &(*n)->rb_right; 299 else 300 return; 301 } 302 rb_link_node(&m->tree_node, parent, n); 303 rb_insert_color(&m->tree_node, &grp->members); 304 } 305 306 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 307 u32 node, u32 port, 308 u32 instance, int state) 309 { 310 struct tipc_member *m; 311 312 m = kzalloc(sizeof(*m), GFP_ATOMIC); 313 if (!m) 314 return NULL; 315 INIT_LIST_HEAD(&m->list); 316 INIT_LIST_HEAD(&m->small_win); 317 __skb_queue_head_init(&m->deferredq); 318 m->group = grp; 319 m->node = node; 320 m->port = port; 321 m->instance = instance; 322 m->bc_acked = grp->bc_snd_nxt - 1; 323 grp->member_cnt++; 324 tipc_group_add_to_tree(grp, m); 325 tipc_nlist_add(&grp->dests, m->node); 326 m->state = state; 327 return m; 328 } 329 330 void tipc_group_add_member(struct tipc_group *grp, u32 node, 331 u32 port, u32 instance) 332 { 333 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED); 334 } 335 336 static void tipc_group_delete_member(struct tipc_group *grp, 337 struct tipc_member *m) 338 { 339 rb_erase(&m->tree_node, &grp->members); 340 grp->member_cnt--; 341 342 /* Check if we were waiting for replicast ack from this member */ 343 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 344 grp->bc_ackers--; 345 346 list_del_init(&m->list); 347 list_del_init(&m->small_win); 348 tipc_group_decr_active(grp, m); 349 350 /* If last member on a node, remove node from dest list */ 351 if (!tipc_group_find_node(grp, m->node)) 352 tipc_nlist_del(&grp->dests, m->node); 353 354 kfree(m); 355 } 356 357 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 358 { 359 return &grp->dests; 360 } 361 362 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 363 int *scope) 364 { 365 seq->type = grp->type; 366 seq->lower = grp->instance; 367 seq->upper = grp->instance; 368 *scope = grp->scope; 369 } 370 371 void tipc_group_update_member(struct tipc_member *m, int len) 372 { 373 struct tipc_group *grp = m->group; 374 struct tipc_member *_m, *tmp; 375 376 if (!tipc_group_is_receiver(m)) 377 return; 378 379 m->window -= len; 380 381 if (m->window >= ADV_IDLE) 382 return; 383 384 list_del_init(&m->small_win); 385 386 /* Sort member into small_window members' list */ 387 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { 388 if (_m->window > m->window) 389 break; 390 } 391 list_add_tail(&m->small_win, &_m->small_win); 392 } 393 394 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 395 { 396 u16 prev = grp->bc_snd_nxt - 1; 397 struct tipc_member *m; 398 struct rb_node *n; 399 u16 ackers = 0; 400 401 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 402 m = container_of(n, struct tipc_member, tree_node); 403 if (tipc_group_is_receiver(m)) { 404 tipc_group_update_member(m, len); 405 m->bc_acked = prev; 406 ackers++; 407 } 408 } 409 410 /* Mark number of acknowledges to expect, if any */ 411 if (ack) 412 grp->bc_ackers = ackers; 413 grp->bc_snd_nxt++; 414 } 415 416 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 417 int len, struct tipc_member **mbr) 418 { 419 struct sk_buff_head xmitq; 420 struct tipc_member *m; 421 int adv, state; 422 423 m = tipc_group_find_dest(grp, dnode, dport); 424 if (!tipc_group_is_receiver(m)) { 425 *mbr = NULL; 426 return false; 427 } 428 *mbr = m; 429 430 if (m->window >= len) 431 return false; 432 433 grp->open = false; 434 435 /* If not fully advertised, do it now to prevent mutual blocking */ 436 adv = m->advertised; 437 state = m->state; 438 if (state == MBR_JOINED && adv == ADV_IDLE) 439 return true; 440 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 441 return true; 442 if (state == MBR_PENDING && adv == ADV_IDLE) 443 return true; 444 skb_queue_head_init(&xmitq); 445 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 446 tipc_node_distr_xmit(grp->net, &xmitq); 447 return true; 448 } 449 450 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 451 { 452 struct tipc_member *m = NULL; 453 454 /* If prev bcast was replicast, reject until all receivers have acked */ 455 if (grp->bc_ackers) { 456 grp->open = false; 457 return true; 458 } 459 if (list_empty(&grp->small_win)) 460 return false; 461 462 m = list_first_entry(&grp->small_win, struct tipc_member, small_win); 463 if (m->window >= len) 464 return false; 465 466 return tipc_group_cong(grp, m->node, m->port, len, &m); 467 } 468 469 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 470 */ 471 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 472 { 473 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 474 u16 bc_seqno = msg_grp_bc_seqno(hdr); 475 struct sk_buff *_skb, *tmp; 476 int mtyp = msg_type(hdr); 477 478 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 479 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 480 skb_queue_walk_safe(defq, _skb, tmp) { 481 _hdr = buf_msg(_skb); 482 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 483 continue; 484 __skb_queue_before(defq, _skb, skb); 485 return; 486 } 487 /* Bcast was not bypassed, - add to tail */ 488 } 489 /* Unicasts are never bypassed, - always add to tail */ 490 __skb_queue_tail(defq, skb); 491 } 492 493 /* tipc_group_filter_msg() - determine if we should accept arriving message 494 */ 495 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 496 struct sk_buff_head *xmitq) 497 { 498 struct sk_buff *skb = __skb_dequeue(inputq); 499 bool ack, deliver, update, leave = false; 500 struct sk_buff_head *defq; 501 struct tipc_member *m; 502 struct tipc_msg *hdr; 503 u32 node, port; 504 int mtyp, blks; 505 506 if (!skb) 507 return; 508 509 hdr = buf_msg(skb); 510 node = msg_orignode(hdr); 511 port = msg_origport(hdr); 512 513 if (!msg_in_group(hdr)) 514 goto drop; 515 516 m = tipc_group_find_member(grp, node, port); 517 if (!tipc_group_is_sender(m)) 518 goto drop; 519 520 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 521 goto drop; 522 523 TIPC_SKB_CB(skb)->orig_member = m->instance; 524 defq = &m->deferredq; 525 tipc_group_sort_msg(skb, defq); 526 527 while ((skb = skb_peek(defq))) { 528 hdr = buf_msg(skb); 529 mtyp = msg_type(hdr); 530 blks = msg_blocks(hdr); 531 deliver = true; 532 ack = false; 533 update = false; 534 535 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 536 break; 537 538 /* Decide what to do with message */ 539 switch (mtyp) { 540 case TIPC_GRP_MCAST_MSG: 541 if (msg_nameinst(hdr) != grp->instance) { 542 update = true; 543 deliver = false; 544 } 545 /* Fall thru */ 546 case TIPC_GRP_BCAST_MSG: 547 m->bc_rcv_nxt++; 548 ack = msg_grp_bc_ack_req(hdr); 549 break; 550 case TIPC_GRP_UCAST_MSG: 551 break; 552 case TIPC_GRP_MEMBER_EVT: 553 if (m->state == MBR_LEAVING) 554 leave = true; 555 if (!grp->events) 556 deliver = false; 557 break; 558 default: 559 break; 560 } 561 562 /* Execute decisions */ 563 __skb_dequeue(defq); 564 if (deliver) 565 __skb_queue_tail(inputq, skb); 566 else 567 kfree_skb(skb); 568 569 if (ack) 570 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 571 572 if (leave) { 573 __skb_queue_purge(defq); 574 tipc_group_delete_member(grp, m); 575 break; 576 } 577 if (!update) 578 continue; 579 580 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 581 } 582 return; 583 drop: 584 kfree_skb(skb); 585 } 586 587 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 588 u32 port, struct sk_buff_head *xmitq) 589 { 590 struct list_head *active = &grp->active; 591 int max_active = grp->max_active; 592 int reclaim_limit = max_active * 3 / 4; 593 int active_cnt = grp->active_cnt; 594 struct tipc_member *m, *rm, *pm; 595 596 m = tipc_group_find_member(grp, node, port); 597 if (!m) 598 return; 599 600 m->advertised -= blks; 601 602 switch (m->state) { 603 case MBR_JOINED: 604 /* First, decide if member can go active */ 605 if (active_cnt <= max_active) { 606 m->state = MBR_ACTIVE; 607 list_add_tail(&m->list, active); 608 grp->active_cnt++; 609 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 610 } else { 611 m->state = MBR_PENDING; 612 list_add_tail(&m->list, &grp->pending); 613 } 614 615 if (active_cnt < reclaim_limit) 616 break; 617 618 /* Reclaim from oldest active member, if possible */ 619 if (!list_empty(active)) { 620 rm = list_first_entry(active, struct tipc_member, list); 621 rm->state = MBR_RECLAIMING; 622 list_del_init(&rm->list); 623 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 624 break; 625 } 626 /* Nobody to reclaim from; - revert oldest pending to JOINED */ 627 pm = list_first_entry(&grp->pending, struct tipc_member, list); 628 list_del_init(&pm->list); 629 pm->state = MBR_JOINED; 630 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 631 break; 632 case MBR_ACTIVE: 633 if (!list_is_last(&m->list, &grp->active)) 634 list_move_tail(&m->list, &grp->active); 635 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 636 break; 637 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 638 break; 639 case MBR_REMITTED: 640 if (m->advertised > ADV_IDLE) 641 break; 642 m->state = MBR_JOINED; 643 grp->active_cnt--; 644 if (m->advertised < ADV_IDLE) { 645 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 646 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 647 } 648 649 if (list_empty(&grp->pending)) 650 return; 651 652 /* Set oldest pending member to active and advertise */ 653 pm = list_first_entry(&grp->pending, struct tipc_member, list); 654 pm->state = MBR_ACTIVE; 655 list_move_tail(&pm->list, &grp->active); 656 grp->active_cnt++; 657 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 658 break; 659 case MBR_RECLAIMING: 660 case MBR_JOINING: 661 case MBR_LEAVING: 662 default: 663 break; 664 } 665 } 666 667 static void tipc_group_create_event(struct tipc_group *grp, 668 struct tipc_member *m, 669 u32 event, u16 seqno, 670 struct sk_buff_head *inputq) 671 { u32 dnode = tipc_own_addr(grp->net); 672 struct tipc_event evt; 673 struct sk_buff *skb; 674 struct tipc_msg *hdr; 675 676 evt.event = event; 677 evt.found_lower = m->instance; 678 evt.found_upper = m->instance; 679 evt.port.ref = m->port; 680 evt.port.node = m->node; 681 evt.s.seq.type = grp->type; 682 evt.s.seq.lower = m->instance; 683 evt.s.seq.upper = m->instance; 684 685 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, 686 GROUP_H_SIZE, sizeof(evt), dnode, m->node, 687 grp->portid, m->port, 0); 688 if (!skb) 689 return; 690 691 hdr = buf_msg(skb); 692 msg_set_nametype(hdr, grp->type); 693 msg_set_grp_evt(hdr, event); 694 msg_set_dest_droppable(hdr, true); 695 msg_set_grp_bc_seqno(hdr, seqno); 696 memcpy(msg_data(hdr), &evt, sizeof(evt)); 697 TIPC_SKB_CB(skb)->orig_member = m->instance; 698 __skb_queue_tail(inputq, skb); 699 } 700 701 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 702 int mtyp, struct sk_buff_head *xmitq) 703 { 704 struct tipc_msg *hdr; 705 struct sk_buff *skb; 706 int adv = 0; 707 708 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 709 m->node, tipc_own_addr(grp->net), 710 m->port, grp->portid, 0); 711 if (!skb) 712 return; 713 714 if (m->state == MBR_ACTIVE) 715 adv = ADV_ACTIVE - m->advertised; 716 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 717 adv = ADV_IDLE - m->advertised; 718 719 hdr = buf_msg(skb); 720 721 if (mtyp == GRP_JOIN_MSG) { 722 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 723 msg_set_adv_win(hdr, adv); 724 m->advertised += adv; 725 } else if (mtyp == GRP_LEAVE_MSG) { 726 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 727 } else if (mtyp == GRP_ADV_MSG) { 728 msg_set_adv_win(hdr, adv); 729 m->advertised += adv; 730 } else if (mtyp == GRP_ACK_MSG) { 731 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 732 } else if (mtyp == GRP_REMIT_MSG) { 733 msg_set_grp_remitted(hdr, m->window); 734 } 735 msg_set_dest_droppable(hdr, true); 736 __skb_queue_tail(xmitq, skb); 737 } 738 739 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 740 struct tipc_msg *hdr, struct sk_buff_head *inputq, 741 struct sk_buff_head *xmitq) 742 { 743 u32 node = msg_orignode(hdr); 744 u32 port = msg_origport(hdr); 745 struct tipc_member *m, *pm; 746 u16 remitted, in_flight; 747 748 if (!grp) 749 return; 750 751 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) 752 return; 753 754 m = tipc_group_find_member(grp, node, port); 755 756 switch (msg_type(hdr)) { 757 case GRP_JOIN_MSG: 758 if (!m) 759 m = tipc_group_create_member(grp, node, port, 760 0, MBR_JOINING); 761 if (!m) 762 return; 763 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 764 m->bc_rcv_nxt = m->bc_syncpt; 765 m->window += msg_adv_win(hdr); 766 767 /* Wait until PUBLISH event is received if necessary */ 768 if (m->state != MBR_PUBLISHED) 769 return; 770 771 /* Member can be taken into service */ 772 m->state = MBR_JOINED; 773 tipc_group_open(m, usr_wakeup); 774 tipc_group_update_member(m, 0); 775 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 776 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 777 m->bc_syncpt, inputq); 778 return; 779 case GRP_LEAVE_MSG: 780 if (!m) 781 return; 782 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 783 list_del_init(&m->list); 784 tipc_group_open(m, usr_wakeup); 785 tipc_group_decr_active(grp, m); 786 m->state = MBR_LEAVING; 787 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 788 m->bc_syncpt, inputq); 789 return; 790 case GRP_ADV_MSG: 791 if (!m) 792 return; 793 m->window += msg_adv_win(hdr); 794 tipc_group_open(m, usr_wakeup); 795 return; 796 case GRP_ACK_MSG: 797 if (!m) 798 return; 799 m->bc_acked = msg_grp_bc_acked(hdr); 800 if (--grp->bc_ackers) 801 return; 802 list_del_init(&m->small_win); 803 m->group->open = true; 804 *usr_wakeup = true; 805 tipc_group_update_member(m, 0); 806 return; 807 case GRP_RECLAIM_MSG: 808 if (!m) 809 return; 810 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 811 m->window = ADV_IDLE; 812 tipc_group_open(m, usr_wakeup); 813 return; 814 case GRP_REMIT_MSG: 815 if (!m || m->state != MBR_RECLAIMING) 816 return; 817 818 remitted = msg_grp_remitted(hdr); 819 820 /* Messages preceding the REMIT still in receive queue */ 821 if (m->advertised > remitted) { 822 m->state = MBR_REMITTED; 823 in_flight = m->advertised - remitted; 824 m->advertised = ADV_IDLE + in_flight; 825 return; 826 } 827 /* This should never happen */ 828 if (m->advertised < remitted) 829 pr_warn_ratelimited("Unexpected REMIT msg\n"); 830 831 /* All messages preceding the REMIT have been read */ 832 m->state = MBR_JOINED; 833 grp->active_cnt--; 834 m->advertised = ADV_IDLE; 835 836 /* Set oldest pending member to active and advertise */ 837 if (list_empty(&grp->pending)) 838 return; 839 pm = list_first_entry(&grp->pending, struct tipc_member, list); 840 pm->state = MBR_ACTIVE; 841 list_move_tail(&pm->list, &grp->active); 842 grp->active_cnt++; 843 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 844 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 845 return; 846 default: 847 pr_warn("Received unknown GROUP_PROTO message\n"); 848 } 849 } 850 851 /* tipc_group_member_evt() - receive and handle a member up/down event 852 */ 853 void tipc_group_member_evt(struct tipc_group *grp, 854 bool *usr_wakeup, 855 int *sk_rcvbuf, 856 struct tipc_msg *hdr, 857 struct sk_buff_head *inputq, 858 struct sk_buff_head *xmitq) 859 { 860 struct tipc_event *evt = (void *)msg_data(hdr); 861 u32 instance = evt->found_lower; 862 u32 node = evt->port.node; 863 u32 port = evt->port.ref; 864 int event = evt->event; 865 struct tipc_member *m; 866 struct net *net; 867 u32 self; 868 869 if (!grp) 870 return; 871 872 net = grp->net; 873 self = tipc_own_addr(net); 874 if (!grp->loopback && node == self && port == grp->portid) 875 return; 876 877 m = tipc_group_find_member(grp, node, port); 878 879 switch (event) { 880 case TIPC_PUBLISHED: 881 /* Send and wait for arrival of JOIN message if necessary */ 882 if (!m) { 883 m = tipc_group_create_member(grp, node, port, instance, 884 MBR_PUBLISHED); 885 if (!m) 886 break; 887 tipc_group_update_member(m, 0); 888 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 889 break; 890 } 891 892 if (m->state != MBR_JOINING) 893 break; 894 895 /* Member can be taken into service */ 896 m->instance = instance; 897 m->state = MBR_JOINED; 898 tipc_group_open(m, usr_wakeup); 899 tipc_group_update_member(m, 0); 900 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 901 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 902 m->bc_syncpt, inputq); 903 break; 904 case TIPC_WITHDRAWN: 905 if (!m) 906 break; 907 908 tipc_group_decr_active(grp, m); 909 m->state = MBR_LEAVING; 910 list_del_init(&m->list); 911 tipc_group_open(m, usr_wakeup); 912 913 /* Only send event if no LEAVE message can be expected */ 914 if (!tipc_node_is_up(net, node)) 915 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 916 m->bc_rcv_nxt, inputq); 917 break; 918 default: 919 break; 920 } 921 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 922 } 923