1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "server.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_QUARANTINED, 53 MBR_DISCOVERED, 54 MBR_JOINING, 55 MBR_PUBLISHED, 56 MBR_JOINED, 57 MBR_PENDING, 58 MBR_ACTIVE, 59 MBR_RECLAIMING, 60 MBR_REMITTED, 61 MBR_LEAVING 62 }; 63 64 struct tipc_member { 65 struct rb_node tree_node; 66 struct list_head list; 67 struct list_head small_win; 68 struct sk_buff *event_msg; 69 struct sk_buff_head deferredq; 70 struct tipc_group *group; 71 u32 node; 72 u32 port; 73 u32 instance; 74 enum mbr_state state; 75 u16 advertised; 76 u16 window; 77 u16 bc_rcv_nxt; 78 u16 bc_syncpt; 79 u16 bc_acked; 80 bool usr_pending; 81 }; 82 83 struct tipc_group { 84 struct rb_root members; 85 struct list_head small_win; 86 struct list_head pending; 87 struct list_head active; 88 struct list_head reclaiming; 89 struct tipc_nlist dests; 90 struct net *net; 91 int subid; 92 u32 type; 93 u32 instance; 94 u32 domain; 95 u32 scope; 96 u32 portid; 97 u16 member_cnt; 98 u16 active_cnt; 99 u16 max_active; 100 u16 bc_snd_nxt; 101 u16 bc_ackers; 102 bool loopback; 103 bool events; 104 }; 105 106 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 107 int mtyp, struct sk_buff_head *xmitq); 108 109 static void tipc_group_decr_active(struct tipc_group *grp, 110 struct tipc_member *m) 111 { 112 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING) 113 grp->active_cnt--; 114 } 115 116 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 117 { 118 int max_active, active_pool, idle_pool; 119 int mcnt = grp->member_cnt + 1; 120 121 /* Limit simultaneous reception from other members */ 122 max_active = min(mcnt / 8, 64); 123 max_active = max(max_active, 16); 124 grp->max_active = max_active; 125 126 /* Reserve blocks for active and idle members */ 127 active_pool = max_active * ADV_ACTIVE; 128 idle_pool = (mcnt - max_active) * ADV_IDLE; 129 130 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 131 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 132 } 133 134 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 135 { 136 return grp->bc_snd_nxt; 137 } 138 139 static bool tipc_group_is_receiver(struct tipc_member *m) 140 { 141 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; 142 } 143 144 static bool tipc_group_is_sender(struct tipc_member *m) 145 { 146 return m && m->state >= MBR_JOINED; 147 } 148 149 u32 tipc_group_exclude(struct tipc_group *grp) 150 { 151 if (!grp->loopback) 152 return grp->portid; 153 return 0; 154 } 155 156 int tipc_group_size(struct tipc_group *grp) 157 { 158 return grp->member_cnt; 159 } 160 161 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 162 struct tipc_group_req *mreq) 163 { 164 struct tipc_group *grp; 165 u32 type = mreq->type; 166 167 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 168 if (!grp) 169 return NULL; 170 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 171 INIT_LIST_HEAD(&grp->small_win); 172 INIT_LIST_HEAD(&grp->active); 173 INIT_LIST_HEAD(&grp->pending); 174 INIT_LIST_HEAD(&grp->reclaiming); 175 grp->members = RB_ROOT; 176 grp->net = net; 177 grp->portid = portid; 178 grp->domain = addr_domain(net, mreq->scope); 179 grp->type = type; 180 grp->instance = mreq->instance; 181 grp->scope = mreq->scope; 182 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 183 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 184 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) 185 return grp; 186 kfree(grp); 187 return NULL; 188 } 189 190 void tipc_group_delete(struct net *net, struct tipc_group *grp) 191 { 192 struct rb_root *tree = &grp->members; 193 struct tipc_member *m, *tmp; 194 struct sk_buff_head xmitq; 195 196 __skb_queue_head_init(&xmitq); 197 198 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 199 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 200 list_del(&m->list); 201 kfree(m); 202 } 203 tipc_node_distr_xmit(net, &xmitq); 204 tipc_nlist_purge(&grp->dests); 205 tipc_topsrv_kern_unsubscr(net, grp->subid); 206 kfree(grp); 207 } 208 209 struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 210 u32 node, u32 port) 211 { 212 struct rb_node *n = grp->members.rb_node; 213 u64 nkey, key = (u64)node << 32 | port; 214 struct tipc_member *m; 215 216 while (n) { 217 m = container_of(n, struct tipc_member, tree_node); 218 nkey = (u64)m->node << 32 | m->port; 219 if (key < nkey) 220 n = n->rb_left; 221 else if (key > nkey) 222 n = n->rb_right; 223 else 224 return m; 225 } 226 return NULL; 227 } 228 229 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 230 u32 node, u32 port) 231 { 232 struct tipc_member *m; 233 234 m = tipc_group_find_member(grp, node, port); 235 if (m && tipc_group_is_receiver(m)) 236 return m; 237 return NULL; 238 } 239 240 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 241 u32 node) 242 { 243 struct tipc_member *m; 244 struct rb_node *n; 245 246 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 247 m = container_of(n, struct tipc_member, tree_node); 248 if (m->node == node) 249 return m; 250 } 251 return NULL; 252 } 253 254 static void tipc_group_add_to_tree(struct tipc_group *grp, 255 struct tipc_member *m) 256 { 257 u64 nkey, key = (u64)m->node << 32 | m->port; 258 struct rb_node **n, *parent = NULL; 259 struct tipc_member *tmp; 260 261 n = &grp->members.rb_node; 262 while (*n) { 263 tmp = container_of(*n, struct tipc_member, tree_node); 264 parent = *n; 265 tmp = container_of(parent, struct tipc_member, tree_node); 266 nkey = (u64)tmp->node << 32 | tmp->port; 267 if (key < nkey) 268 n = &(*n)->rb_left; 269 else if (key > nkey) 270 n = &(*n)->rb_right; 271 else 272 return; 273 } 274 rb_link_node(&m->tree_node, parent, n); 275 rb_insert_color(&m->tree_node, &grp->members); 276 } 277 278 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 279 u32 node, u32 port, 280 int state) 281 { 282 struct tipc_member *m; 283 284 m = kzalloc(sizeof(*m), GFP_ATOMIC); 285 if (!m) 286 return NULL; 287 INIT_LIST_HEAD(&m->list); 288 INIT_LIST_HEAD(&m->small_win); 289 __skb_queue_head_init(&m->deferredq); 290 m->group = grp; 291 m->node = node; 292 m->port = port; 293 m->bc_acked = grp->bc_snd_nxt - 1; 294 grp->member_cnt++; 295 tipc_group_add_to_tree(grp, m); 296 tipc_nlist_add(&grp->dests, m->node); 297 m->state = state; 298 return m; 299 } 300 301 void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) 302 { 303 tipc_group_create_member(grp, node, port, MBR_DISCOVERED); 304 } 305 306 static void tipc_group_delete_member(struct tipc_group *grp, 307 struct tipc_member *m) 308 { 309 rb_erase(&m->tree_node, &grp->members); 310 grp->member_cnt--; 311 312 /* Check if we were waiting for replicast ack from this member */ 313 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 314 grp->bc_ackers--; 315 316 list_del_init(&m->list); 317 list_del_init(&m->small_win); 318 tipc_group_decr_active(grp, m); 319 320 /* If last member on a node, remove node from dest list */ 321 if (!tipc_group_find_node(grp, m->node)) 322 tipc_nlist_del(&grp->dests, m->node); 323 324 kfree(m); 325 } 326 327 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 328 { 329 return &grp->dests; 330 } 331 332 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 333 int *scope) 334 { 335 seq->type = grp->type; 336 seq->lower = grp->instance; 337 seq->upper = grp->instance; 338 *scope = grp->scope; 339 } 340 341 void tipc_group_update_member(struct tipc_member *m, int len) 342 { 343 struct tipc_group *grp = m->group; 344 struct tipc_member *_m, *tmp; 345 346 if (!tipc_group_is_receiver(m)) 347 return; 348 349 m->window -= len; 350 351 if (m->window >= ADV_IDLE) 352 return; 353 354 list_del_init(&m->small_win); 355 356 /* Sort member into small_window members' list */ 357 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { 358 if (_m->window > m->window) 359 break; 360 } 361 list_add_tail(&m->small_win, &_m->small_win); 362 } 363 364 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 365 { 366 u16 prev = grp->bc_snd_nxt - 1; 367 struct tipc_member *m; 368 struct rb_node *n; 369 u16 ackers = 0; 370 371 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 372 m = container_of(n, struct tipc_member, tree_node); 373 if (tipc_group_is_receiver(m)) { 374 tipc_group_update_member(m, len); 375 m->bc_acked = prev; 376 ackers++; 377 } 378 } 379 380 /* Mark number of acknowledges to expect, if any */ 381 if (ack) 382 grp->bc_ackers = ackers; 383 grp->bc_snd_nxt++; 384 } 385 386 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 387 int len, struct tipc_member **mbr) 388 { 389 struct sk_buff_head xmitq; 390 struct tipc_member *m; 391 int adv, state; 392 393 m = tipc_group_find_dest(grp, dnode, dport); 394 *mbr = m; 395 if (!m) 396 return false; 397 if (m->usr_pending) 398 return true; 399 if (m->window >= len) 400 return false; 401 m->usr_pending = true; 402 403 /* If not fully advertised, do it now to prevent mutual blocking */ 404 adv = m->advertised; 405 state = m->state; 406 if (state < MBR_JOINED) 407 return true; 408 if (state == MBR_JOINED && adv == ADV_IDLE) 409 return true; 410 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 411 return true; 412 if (state == MBR_PENDING && adv == ADV_IDLE) 413 return true; 414 skb_queue_head_init(&xmitq); 415 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 416 tipc_node_distr_xmit(grp->net, &xmitq); 417 return true; 418 } 419 420 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 421 { 422 struct tipc_member *m = NULL; 423 424 /* If prev bcast was replicast, reject until all receivers have acked */ 425 if (grp->bc_ackers) 426 return true; 427 428 if (list_empty(&grp->small_win)) 429 return false; 430 431 m = list_first_entry(&grp->small_win, struct tipc_member, small_win); 432 if (m->window >= len) 433 return false; 434 435 return tipc_group_cong(grp, m->node, m->port, len, &m); 436 } 437 438 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 439 */ 440 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 441 { 442 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 443 u16 bc_seqno = msg_grp_bc_seqno(hdr); 444 struct sk_buff *_skb, *tmp; 445 int mtyp = msg_type(hdr); 446 447 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 448 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 449 skb_queue_walk_safe(defq, _skb, tmp) { 450 _hdr = buf_msg(_skb); 451 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 452 continue; 453 __skb_queue_before(defq, _skb, skb); 454 return; 455 } 456 /* Bcast was not bypassed, - add to tail */ 457 } 458 /* Unicasts are never bypassed, - always add to tail */ 459 __skb_queue_tail(defq, skb); 460 } 461 462 /* tipc_group_filter_msg() - determine if we should accept arriving message 463 */ 464 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 465 struct sk_buff_head *xmitq) 466 { 467 struct sk_buff *skb = __skb_dequeue(inputq); 468 bool ack, deliver, update, leave = false; 469 struct sk_buff_head *defq; 470 struct tipc_member *m; 471 struct tipc_msg *hdr; 472 u32 node, port; 473 int mtyp, blks; 474 475 if (!skb) 476 return; 477 478 hdr = buf_msg(skb); 479 node = msg_orignode(hdr); 480 port = msg_origport(hdr); 481 482 if (!msg_in_group(hdr)) 483 goto drop; 484 485 m = tipc_group_find_member(grp, node, port); 486 if (!tipc_group_is_sender(m)) 487 goto drop; 488 489 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 490 goto drop; 491 492 TIPC_SKB_CB(skb)->orig_member = m->instance; 493 defq = &m->deferredq; 494 tipc_group_sort_msg(skb, defq); 495 496 while ((skb = skb_peek(defq))) { 497 hdr = buf_msg(skb); 498 mtyp = msg_type(hdr); 499 blks = msg_blocks(hdr); 500 deliver = true; 501 ack = false; 502 update = false; 503 504 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 505 break; 506 507 /* Decide what to do with message */ 508 switch (mtyp) { 509 case TIPC_GRP_MCAST_MSG: 510 if (msg_nameinst(hdr) != grp->instance) { 511 update = true; 512 deliver = false; 513 } 514 /* Fall thru */ 515 case TIPC_GRP_BCAST_MSG: 516 m->bc_rcv_nxt++; 517 ack = msg_grp_bc_ack_req(hdr); 518 break; 519 case TIPC_GRP_UCAST_MSG: 520 break; 521 case TIPC_GRP_MEMBER_EVT: 522 if (m->state == MBR_LEAVING) 523 leave = true; 524 if (!grp->events) 525 deliver = false; 526 break; 527 default: 528 break; 529 } 530 531 /* Execute decisions */ 532 __skb_dequeue(defq); 533 if (deliver) 534 __skb_queue_tail(inputq, skb); 535 else 536 kfree_skb(skb); 537 538 if (ack) 539 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 540 541 if (leave) { 542 __skb_queue_purge(defq); 543 tipc_group_delete_member(grp, m); 544 break; 545 } 546 if (!update) 547 continue; 548 549 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 550 } 551 return; 552 drop: 553 kfree_skb(skb); 554 } 555 556 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 557 u32 port, struct sk_buff_head *xmitq) 558 { 559 struct list_head *active = &grp->active; 560 int max_active = grp->max_active; 561 int reclaim_limit = max_active * 3 / 4; 562 int active_cnt = grp->active_cnt; 563 struct tipc_member *m, *rm; 564 565 m = tipc_group_find_member(grp, node, port); 566 if (!m) 567 return; 568 569 m->advertised -= blks; 570 571 switch (m->state) { 572 case MBR_JOINED: 573 /* Reclaim advertised space from least active member */ 574 if (!list_empty(active) && active_cnt >= reclaim_limit) { 575 rm = list_first_entry(active, struct tipc_member, list); 576 rm->state = MBR_RECLAIMING; 577 list_move_tail(&rm->list, &grp->reclaiming); 578 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 579 } 580 /* If max active, become pending and wait for reclaimed space */ 581 if (active_cnt >= max_active) { 582 m->state = MBR_PENDING; 583 list_add_tail(&m->list, &grp->pending); 584 break; 585 } 586 /* Otherwise become active */ 587 m->state = MBR_ACTIVE; 588 list_add_tail(&m->list, &grp->active); 589 grp->active_cnt++; 590 /* Fall through */ 591 case MBR_ACTIVE: 592 if (!list_is_last(&m->list, &grp->active)) 593 list_move_tail(&m->list, &grp->active); 594 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 595 break; 596 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 597 break; 598 case MBR_REMITTED: 599 if (m->advertised > ADV_IDLE) 600 break; 601 m->state = MBR_JOINED; 602 if (m->advertised < ADV_IDLE) { 603 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 604 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 605 } 606 break; 607 case MBR_RECLAIMING: 608 case MBR_DISCOVERED: 609 case MBR_JOINING: 610 case MBR_LEAVING: 611 default: 612 break; 613 } 614 } 615 616 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 617 int mtyp, struct sk_buff_head *xmitq) 618 { 619 struct tipc_msg *hdr; 620 struct sk_buff *skb; 621 int adv = 0; 622 623 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 624 m->node, tipc_own_addr(grp->net), 625 m->port, grp->portid, 0); 626 if (!skb) 627 return; 628 629 if (m->state == MBR_ACTIVE) 630 adv = ADV_ACTIVE - m->advertised; 631 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 632 adv = ADV_IDLE - m->advertised; 633 634 hdr = buf_msg(skb); 635 636 if (mtyp == GRP_JOIN_MSG) { 637 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 638 msg_set_adv_win(hdr, adv); 639 m->advertised += adv; 640 } else if (mtyp == GRP_LEAVE_MSG) { 641 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 642 } else if (mtyp == GRP_ADV_MSG) { 643 msg_set_adv_win(hdr, adv); 644 m->advertised += adv; 645 } else if (mtyp == GRP_ACK_MSG) { 646 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 647 } else if (mtyp == GRP_REMIT_MSG) { 648 msg_set_grp_remitted(hdr, m->window); 649 } 650 msg_set_dest_droppable(hdr, true); 651 __skb_queue_tail(xmitq, skb); 652 } 653 654 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 655 struct tipc_msg *hdr, struct sk_buff_head *inputq, 656 struct sk_buff_head *xmitq) 657 { 658 u32 node = msg_orignode(hdr); 659 u32 port = msg_origport(hdr); 660 struct tipc_member *m, *pm; 661 struct tipc_msg *ehdr; 662 u16 remitted, in_flight; 663 664 if (!grp) 665 return; 666 667 m = tipc_group_find_member(grp, node, port); 668 669 switch (msg_type(hdr)) { 670 case GRP_JOIN_MSG: 671 if (!m) 672 m = tipc_group_create_member(grp, node, port, 673 MBR_QUARANTINED); 674 if (!m) 675 return; 676 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 677 m->bc_rcv_nxt = m->bc_syncpt; 678 m->window += msg_adv_win(hdr); 679 680 /* Wait until PUBLISH event is received */ 681 if (m->state == MBR_DISCOVERED) { 682 m->state = MBR_JOINING; 683 } else if (m->state == MBR_PUBLISHED) { 684 m->state = MBR_JOINED; 685 *usr_wakeup = true; 686 m->usr_pending = false; 687 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 688 ehdr = buf_msg(m->event_msg); 689 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 690 __skb_queue_tail(inputq, m->event_msg); 691 } 692 list_del_init(&m->small_win); 693 tipc_group_update_member(m, 0); 694 return; 695 case GRP_LEAVE_MSG: 696 if (!m) 697 return; 698 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 699 list_del_init(&m->list); 700 list_del_init(&m->small_win); 701 *usr_wakeup = true; 702 703 /* Wait until WITHDRAW event is received */ 704 if (m->state != MBR_LEAVING) { 705 tipc_group_decr_active(grp, m); 706 m->state = MBR_LEAVING; 707 return; 708 } 709 /* Otherwise deliver already received WITHDRAW event */ 710 ehdr = buf_msg(m->event_msg); 711 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 712 __skb_queue_tail(inputq, m->event_msg); 713 return; 714 case GRP_ADV_MSG: 715 if (!m) 716 return; 717 m->window += msg_adv_win(hdr); 718 *usr_wakeup = m->usr_pending; 719 m->usr_pending = false; 720 list_del_init(&m->small_win); 721 return; 722 case GRP_ACK_MSG: 723 if (!m) 724 return; 725 m->bc_acked = msg_grp_bc_acked(hdr); 726 if (--grp->bc_ackers) 727 break; 728 *usr_wakeup = true; 729 m->usr_pending = false; 730 return; 731 case GRP_RECLAIM_MSG: 732 if (!m) 733 return; 734 *usr_wakeup = m->usr_pending; 735 m->usr_pending = false; 736 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 737 m->window = ADV_IDLE; 738 return; 739 case GRP_REMIT_MSG: 740 if (!m || m->state != MBR_RECLAIMING) 741 return; 742 743 list_del_init(&m->list); 744 grp->active_cnt--; 745 remitted = msg_grp_remitted(hdr); 746 747 /* Messages preceding the REMIT still in receive queue */ 748 if (m->advertised > remitted) { 749 m->state = MBR_REMITTED; 750 in_flight = m->advertised - remitted; 751 } 752 /* All messages preceding the REMIT have been read */ 753 if (m->advertised <= remitted) { 754 m->state = MBR_JOINED; 755 in_flight = 0; 756 } 757 /* ..and the REMIT overtaken by more messages => re-advertise */ 758 if (m->advertised < remitted) 759 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 760 761 m->advertised = ADV_IDLE + in_flight; 762 763 /* Set oldest pending member to active and advertise */ 764 if (list_empty(&grp->pending)) 765 return; 766 pm = list_first_entry(&grp->pending, struct tipc_member, list); 767 pm->state = MBR_ACTIVE; 768 list_move_tail(&pm->list, &grp->active); 769 grp->active_cnt++; 770 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 771 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 772 return; 773 default: 774 pr_warn("Received unknown GROUP_PROTO message\n"); 775 } 776 } 777 778 /* tipc_group_member_evt() - receive and handle a member up/down event 779 */ 780 void tipc_group_member_evt(struct tipc_group *grp, 781 bool *usr_wakeup, 782 int *sk_rcvbuf, 783 struct sk_buff *skb, 784 struct sk_buff_head *inputq, 785 struct sk_buff_head *xmitq) 786 { 787 struct tipc_msg *hdr = buf_msg(skb); 788 struct tipc_event *evt = (void *)msg_data(hdr); 789 u32 instance = evt->found_lower; 790 u32 node = evt->port.node; 791 u32 port = evt->port.ref; 792 int event = evt->event; 793 struct tipc_member *m; 794 struct net *net; 795 bool node_up; 796 u32 self; 797 798 if (!grp) 799 goto drop; 800 801 net = grp->net; 802 self = tipc_own_addr(net); 803 if (!grp->loopback && node == self && port == grp->portid) 804 goto drop; 805 806 /* Convert message before delivery to user */ 807 msg_set_hdr_sz(hdr, GROUP_H_SIZE); 808 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); 809 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); 810 msg_set_origport(hdr, port); 811 msg_set_orignode(hdr, node); 812 msg_set_nametype(hdr, grp->type); 813 msg_set_grp_evt(hdr, event); 814 815 m = tipc_group_find_member(grp, node, port); 816 817 if (event == TIPC_PUBLISHED) { 818 if (!m) 819 m = tipc_group_create_member(grp, node, port, 820 MBR_DISCOVERED); 821 if (!m) 822 goto drop; 823 824 /* Hold back event if JOIN message not yet received */ 825 if (m->state == MBR_DISCOVERED) { 826 m->event_msg = skb; 827 m->state = MBR_PUBLISHED; 828 } else { 829 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 830 __skb_queue_tail(inputq, skb); 831 m->state = MBR_JOINED; 832 *usr_wakeup = true; 833 m->usr_pending = false; 834 } 835 m->instance = instance; 836 TIPC_SKB_CB(skb)->orig_member = m->instance; 837 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 838 tipc_group_update_member(m, 0); 839 } else if (event == TIPC_WITHDRAWN) { 840 if (!m) 841 goto drop; 842 843 TIPC_SKB_CB(skb)->orig_member = m->instance; 844 845 *usr_wakeup = true; 846 m->usr_pending = false; 847 node_up = tipc_node_is_up(net, node); 848 m->event_msg = NULL; 849 850 if (node_up) { 851 /* Hold back event if a LEAVE msg should be expected */ 852 if (m->state != MBR_LEAVING) { 853 m->event_msg = skb; 854 tipc_group_decr_active(grp, m); 855 m->state = MBR_LEAVING; 856 } else { 857 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 858 __skb_queue_tail(inputq, skb); 859 } 860 } else { 861 if (m->state != MBR_LEAVING) { 862 tipc_group_decr_active(grp, m); 863 m->state = MBR_LEAVING; 864 msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); 865 } else { 866 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 867 } 868 __skb_queue_tail(inputq, skb); 869 } 870 list_del_init(&m->list); 871 list_del_init(&m->small_win); 872 } 873 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 874 return; 875 drop: 876 kfree_skb(skb); 877 } 878