1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "topsrv.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_JOINING, 53 MBR_PUBLISHED, 54 MBR_JOINED, 55 MBR_PENDING, 56 MBR_ACTIVE, 57 MBR_RECLAIMING, 58 MBR_REMITTED, 59 MBR_LEAVING 60 }; 61 62 struct tipc_member { 63 struct rb_node tree_node; 64 struct list_head list; 65 struct list_head small_win; 66 struct sk_buff_head deferredq; 67 struct tipc_group *group; 68 u32 node; 69 u32 port; 70 u32 instance; 71 enum mbr_state state; 72 u16 advertised; 73 u16 window; 74 u16 bc_rcv_nxt; 75 u16 bc_syncpt; 76 u16 bc_acked; 77 }; 78 79 struct tipc_group { 80 struct rb_root members; 81 struct list_head small_win; 82 struct list_head pending; 83 struct list_head active; 84 struct tipc_nlist dests; 85 struct net *net; 86 int subid; 87 u32 type; 88 u32 instance; 89 u32 scope; 90 u32 portid; 91 u16 member_cnt; 92 u16 active_cnt; 93 u16 max_active; 94 u16 bc_snd_nxt; 95 u16 bc_ackers; 96 bool *open; 97 bool loopback; 98 bool events; 99 }; 100 101 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 102 int mtyp, struct sk_buff_head *xmitq); 103 104 static void tipc_group_open(struct tipc_member *m, bool *wakeup) 105 { 106 *wakeup = false; 107 if (list_empty(&m->small_win)) 108 return; 109 list_del_init(&m->small_win); 110 *m->group->open = true; 111 *wakeup = true; 112 } 113 114 static void tipc_group_decr_active(struct tipc_group *grp, 115 struct tipc_member *m) 116 { 117 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || 118 m->state == MBR_REMITTED) 119 grp->active_cnt--; 120 } 121 122 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 123 { 124 int max_active, active_pool, idle_pool; 125 int mcnt = grp->member_cnt + 1; 126 127 /* Limit simultaneous reception from other members */ 128 max_active = min(mcnt / 8, 64); 129 max_active = max(max_active, 16); 130 grp->max_active = max_active; 131 132 /* Reserve blocks for active and idle members */ 133 active_pool = max_active * ADV_ACTIVE; 134 idle_pool = (mcnt - max_active) * ADV_IDLE; 135 136 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 137 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 138 } 139 140 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 141 { 142 return grp->bc_snd_nxt; 143 } 144 145 static bool tipc_group_is_receiver(struct tipc_member *m) 146 { 147 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING; 148 } 149 150 static bool tipc_group_is_sender(struct tipc_member *m) 151 { 152 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED; 153 } 154 155 u32 tipc_group_exclude(struct tipc_group *grp) 156 { 157 if (!grp->loopback) 158 return grp->portid; 159 return 0; 160 } 161 162 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 163 struct tipc_group_req *mreq, 164 bool *group_is_open) 165 { 166 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; 167 bool global = mreq->scope != TIPC_NODE_SCOPE; 168 struct tipc_group *grp; 169 u32 type = mreq->type; 170 171 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 172 if (!grp) 173 return NULL; 174 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 175 INIT_LIST_HEAD(&grp->small_win); 176 INIT_LIST_HEAD(&grp->active); 177 INIT_LIST_HEAD(&grp->pending); 178 grp->members = RB_ROOT; 179 grp->net = net; 180 grp->portid = portid; 181 grp->type = type; 182 grp->instance = mreq->instance; 183 grp->scope = mreq->scope; 184 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 185 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 186 grp->open = group_is_open; 187 *grp->open = false; 188 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; 189 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, 190 filter, &grp->subid)) 191 return grp; 192 kfree(grp); 193 return NULL; 194 } 195 196 void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) 197 { 198 struct rb_root *tree = &grp->members; 199 struct tipc_member *m, *tmp; 200 struct sk_buff_head xmitq; 201 202 __skb_queue_head_init(&xmitq); 203 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 204 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); 205 tipc_group_update_member(m, 0); 206 } 207 tipc_node_distr_xmit(net, &xmitq); 208 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 209 } 210 211 void tipc_group_delete(struct net *net, struct tipc_group *grp) 212 { 213 struct rb_root *tree = &grp->members; 214 struct tipc_member *m, *tmp; 215 struct sk_buff_head xmitq; 216 217 __skb_queue_head_init(&xmitq); 218 219 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 220 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 221 __skb_queue_purge(&m->deferredq); 222 list_del(&m->list); 223 kfree(m); 224 } 225 tipc_node_distr_xmit(net, &xmitq); 226 tipc_nlist_purge(&grp->dests); 227 tipc_topsrv_kern_unsubscr(net, grp->subid); 228 kfree(grp); 229 } 230 231 static struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 232 u32 node, u32 port) 233 { 234 struct rb_node *n = grp->members.rb_node; 235 u64 nkey, key = (u64)node << 32 | port; 236 struct tipc_member *m; 237 238 while (n) { 239 m = container_of(n, struct tipc_member, tree_node); 240 nkey = (u64)m->node << 32 | m->port; 241 if (key < nkey) 242 n = n->rb_left; 243 else if (key > nkey) 244 n = n->rb_right; 245 else 246 return m; 247 } 248 return NULL; 249 } 250 251 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 252 u32 node, u32 port) 253 { 254 struct tipc_member *m; 255 256 m = tipc_group_find_member(grp, node, port); 257 if (m && tipc_group_is_receiver(m)) 258 return m; 259 return NULL; 260 } 261 262 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 263 u32 node) 264 { 265 struct tipc_member *m; 266 struct rb_node *n; 267 268 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 269 m = container_of(n, struct tipc_member, tree_node); 270 if (m->node == node) 271 return m; 272 } 273 return NULL; 274 } 275 276 static void tipc_group_add_to_tree(struct tipc_group *grp, 277 struct tipc_member *m) 278 { 279 u64 nkey, key = (u64)m->node << 32 | m->port; 280 struct rb_node **n, *parent = NULL; 281 struct tipc_member *tmp; 282 283 n = &grp->members.rb_node; 284 while (*n) { 285 tmp = container_of(*n, struct tipc_member, tree_node); 286 parent = *n; 287 tmp = container_of(parent, struct tipc_member, tree_node); 288 nkey = (u64)tmp->node << 32 | tmp->port; 289 if (key < nkey) 290 n = &(*n)->rb_left; 291 else if (key > nkey) 292 n = &(*n)->rb_right; 293 else 294 return; 295 } 296 rb_link_node(&m->tree_node, parent, n); 297 rb_insert_color(&m->tree_node, &grp->members); 298 } 299 300 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 301 u32 node, u32 port, 302 u32 instance, int state) 303 { 304 struct tipc_member *m; 305 306 m = kzalloc(sizeof(*m), GFP_ATOMIC); 307 if (!m) 308 return NULL; 309 INIT_LIST_HEAD(&m->list); 310 INIT_LIST_HEAD(&m->small_win); 311 __skb_queue_head_init(&m->deferredq); 312 m->group = grp; 313 m->node = node; 314 m->port = port; 315 m->instance = instance; 316 m->bc_acked = grp->bc_snd_nxt - 1; 317 grp->member_cnt++; 318 tipc_group_add_to_tree(grp, m); 319 tipc_nlist_add(&grp->dests, m->node); 320 m->state = state; 321 return m; 322 } 323 324 void tipc_group_add_member(struct tipc_group *grp, u32 node, 325 u32 port, u32 instance) 326 { 327 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED); 328 } 329 330 static void tipc_group_delete_member(struct tipc_group *grp, 331 struct tipc_member *m) 332 { 333 rb_erase(&m->tree_node, &grp->members); 334 grp->member_cnt--; 335 336 /* Check if we were waiting for replicast ack from this member */ 337 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 338 grp->bc_ackers--; 339 340 list_del_init(&m->list); 341 list_del_init(&m->small_win); 342 tipc_group_decr_active(grp, m); 343 344 /* If last member on a node, remove node from dest list */ 345 if (!tipc_group_find_node(grp, m->node)) 346 tipc_nlist_del(&grp->dests, m->node); 347 348 kfree(m); 349 } 350 351 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 352 { 353 return &grp->dests; 354 } 355 356 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 357 int *scope) 358 { 359 seq->type = grp->type; 360 seq->lower = grp->instance; 361 seq->upper = grp->instance; 362 *scope = grp->scope; 363 } 364 365 void tipc_group_update_member(struct tipc_member *m, int len) 366 { 367 struct tipc_group *grp = m->group; 368 struct tipc_member *_m, *tmp; 369 370 if (!tipc_group_is_receiver(m)) 371 return; 372 373 m->window -= len; 374 375 if (m->window >= ADV_IDLE) 376 return; 377 378 list_del_init(&m->small_win); 379 380 /* Sort member into small_window members' list */ 381 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { 382 if (_m->window > m->window) 383 break; 384 } 385 list_add_tail(&m->small_win, &_m->small_win); 386 } 387 388 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 389 { 390 u16 prev = grp->bc_snd_nxt - 1; 391 struct tipc_member *m; 392 struct rb_node *n; 393 u16 ackers = 0; 394 395 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 396 m = container_of(n, struct tipc_member, tree_node); 397 if (tipc_group_is_receiver(m)) { 398 tipc_group_update_member(m, len); 399 m->bc_acked = prev; 400 ackers++; 401 } 402 } 403 404 /* Mark number of acknowledges to expect, if any */ 405 if (ack) 406 grp->bc_ackers = ackers; 407 grp->bc_snd_nxt++; 408 } 409 410 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 411 int len, struct tipc_member **mbr) 412 { 413 struct sk_buff_head xmitq; 414 struct tipc_member *m; 415 int adv, state; 416 417 m = tipc_group_find_dest(grp, dnode, dport); 418 if (!tipc_group_is_receiver(m)) { 419 *mbr = NULL; 420 return false; 421 } 422 *mbr = m; 423 424 if (m->window >= len) 425 return false; 426 427 *grp->open = false; 428 429 /* If not fully advertised, do it now to prevent mutual blocking */ 430 adv = m->advertised; 431 state = m->state; 432 if (state == MBR_JOINED && adv == ADV_IDLE) 433 return true; 434 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 435 return true; 436 if (state == MBR_PENDING && adv == ADV_IDLE) 437 return true; 438 __skb_queue_head_init(&xmitq); 439 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 440 tipc_node_distr_xmit(grp->net, &xmitq); 441 return true; 442 } 443 444 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 445 { 446 struct tipc_member *m = NULL; 447 448 /* If prev bcast was replicast, reject until all receivers have acked */ 449 if (grp->bc_ackers) { 450 *grp->open = false; 451 return true; 452 } 453 if (list_empty(&grp->small_win)) 454 return false; 455 456 m = list_first_entry(&grp->small_win, struct tipc_member, small_win); 457 if (m->window >= len) 458 return false; 459 460 return tipc_group_cong(grp, m->node, m->port, len, &m); 461 } 462 463 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 464 */ 465 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 466 { 467 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 468 u16 bc_seqno = msg_grp_bc_seqno(hdr); 469 struct sk_buff *_skb, *tmp; 470 int mtyp = msg_type(hdr); 471 472 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 473 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 474 skb_queue_walk_safe(defq, _skb, tmp) { 475 _hdr = buf_msg(_skb); 476 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 477 continue; 478 __skb_queue_before(defq, _skb, skb); 479 return; 480 } 481 /* Bcast was not bypassed, - add to tail */ 482 } 483 /* Unicasts are never bypassed, - always add to tail */ 484 __skb_queue_tail(defq, skb); 485 } 486 487 /* tipc_group_filter_msg() - determine if we should accept arriving message 488 */ 489 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 490 struct sk_buff_head *xmitq) 491 { 492 struct sk_buff *skb = __skb_dequeue(inputq); 493 bool ack, deliver, update, leave = false; 494 struct sk_buff_head *defq; 495 struct tipc_member *m; 496 struct tipc_msg *hdr; 497 u32 node, port; 498 int mtyp, blks; 499 500 if (!skb) 501 return; 502 503 hdr = buf_msg(skb); 504 node = msg_orignode(hdr); 505 port = msg_origport(hdr); 506 507 if (!msg_in_group(hdr)) 508 goto drop; 509 510 m = tipc_group_find_member(grp, node, port); 511 if (!tipc_group_is_sender(m)) 512 goto drop; 513 514 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 515 goto drop; 516 517 TIPC_SKB_CB(skb)->orig_member = m->instance; 518 defq = &m->deferredq; 519 tipc_group_sort_msg(skb, defq); 520 521 while ((skb = skb_peek(defq))) { 522 hdr = buf_msg(skb); 523 mtyp = msg_type(hdr); 524 blks = msg_blocks(hdr); 525 deliver = true; 526 ack = false; 527 update = false; 528 529 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 530 break; 531 532 /* Decide what to do with message */ 533 switch (mtyp) { 534 case TIPC_GRP_MCAST_MSG: 535 if (msg_nameinst(hdr) != grp->instance) { 536 update = true; 537 deliver = false; 538 } 539 fallthrough; 540 case TIPC_GRP_BCAST_MSG: 541 m->bc_rcv_nxt++; 542 ack = msg_grp_bc_ack_req(hdr); 543 break; 544 case TIPC_GRP_UCAST_MSG: 545 break; 546 case TIPC_GRP_MEMBER_EVT: 547 if (m->state == MBR_LEAVING) 548 leave = true; 549 if (!grp->events) 550 deliver = false; 551 break; 552 default: 553 break; 554 } 555 556 /* Execute decisions */ 557 __skb_dequeue(defq); 558 if (deliver) 559 __skb_queue_tail(inputq, skb); 560 else 561 kfree_skb(skb); 562 563 if (ack) 564 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 565 566 if (leave) { 567 __skb_queue_purge(defq); 568 tipc_group_delete_member(grp, m); 569 break; 570 } 571 if (!update) 572 continue; 573 574 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 575 } 576 return; 577 drop: 578 kfree_skb(skb); 579 } 580 581 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 582 u32 port, struct sk_buff_head *xmitq) 583 { 584 struct list_head *active = &grp->active; 585 int max_active = grp->max_active; 586 int reclaim_limit = max_active * 3 / 4; 587 int active_cnt = grp->active_cnt; 588 struct tipc_member *m, *rm, *pm; 589 590 m = tipc_group_find_member(grp, node, port); 591 if (!m) 592 return; 593 594 m->advertised -= blks; 595 596 switch (m->state) { 597 case MBR_JOINED: 598 /* First, decide if member can go active */ 599 if (active_cnt <= max_active) { 600 m->state = MBR_ACTIVE; 601 list_add_tail(&m->list, active); 602 grp->active_cnt++; 603 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 604 } else { 605 m->state = MBR_PENDING; 606 list_add_tail(&m->list, &grp->pending); 607 } 608 609 if (active_cnt < reclaim_limit) 610 break; 611 612 /* Reclaim from oldest active member, if possible */ 613 if (!list_empty(active)) { 614 rm = list_first_entry(active, struct tipc_member, list); 615 rm->state = MBR_RECLAIMING; 616 list_del_init(&rm->list); 617 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 618 break; 619 } 620 /* Nobody to reclaim from; - revert oldest pending to JOINED */ 621 pm = list_first_entry(&grp->pending, struct tipc_member, list); 622 list_del_init(&pm->list); 623 pm->state = MBR_JOINED; 624 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 625 break; 626 case MBR_ACTIVE: 627 if (!list_is_last(&m->list, &grp->active)) 628 list_move_tail(&m->list, &grp->active); 629 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 630 break; 631 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 632 break; 633 case MBR_REMITTED: 634 if (m->advertised > ADV_IDLE) 635 break; 636 m->state = MBR_JOINED; 637 grp->active_cnt--; 638 if (m->advertised < ADV_IDLE) { 639 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 640 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 641 } 642 643 if (list_empty(&grp->pending)) 644 return; 645 646 /* Set oldest pending member to active and advertise */ 647 pm = list_first_entry(&grp->pending, struct tipc_member, list); 648 pm->state = MBR_ACTIVE; 649 list_move_tail(&pm->list, &grp->active); 650 grp->active_cnt++; 651 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 652 break; 653 case MBR_RECLAIMING: 654 case MBR_JOINING: 655 case MBR_LEAVING: 656 default: 657 break; 658 } 659 } 660 661 static void tipc_group_create_event(struct tipc_group *grp, 662 struct tipc_member *m, 663 u32 event, u16 seqno, 664 struct sk_buff_head *inputq) 665 { u32 dnode = tipc_own_addr(grp->net); 666 struct tipc_event evt; 667 struct sk_buff *skb; 668 struct tipc_msg *hdr; 669 670 memset(&evt, 0, sizeof(evt)); 671 evt.event = event; 672 evt.found_lower = m->instance; 673 evt.found_upper = m->instance; 674 evt.port.ref = m->port; 675 evt.port.node = m->node; 676 evt.s.seq.type = grp->type; 677 evt.s.seq.lower = m->instance; 678 evt.s.seq.upper = m->instance; 679 680 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, 681 GROUP_H_SIZE, sizeof(evt), dnode, m->node, 682 grp->portid, m->port, 0); 683 if (!skb) 684 return; 685 686 hdr = buf_msg(skb); 687 msg_set_nametype(hdr, grp->type); 688 msg_set_grp_evt(hdr, event); 689 msg_set_dest_droppable(hdr, true); 690 msg_set_grp_bc_seqno(hdr, seqno); 691 memcpy(msg_data(hdr), &evt, sizeof(evt)); 692 TIPC_SKB_CB(skb)->orig_member = m->instance; 693 __skb_queue_tail(inputq, skb); 694 } 695 696 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 697 int mtyp, struct sk_buff_head *xmitq) 698 { 699 struct tipc_msg *hdr; 700 struct sk_buff *skb; 701 int adv = 0; 702 703 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 704 m->node, tipc_own_addr(grp->net), 705 m->port, grp->portid, 0); 706 if (!skb) 707 return; 708 709 if (m->state == MBR_ACTIVE) 710 adv = ADV_ACTIVE - m->advertised; 711 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 712 adv = ADV_IDLE - m->advertised; 713 714 hdr = buf_msg(skb); 715 716 if (mtyp == GRP_JOIN_MSG) { 717 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 718 msg_set_adv_win(hdr, adv); 719 m->advertised += adv; 720 } else if (mtyp == GRP_LEAVE_MSG) { 721 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 722 } else if (mtyp == GRP_ADV_MSG) { 723 msg_set_adv_win(hdr, adv); 724 m->advertised += adv; 725 } else if (mtyp == GRP_ACK_MSG) { 726 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 727 } else if (mtyp == GRP_REMIT_MSG) { 728 msg_set_grp_remitted(hdr, m->window); 729 } 730 msg_set_dest_droppable(hdr, true); 731 __skb_queue_tail(xmitq, skb); 732 } 733 734 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 735 struct tipc_msg *hdr, struct sk_buff_head *inputq, 736 struct sk_buff_head *xmitq) 737 { 738 u32 node = msg_orignode(hdr); 739 u32 port = msg_origport(hdr); 740 struct tipc_member *m, *pm; 741 u16 remitted, in_flight; 742 743 if (!grp) 744 return; 745 746 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) 747 return; 748 749 m = tipc_group_find_member(grp, node, port); 750 751 switch (msg_type(hdr)) { 752 case GRP_JOIN_MSG: 753 if (!m) 754 m = tipc_group_create_member(grp, node, port, 755 0, MBR_JOINING); 756 if (!m) 757 return; 758 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 759 m->bc_rcv_nxt = m->bc_syncpt; 760 m->window += msg_adv_win(hdr); 761 762 /* Wait until PUBLISH event is received if necessary */ 763 if (m->state != MBR_PUBLISHED) 764 return; 765 766 /* Member can be taken into service */ 767 m->state = MBR_JOINED; 768 tipc_group_open(m, usr_wakeup); 769 tipc_group_update_member(m, 0); 770 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 771 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 772 m->bc_syncpt, inputq); 773 return; 774 case GRP_LEAVE_MSG: 775 if (!m) 776 return; 777 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 778 list_del_init(&m->list); 779 tipc_group_open(m, usr_wakeup); 780 tipc_group_decr_active(grp, m); 781 m->state = MBR_LEAVING; 782 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 783 m->bc_syncpt, inputq); 784 return; 785 case GRP_ADV_MSG: 786 if (!m) 787 return; 788 m->window += msg_adv_win(hdr); 789 tipc_group_open(m, usr_wakeup); 790 return; 791 case GRP_ACK_MSG: 792 if (!m) 793 return; 794 m->bc_acked = msg_grp_bc_acked(hdr); 795 if (--grp->bc_ackers) 796 return; 797 list_del_init(&m->small_win); 798 *m->group->open = true; 799 *usr_wakeup = true; 800 tipc_group_update_member(m, 0); 801 return; 802 case GRP_RECLAIM_MSG: 803 if (!m) 804 return; 805 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 806 m->window = ADV_IDLE; 807 tipc_group_open(m, usr_wakeup); 808 return; 809 case GRP_REMIT_MSG: 810 if (!m || m->state != MBR_RECLAIMING) 811 return; 812 813 remitted = msg_grp_remitted(hdr); 814 815 /* Messages preceding the REMIT still in receive queue */ 816 if (m->advertised > remitted) { 817 m->state = MBR_REMITTED; 818 in_flight = m->advertised - remitted; 819 m->advertised = ADV_IDLE + in_flight; 820 return; 821 } 822 /* This should never happen */ 823 if (m->advertised < remitted) 824 pr_warn_ratelimited("Unexpected REMIT msg\n"); 825 826 /* All messages preceding the REMIT have been read */ 827 m->state = MBR_JOINED; 828 grp->active_cnt--; 829 m->advertised = ADV_IDLE; 830 831 /* Set oldest pending member to active and advertise */ 832 if (list_empty(&grp->pending)) 833 return; 834 pm = list_first_entry(&grp->pending, struct tipc_member, list); 835 pm->state = MBR_ACTIVE; 836 list_move_tail(&pm->list, &grp->active); 837 grp->active_cnt++; 838 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 839 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 840 return; 841 default: 842 pr_warn("Received unknown GROUP_PROTO message\n"); 843 } 844 } 845 846 /* tipc_group_member_evt() - receive and handle a member up/down event 847 */ 848 void tipc_group_member_evt(struct tipc_group *grp, 849 bool *usr_wakeup, 850 int *sk_rcvbuf, 851 struct tipc_msg *hdr, 852 struct sk_buff_head *inputq, 853 struct sk_buff_head *xmitq) 854 { 855 struct tipc_event *evt = (void *)msg_data(hdr); 856 u32 instance = evt->found_lower; 857 u32 node = evt->port.node; 858 u32 port = evt->port.ref; 859 int event = evt->event; 860 struct tipc_member *m; 861 struct net *net; 862 u32 self; 863 864 if (!grp) 865 return; 866 867 net = grp->net; 868 self = tipc_own_addr(net); 869 if (!grp->loopback && node == self && port == grp->portid) 870 return; 871 872 m = tipc_group_find_member(grp, node, port); 873 874 switch (event) { 875 case TIPC_PUBLISHED: 876 /* Send and wait for arrival of JOIN message if necessary */ 877 if (!m) { 878 m = tipc_group_create_member(grp, node, port, instance, 879 MBR_PUBLISHED); 880 if (!m) 881 break; 882 tipc_group_update_member(m, 0); 883 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 884 break; 885 } 886 887 if (m->state != MBR_JOINING) 888 break; 889 890 /* Member can be taken into service */ 891 m->instance = instance; 892 m->state = MBR_JOINED; 893 tipc_group_open(m, usr_wakeup); 894 tipc_group_update_member(m, 0); 895 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 896 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 897 m->bc_syncpt, inputq); 898 break; 899 case TIPC_WITHDRAWN: 900 if (!m) 901 break; 902 903 tipc_group_decr_active(grp, m); 904 m->state = MBR_LEAVING; 905 list_del_init(&m->list); 906 tipc_group_open(m, usr_wakeup); 907 908 /* Only send event if no LEAVE message can be expected */ 909 if (!tipc_node_is_up(net, node)) 910 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 911 m->bc_rcv_nxt, inputq); 912 break; 913 default: 914 break; 915 } 916 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 917 } 918 919 int tipc_group_fill_sock_diag(struct tipc_group *grp, struct sk_buff *skb) 920 { 921 struct nlattr *group = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_GROUP); 922 923 if (!group) 924 return -EMSGSIZE; 925 926 if (nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_ID, 927 grp->type) || 928 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_INSTANCE, 929 grp->instance) || 930 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_BC_SEND_NEXT, 931 grp->bc_snd_nxt)) 932 goto group_msg_cancel; 933 934 if (grp->scope == TIPC_NODE_SCOPE) 935 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_NODE_SCOPE)) 936 goto group_msg_cancel; 937 938 if (grp->scope == TIPC_CLUSTER_SCOPE) 939 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_CLUSTER_SCOPE)) 940 goto group_msg_cancel; 941 942 if (*grp->open) 943 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_OPEN)) 944 goto group_msg_cancel; 945 946 nla_nest_end(skb, group); 947 return 0; 948 949 group_msg_cancel: 950 nla_nest_cancel(skb, group); 951 return -1; 952 } 953