1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "server.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_QUARANTINED, 53 MBR_DISCOVERED, 54 MBR_JOINING, 55 MBR_PUBLISHED, 56 MBR_JOINED, 57 MBR_PENDING, 58 MBR_ACTIVE, 59 MBR_RECLAIMING, 60 MBR_REMITTED, 61 MBR_LEAVING 62 }; 63 64 struct tipc_member { 65 struct rb_node tree_node; 66 struct list_head list; 67 struct list_head congested; 68 struct sk_buff *event_msg; 69 struct sk_buff_head deferredq; 70 struct tipc_group *group; 71 u32 node; 72 u32 port; 73 u32 instance; 74 enum mbr_state state; 75 u16 advertised; 76 u16 window; 77 u16 bc_rcv_nxt; 78 u16 bc_syncpt; 79 u16 bc_acked; 80 bool usr_pending; 81 }; 82 83 struct tipc_group { 84 struct rb_root members; 85 struct list_head congested; 86 struct list_head pending; 87 struct list_head active; 88 struct list_head reclaiming; 89 struct tipc_nlist dests; 90 struct net *net; 91 int subid; 92 u32 type; 93 u32 instance; 94 u32 domain; 95 u32 scope; 96 u32 portid; 97 u16 member_cnt; 98 u16 active_cnt; 99 u16 max_active; 100 u16 bc_snd_nxt; 101 u16 bc_ackers; 102 bool loopback; 103 bool events; 104 }; 105 106 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 107 int mtyp, struct sk_buff_head *xmitq); 108 109 static void tipc_group_decr_active(struct tipc_group *grp, 110 struct tipc_member *m) 111 { 112 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || 113 m->state == MBR_REMITTED) 114 grp->active_cnt--; 115 } 116 117 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 118 { 119 int max_active, active_pool, idle_pool; 120 int mcnt = grp->member_cnt + 1; 121 122 /* Limit simultaneous reception from other members */ 123 max_active = min(mcnt / 8, 64); 124 max_active = max(max_active, 16); 125 grp->max_active = max_active; 126 127 /* Reserve blocks for active and idle members */ 128 active_pool = max_active * ADV_ACTIVE; 129 idle_pool = (mcnt - max_active) * ADV_IDLE; 130 131 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 132 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 133 } 134 135 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 136 { 137 return grp->bc_snd_nxt; 138 } 139 140 static bool tipc_group_is_enabled(struct tipc_member *m) 141 { 142 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; 143 } 144 145 static bool tipc_group_is_receiver(struct tipc_member *m) 146 { 147 return m && m->state >= MBR_JOINED; 148 } 149 150 u32 tipc_group_exclude(struct tipc_group *grp) 151 { 152 if (!grp->loopback) 153 return grp->portid; 154 return 0; 155 } 156 157 int tipc_group_size(struct tipc_group *grp) 158 { 159 return grp->member_cnt; 160 } 161 162 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 163 struct tipc_group_req *mreq) 164 { 165 struct tipc_group *grp; 166 u32 type = mreq->type; 167 168 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 169 if (!grp) 170 return NULL; 171 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 172 INIT_LIST_HEAD(&grp->congested); 173 INIT_LIST_HEAD(&grp->active); 174 INIT_LIST_HEAD(&grp->pending); 175 INIT_LIST_HEAD(&grp->reclaiming); 176 grp->members = RB_ROOT; 177 grp->net = net; 178 grp->portid = portid; 179 grp->domain = addr_domain(net, mreq->scope); 180 grp->type = type; 181 grp->instance = mreq->instance; 182 grp->scope = mreq->scope; 183 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 184 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 185 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) 186 return grp; 187 kfree(grp); 188 return NULL; 189 } 190 191 void tipc_group_delete(struct net *net, struct tipc_group *grp) 192 { 193 struct rb_root *tree = &grp->members; 194 struct tipc_member *m, *tmp; 195 struct sk_buff_head xmitq; 196 197 __skb_queue_head_init(&xmitq); 198 199 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 200 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 201 list_del(&m->list); 202 kfree(m); 203 } 204 tipc_node_distr_xmit(net, &xmitq); 205 tipc_nlist_purge(&grp->dests); 206 tipc_topsrv_kern_unsubscr(net, grp->subid); 207 kfree(grp); 208 } 209 210 struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 211 u32 node, u32 port) 212 { 213 struct rb_node *n = grp->members.rb_node; 214 u64 nkey, key = (u64)node << 32 | port; 215 struct tipc_member *m; 216 217 while (n) { 218 m = container_of(n, struct tipc_member, tree_node); 219 nkey = (u64)m->node << 32 | m->port; 220 if (key < nkey) 221 n = n->rb_left; 222 else if (key > nkey) 223 n = n->rb_right; 224 else 225 return m; 226 } 227 return NULL; 228 } 229 230 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 231 u32 node, u32 port) 232 { 233 struct tipc_member *m; 234 235 m = tipc_group_find_member(grp, node, port); 236 if (m && tipc_group_is_enabled(m)) 237 return m; 238 return NULL; 239 } 240 241 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 242 u32 node) 243 { 244 struct tipc_member *m; 245 struct rb_node *n; 246 247 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 248 m = container_of(n, struct tipc_member, tree_node); 249 if (m->node == node) 250 return m; 251 } 252 return NULL; 253 } 254 255 static void tipc_group_add_to_tree(struct tipc_group *grp, 256 struct tipc_member *m) 257 { 258 u64 nkey, key = (u64)m->node << 32 | m->port; 259 struct rb_node **n, *parent = NULL; 260 struct tipc_member *tmp; 261 262 n = &grp->members.rb_node; 263 while (*n) { 264 tmp = container_of(*n, struct tipc_member, tree_node); 265 parent = *n; 266 tmp = container_of(parent, struct tipc_member, tree_node); 267 nkey = (u64)tmp->node << 32 | tmp->port; 268 if (key < nkey) 269 n = &(*n)->rb_left; 270 else if (key > nkey) 271 n = &(*n)->rb_right; 272 else 273 return; 274 } 275 rb_link_node(&m->tree_node, parent, n); 276 rb_insert_color(&m->tree_node, &grp->members); 277 } 278 279 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 280 u32 node, u32 port, 281 int state) 282 { 283 struct tipc_member *m; 284 285 m = kzalloc(sizeof(*m), GFP_ATOMIC); 286 if (!m) 287 return NULL; 288 INIT_LIST_HEAD(&m->list); 289 INIT_LIST_HEAD(&m->congested); 290 __skb_queue_head_init(&m->deferredq); 291 m->group = grp; 292 m->node = node; 293 m->port = port; 294 m->bc_acked = grp->bc_snd_nxt - 1; 295 grp->member_cnt++; 296 tipc_group_add_to_tree(grp, m); 297 tipc_nlist_add(&grp->dests, m->node); 298 m->state = state; 299 return m; 300 } 301 302 void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) 303 { 304 tipc_group_create_member(grp, node, port, MBR_DISCOVERED); 305 } 306 307 static void tipc_group_delete_member(struct tipc_group *grp, 308 struct tipc_member *m) 309 { 310 rb_erase(&m->tree_node, &grp->members); 311 grp->member_cnt--; 312 313 /* Check if we were waiting for replicast ack from this member */ 314 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 315 grp->bc_ackers--; 316 317 list_del_init(&m->list); 318 list_del_init(&m->congested); 319 tipc_group_decr_active(grp, m); 320 321 /* If last member on a node, remove node from dest list */ 322 if (!tipc_group_find_node(grp, m->node)) 323 tipc_nlist_del(&grp->dests, m->node); 324 325 kfree(m); 326 } 327 328 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 329 { 330 return &grp->dests; 331 } 332 333 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 334 int *scope) 335 { 336 seq->type = grp->type; 337 seq->lower = grp->instance; 338 seq->upper = grp->instance; 339 *scope = grp->scope; 340 } 341 342 void tipc_group_update_member(struct tipc_member *m, int len) 343 { 344 struct tipc_group *grp = m->group; 345 struct tipc_member *_m, *tmp; 346 347 if (!tipc_group_is_enabled(m)) 348 return; 349 350 m->window -= len; 351 352 if (m->window >= ADV_IDLE) 353 return; 354 355 list_del_init(&m->congested); 356 357 /* Sort member into congested members' list */ 358 list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { 359 if (m->window > _m->window) 360 continue; 361 list_add_tail(&m->congested, &_m->congested); 362 return; 363 } 364 list_add_tail(&m->congested, &grp->congested); 365 } 366 367 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 368 { 369 u16 prev = grp->bc_snd_nxt - 1; 370 struct tipc_member *m; 371 struct rb_node *n; 372 u16 ackers = 0; 373 374 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 375 m = container_of(n, struct tipc_member, tree_node); 376 if (tipc_group_is_enabled(m)) { 377 tipc_group_update_member(m, len); 378 m->bc_acked = prev; 379 ackers++; 380 } 381 } 382 383 /* Mark number of acknowledges to expect, if any */ 384 if (ack) 385 grp->bc_ackers = ackers; 386 grp->bc_snd_nxt++; 387 } 388 389 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 390 int len, struct tipc_member **mbr) 391 { 392 struct sk_buff_head xmitq; 393 struct tipc_member *m; 394 int adv, state; 395 396 m = tipc_group_find_dest(grp, dnode, dport); 397 *mbr = m; 398 if (!m) 399 return false; 400 if (m->usr_pending) 401 return true; 402 if (m->window >= len) 403 return false; 404 m->usr_pending = true; 405 406 /* If not fully advertised, do it now to prevent mutual blocking */ 407 adv = m->advertised; 408 state = m->state; 409 if (state < MBR_JOINED) 410 return true; 411 if (state == MBR_JOINED && adv == ADV_IDLE) 412 return true; 413 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 414 return true; 415 if (state == MBR_PENDING && adv == ADV_IDLE) 416 return true; 417 skb_queue_head_init(&xmitq); 418 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 419 tipc_node_distr_xmit(grp->net, &xmitq); 420 return true; 421 } 422 423 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 424 { 425 struct tipc_member *m = NULL; 426 427 /* If prev bcast was replicast, reject until all receivers have acked */ 428 if (grp->bc_ackers) 429 return true; 430 431 if (list_empty(&grp->congested)) 432 return false; 433 434 m = list_first_entry(&grp->congested, struct tipc_member, congested); 435 if (m->window >= len) 436 return false; 437 438 return tipc_group_cong(grp, m->node, m->port, len, &m); 439 } 440 441 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 442 */ 443 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 444 { 445 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 446 u16 bc_seqno = msg_grp_bc_seqno(hdr); 447 struct sk_buff *_skb, *tmp; 448 int mtyp = msg_type(hdr); 449 450 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 451 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 452 skb_queue_walk_safe(defq, _skb, tmp) { 453 _hdr = buf_msg(_skb); 454 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 455 continue; 456 __skb_queue_before(defq, _skb, skb); 457 return; 458 } 459 /* Bcast was not bypassed, - add to tail */ 460 } 461 /* Unicasts are never bypassed, - always add to tail */ 462 __skb_queue_tail(defq, skb); 463 } 464 465 /* tipc_group_filter_msg() - determine if we should accept arriving message 466 */ 467 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 468 struct sk_buff_head *xmitq) 469 { 470 struct sk_buff *skb = __skb_dequeue(inputq); 471 bool ack, deliver, update, leave = false; 472 struct sk_buff_head *defq; 473 struct tipc_member *m; 474 struct tipc_msg *hdr; 475 u32 node, port; 476 int mtyp, blks; 477 478 if (!skb) 479 return; 480 481 hdr = buf_msg(skb); 482 node = msg_orignode(hdr); 483 port = msg_origport(hdr); 484 485 if (!msg_in_group(hdr)) 486 goto drop; 487 488 m = tipc_group_find_member(grp, node, port); 489 if (!tipc_group_is_receiver(m)) 490 goto drop; 491 492 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 493 goto drop; 494 495 TIPC_SKB_CB(skb)->orig_member = m->instance; 496 defq = &m->deferredq; 497 tipc_group_sort_msg(skb, defq); 498 499 while ((skb = skb_peek(defq))) { 500 hdr = buf_msg(skb); 501 mtyp = msg_type(hdr); 502 blks = msg_blocks(hdr); 503 deliver = true; 504 ack = false; 505 update = false; 506 507 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 508 break; 509 510 /* Decide what to do with message */ 511 switch (mtyp) { 512 case TIPC_GRP_MCAST_MSG: 513 if (msg_nameinst(hdr) != grp->instance) { 514 update = true; 515 deliver = false; 516 } 517 /* Fall thru */ 518 case TIPC_GRP_BCAST_MSG: 519 m->bc_rcv_nxt++; 520 ack = msg_grp_bc_ack_req(hdr); 521 break; 522 case TIPC_GRP_UCAST_MSG: 523 break; 524 case TIPC_GRP_MEMBER_EVT: 525 if (m->state == MBR_LEAVING) 526 leave = true; 527 if (!grp->events) 528 deliver = false; 529 break; 530 default: 531 break; 532 } 533 534 /* Execute decisions */ 535 __skb_dequeue(defq); 536 if (deliver) 537 __skb_queue_tail(inputq, skb); 538 else 539 kfree_skb(skb); 540 541 if (ack) 542 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 543 544 if (leave) { 545 __skb_queue_purge(defq); 546 tipc_group_delete_member(grp, m); 547 break; 548 } 549 if (!update) 550 continue; 551 552 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 553 } 554 return; 555 drop: 556 kfree_skb(skb); 557 } 558 559 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 560 u32 port, struct sk_buff_head *xmitq) 561 { 562 struct list_head *active = &grp->active; 563 int max_active = grp->max_active; 564 int reclaim_limit = max_active * 3 / 4; 565 int active_cnt = grp->active_cnt; 566 struct tipc_member *m, *rm, *pm; 567 568 m = tipc_group_find_member(grp, node, port); 569 if (!m) 570 return; 571 572 m->advertised -= blks; 573 574 switch (m->state) { 575 case MBR_JOINED: 576 /* Reclaim advertised space from least active member */ 577 if (!list_empty(active) && active_cnt >= reclaim_limit) { 578 rm = list_first_entry(active, struct tipc_member, list); 579 rm->state = MBR_RECLAIMING; 580 list_move_tail(&rm->list, &grp->reclaiming); 581 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 582 } 583 /* If max active, become pending and wait for reclaimed space */ 584 if (active_cnt >= max_active) { 585 m->state = MBR_PENDING; 586 list_add_tail(&m->list, &grp->pending); 587 break; 588 } 589 /* Otherwise become active */ 590 m->state = MBR_ACTIVE; 591 list_add_tail(&m->list, &grp->active); 592 grp->active_cnt++; 593 /* Fall through */ 594 case MBR_ACTIVE: 595 if (!list_is_last(&m->list, &grp->active)) 596 list_move_tail(&m->list, &grp->active); 597 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 598 break; 599 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 600 break; 601 case MBR_REMITTED: 602 if (m->advertised > ADV_IDLE) 603 break; 604 m->state = MBR_JOINED; 605 if (m->advertised < ADV_IDLE) { 606 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 607 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 608 } 609 grp->active_cnt--; 610 list_del_init(&m->list); 611 if (list_empty(&grp->pending)) 612 return; 613 614 /* Set oldest pending member to active and advertise */ 615 pm = list_first_entry(&grp->pending, struct tipc_member, list); 616 pm->state = MBR_ACTIVE; 617 list_move_tail(&pm->list, &grp->active); 618 grp->active_cnt++; 619 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 620 break; 621 case MBR_RECLAIMING: 622 case MBR_DISCOVERED: 623 case MBR_JOINING: 624 case MBR_LEAVING: 625 default: 626 break; 627 } 628 } 629 630 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 631 int mtyp, struct sk_buff_head *xmitq) 632 { 633 struct tipc_msg *hdr; 634 struct sk_buff *skb; 635 int adv = 0; 636 637 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 638 m->node, tipc_own_addr(grp->net), 639 m->port, grp->portid, 0); 640 if (!skb) 641 return; 642 643 if (m->state == MBR_ACTIVE) 644 adv = ADV_ACTIVE - m->advertised; 645 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 646 adv = ADV_IDLE - m->advertised; 647 648 hdr = buf_msg(skb); 649 650 if (mtyp == GRP_JOIN_MSG) { 651 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 652 msg_set_adv_win(hdr, adv); 653 m->advertised += adv; 654 } else if (mtyp == GRP_LEAVE_MSG) { 655 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 656 } else if (mtyp == GRP_ADV_MSG) { 657 msg_set_adv_win(hdr, adv); 658 m->advertised += adv; 659 } else if (mtyp == GRP_ACK_MSG) { 660 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 661 } else if (mtyp == GRP_REMIT_MSG) { 662 msg_set_grp_remitted(hdr, m->window); 663 } 664 msg_set_dest_droppable(hdr, true); 665 __skb_queue_tail(xmitq, skb); 666 } 667 668 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 669 struct tipc_msg *hdr, struct sk_buff_head *inputq, 670 struct sk_buff_head *xmitq) 671 { 672 u32 node = msg_orignode(hdr); 673 u32 port = msg_origport(hdr); 674 struct tipc_member *m, *pm; 675 struct tipc_msg *ehdr; 676 u16 remitted, in_flight; 677 678 if (!grp) 679 return; 680 681 m = tipc_group_find_member(grp, node, port); 682 683 switch (msg_type(hdr)) { 684 case GRP_JOIN_MSG: 685 if (!m) 686 m = tipc_group_create_member(grp, node, port, 687 MBR_QUARANTINED); 688 if (!m) 689 return; 690 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 691 m->bc_rcv_nxt = m->bc_syncpt; 692 m->window += msg_adv_win(hdr); 693 694 /* Wait until PUBLISH event is received */ 695 if (m->state == MBR_DISCOVERED) { 696 m->state = MBR_JOINING; 697 } else if (m->state == MBR_PUBLISHED) { 698 m->state = MBR_JOINED; 699 *usr_wakeup = true; 700 m->usr_pending = false; 701 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 702 ehdr = buf_msg(m->event_msg); 703 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 704 __skb_queue_tail(inputq, m->event_msg); 705 } 706 list_del_init(&m->congested); 707 tipc_group_update_member(m, 0); 708 return; 709 case GRP_LEAVE_MSG: 710 if (!m) 711 return; 712 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 713 list_del_init(&m->list); 714 list_del_init(&m->congested); 715 *usr_wakeup = true; 716 717 /* Wait until WITHDRAW event is received */ 718 if (m->state != MBR_LEAVING) { 719 tipc_group_decr_active(grp, m); 720 m->state = MBR_LEAVING; 721 return; 722 } 723 /* Otherwise deliver already received WITHDRAW event */ 724 ehdr = buf_msg(m->event_msg); 725 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 726 __skb_queue_tail(inputq, m->event_msg); 727 return; 728 case GRP_ADV_MSG: 729 if (!m) 730 return; 731 m->window += msg_adv_win(hdr); 732 *usr_wakeup = m->usr_pending; 733 m->usr_pending = false; 734 list_del_init(&m->congested); 735 return; 736 case GRP_ACK_MSG: 737 if (!m) 738 return; 739 m->bc_acked = msg_grp_bc_acked(hdr); 740 if (--grp->bc_ackers) 741 break; 742 *usr_wakeup = true; 743 m->usr_pending = false; 744 return; 745 case GRP_RECLAIM_MSG: 746 if (!m) 747 return; 748 *usr_wakeup = m->usr_pending; 749 m->usr_pending = false; 750 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 751 m->window = ADV_IDLE; 752 return; 753 case GRP_REMIT_MSG: 754 if (!m || m->state != MBR_RECLAIMING) 755 return; 756 757 remitted = msg_grp_remitted(hdr); 758 759 /* Messages preceding the REMIT still in receive queue */ 760 if (m->advertised > remitted) { 761 m->state = MBR_REMITTED; 762 in_flight = m->advertised - remitted; 763 m->advertised = ADV_IDLE + in_flight; 764 return; 765 } 766 /* All messages preceding the REMIT have been read */ 767 if (m->advertised <= remitted) { 768 m->state = MBR_JOINED; 769 in_flight = 0; 770 } 771 /* ..and the REMIT overtaken by more messages => re-advertise */ 772 if (m->advertised < remitted) 773 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 774 775 m->advertised = ADV_IDLE + in_flight; 776 grp->active_cnt--; 777 list_del_init(&m->list); 778 779 /* Set oldest pending member to active and advertise */ 780 if (list_empty(&grp->pending)) 781 return; 782 pm = list_first_entry(&grp->pending, struct tipc_member, list); 783 pm->state = MBR_ACTIVE; 784 list_move_tail(&pm->list, &grp->active); 785 grp->active_cnt++; 786 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 787 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 788 return; 789 default: 790 pr_warn("Received unknown GROUP_PROTO message\n"); 791 } 792 } 793 794 /* tipc_group_member_evt() - receive and handle a member up/down event 795 */ 796 void tipc_group_member_evt(struct tipc_group *grp, 797 bool *usr_wakeup, 798 int *sk_rcvbuf, 799 struct sk_buff *skb, 800 struct sk_buff_head *inputq, 801 struct sk_buff_head *xmitq) 802 { 803 struct tipc_msg *hdr = buf_msg(skb); 804 struct tipc_event *evt = (void *)msg_data(hdr); 805 u32 instance = evt->found_lower; 806 u32 node = evt->port.node; 807 u32 port = evt->port.ref; 808 int event = evt->event; 809 struct tipc_member *m; 810 struct net *net; 811 bool node_up; 812 u32 self; 813 814 if (!grp) 815 goto drop; 816 817 net = grp->net; 818 self = tipc_own_addr(net); 819 if (!grp->loopback && node == self && port == grp->portid) 820 goto drop; 821 822 /* Convert message before delivery to user */ 823 msg_set_hdr_sz(hdr, GROUP_H_SIZE); 824 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); 825 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); 826 msg_set_origport(hdr, port); 827 msg_set_orignode(hdr, node); 828 msg_set_nametype(hdr, grp->type); 829 msg_set_grp_evt(hdr, event); 830 831 m = tipc_group_find_member(grp, node, port); 832 833 if (event == TIPC_PUBLISHED) { 834 if (!m) 835 m = tipc_group_create_member(grp, node, port, 836 MBR_DISCOVERED); 837 if (!m) 838 goto drop; 839 840 /* Hold back event if JOIN message not yet received */ 841 if (m->state == MBR_DISCOVERED) { 842 m->event_msg = skb; 843 m->state = MBR_PUBLISHED; 844 } else { 845 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 846 __skb_queue_tail(inputq, skb); 847 m->state = MBR_JOINED; 848 *usr_wakeup = true; 849 m->usr_pending = false; 850 } 851 m->instance = instance; 852 TIPC_SKB_CB(skb)->orig_member = m->instance; 853 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 854 if (m->window < ADV_IDLE) 855 tipc_group_update_member(m, 0); 856 else 857 list_del_init(&m->congested); 858 } else if (event == TIPC_WITHDRAWN) { 859 if (!m) 860 goto drop; 861 862 TIPC_SKB_CB(skb)->orig_member = m->instance; 863 864 *usr_wakeup = true; 865 m->usr_pending = false; 866 node_up = tipc_node_is_up(net, node); 867 m->event_msg = NULL; 868 869 if (node_up) { 870 /* Hold back event if a LEAVE msg should be expected */ 871 if (m->state != MBR_LEAVING) { 872 m->event_msg = skb; 873 tipc_group_decr_active(grp, m); 874 m->state = MBR_LEAVING; 875 } else { 876 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 877 __skb_queue_tail(inputq, skb); 878 } 879 } else { 880 if (m->state != MBR_LEAVING) { 881 tipc_group_decr_active(grp, m); 882 m->state = MBR_LEAVING; 883 msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); 884 } else { 885 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 886 } 887 __skb_queue_tail(inputq, skb); 888 } 889 list_del_init(&m->list); 890 list_del_init(&m->congested); 891 } 892 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 893 return; 894 drop: 895 kfree_skb(skb); 896 } 897