xref: /linux/net/tipc/group.c (revision 140eb5227767c6754742020a16d2691222b9c19b)
/*
 * net/tipc/group.c: TIPC group messaging code
 *
 * Copyright (c) 2017, Ericsson AB
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "addr.h"
#include "group.h"
#include "bcast.h"
#include "server.h"
#include "msg.h"
#include "socket.h"
#include "node.h"
#include "name_table.h"
#include "subscr.h"

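/* One advertised "unit" is the number of FLOWCTL_BLK_SZ blocks needed to
 * hold a single maximum-size message plus header. Idle members are granted
 * one unit of send window, active members twelve.
 */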
#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
#define ADV_IDLE ADV_UNIT
#define ADV_ACTIVE (ADV_UNIT * 12)

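/* Member lifecycle, as driven by the code below: a member is DISCOVERED via
 * a PUBLISH event, or created QUARANTINED when a JOIN message arrives first.
 * It passes through JOINING/PUBLISHED while the event and the JOIN message
 * are matched up, and becomes JOINED once both have arrived. JOINED members
 * are promoted to ACTIVE, or parked as PENDING when the active set is full,
 * may be asked to give back advertised space (RECLAIMING/REMITTED), and end
 * up LEAVING on a LEAVE message or WITHDRAW event.
 */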
enum mbr_state {
	MBR_QUARANTINED,
	MBR_DISCOVERED,
	MBR_JOINING,
	MBR_PUBLISHED,
	MBR_JOINED,
	MBR_PENDING,
	MBR_ACTIVE,
	MBR_RECLAIMING,
	MBR_REMITTED,
	MBR_LEAVING
};

struct tipc_member {
	struct rb_node tree_node;
	struct list_head list;
	struct list_head congested;
	struct sk_buff *event_msg;
	struct sk_buff_head deferredq;
	struct tipc_group *group;
	u32 node;
	u32 port;
	u32 instance;
	enum mbr_state state;
	u16 advertised;
	u16 window;
	u16 bc_rcv_nxt;
	u16 bc_syncpt;
	u16 bc_acked;
	bool usr_pending;
};

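/* Besides the member rbtree, the group keeps four auxiliary lists:
 * 'congested' holds members whose send window has dropped below ADV_IDLE,
 * sorted by ascending window; 'active' and 'pending' hold members holding,
 * respectively waiting for, an ACTIVE-size window; 'reclaiming' holds
 * members that have been asked to remit advertised space.
 */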
struct tipc_group {
	struct rb_root members;
	struct list_head congested;
	struct list_head pending;
	struct list_head active;
	struct list_head reclaiming;
	struct tipc_nlist dests;
	struct net *net;
	int subid;
	u32 type;
	u32 instance;
	u32 domain;
	u32 scope;
	u32 portid;
	u16 member_cnt;
	u16 active_cnt;
	u16 max_active;
	u16 bc_snd_nxt;
	u16 bc_ackers;
	bool loopback;
	bool events;
};

static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
				  int mtyp, struct sk_buff_head *xmitq);

static void tipc_group_decr_active(struct tipc_group *grp,
				   struct tipc_member *m)
{
	if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING ||
	    m->state == MBR_REMITTED)
		grp->active_cnt--;
}

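/* Dimension the socket receive buffer from the group size: up to max_active
 * members may hold an ACTIVE window at once, the rest an IDLE window. For
 * example, with 127 peers (mcnt = 128) this yields max_active = 16, i.e.
 * 16 * ADV_ACTIVE + 112 * ADV_IDLE blocks, scaled to bytes with a factor of
 * four for the worst-case truesize/msgsize ratio.
 */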
static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
{
	int max_active, active_pool, idle_pool;
	int mcnt = grp->member_cnt + 1;

	/* Limit simultaneous reception from other members */
	max_active = min(mcnt / 8, 64);
	max_active = max(max_active, 16);
	grp->max_active = max_active;

	/* Reserve blocks for active and idle members */
	active_pool = max_active * ADV_ACTIVE;
	idle_pool = (mcnt - max_active) * ADV_IDLE;

	/* Scale to bytes, considering worst-case truesize/msgsize ratio */
	return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4;
}

u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
{
	return grp->bc_snd_nxt;
}

static bool tipc_group_is_enabled(struct tipc_member *m)
{
	return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING;
}

static bool tipc_group_is_receiver(struct tipc_member *m)
{
	return m && m->state >= MBR_JOINED;
}

u32 tipc_group_exclude(struct tipc_group *grp)
{
	if (!grp->loopback)
		return grp->portid;
	return 0;
}

int tipc_group_size(struct tipc_group *grp)
{
	return grp->member_cnt;
}

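/* tipc_group_create() - allocate and initialize a group object and subscribe
 * to the group's name sequence via the topology server; returns NULL if
 * allocation or the subscription fails.
 */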
struct tipc_group *tipc_group_create(struct net *net, u32 portid,
				     struct tipc_group_req *mreq)
{
	struct tipc_group *grp;
	u32 type = mreq->type;

	grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
	if (!grp)
		return NULL;
	tipc_nlist_init(&grp->dests, tipc_own_addr(net));
	INIT_LIST_HEAD(&grp->congested);
	INIT_LIST_HEAD(&grp->active);
	INIT_LIST_HEAD(&grp->pending);
	INIT_LIST_HEAD(&grp->reclaiming);
	grp->members = RB_ROOT;
	grp->net = net;
	grp->portid = portid;
	grp->domain = addr_domain(net, mreq->scope);
	grp->type = type;
	grp->instance = mreq->instance;
	grp->scope = mreq->scope;
	grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
	grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
		return grp;
	kfree(grp);
	return NULL;
}

void tipc_group_delete(struct net *net, struct tipc_group *grp)
{
	struct rb_root *tree = &grp->members;
	struct tipc_member *m, *tmp;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
		tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
		list_del(&m->list);
		kfree(m);
	}
	tipc_node_distr_xmit(net, &xmitq);
	tipc_nlist_purge(&grp->dests);
	tipc_topsrv_kern_unsubscr(net, grp->subid);
	kfree(grp);
}

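/* tipc_group_find_member() - binary search in the member rbtree, which is
 * keyed on the 64-bit concatenation (node << 32 | port).
 */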
struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
					   u32 node, u32 port)
{
	struct rb_node *n = grp->members.rb_node;
	u64 nkey, key = (u64)node << 32 | port;
	struct tipc_member *m;

	while (n) {
		m = container_of(n, struct tipc_member, tree_node);
		nkey = (u64)m->node << 32 | m->port;
		if (key < nkey)
			n = n->rb_left;
		else if (key > nkey)
			n = n->rb_right;
		else
			return m;
	}
	return NULL;
}

static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
						u32 node, u32 port)
{
	struct tipc_member *m;

	m = tipc_group_find_member(grp, node, port);
	if (m && tipc_group_is_enabled(m))
		return m;
	return NULL;
}

static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
						u32 node)
{
	struct tipc_member *m;
	struct rb_node *n;

	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
		m = container_of(n, struct tipc_member, tree_node);
		if (m->node == node)
			return m;
	}
	return NULL;
}

static void tipc_group_add_to_tree(struct tipc_group *grp,
				   struct tipc_member *m)
{
	u64 nkey, key = (u64)m->node << 32 | m->port;
	struct rb_node **n, *parent = NULL;
	struct tipc_member *tmp;

	n = &grp->members.rb_node;
	while (*n) {
		parent = *n;
		tmp = container_of(parent, struct tipc_member, tree_node);
		nkey = (u64)tmp->node << 32 | tmp->port;
		if (key < nkey)
			n = &(*n)->rb_left;
		else if (key > nkey)
			n = &(*n)->rb_right;
		else
			return;
	}
	rb_link_node(&m->tree_node, parent, n);
	rb_insert_color(&m->tree_node, &grp->members);
}

278 
279 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
280 						    u32 node, u32 port,
281 						    int state)
282 {
283 	struct tipc_member *m;
284 
285 	m = kzalloc(sizeof(*m), GFP_ATOMIC);
286 	if (!m)
287 		return NULL;
288 	INIT_LIST_HEAD(&m->list);
289 	INIT_LIST_HEAD(&m->congested);
290 	__skb_queue_head_init(&m->deferredq);
291 	m->group = grp;
292 	m->node = node;
293 	m->port = port;
294 	m->bc_acked = grp->bc_snd_nxt - 1;
295 	grp->member_cnt++;
296 	tipc_group_add_to_tree(grp, m);
297 	tipc_nlist_add(&grp->dests, m->node);
298 	m->state = state;
299 	return m;
300 }
301 
302 void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
303 {
304 	tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
305 }
306 
307 static void tipc_group_delete_member(struct tipc_group *grp,
308 				     struct tipc_member *m)
309 {
310 	rb_erase(&m->tree_node, &grp->members);
311 	grp->member_cnt--;
312 
313 	/* Check if we were waiting for replicast ack from this member */
314 	if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
315 		grp->bc_ackers--;
316 
317 	list_del_init(&m->list);
318 	list_del_init(&m->congested);
319 	tipc_group_decr_active(grp, m);
320 
321 	/* If last member on a node, remove node from dest list */
322 	if (!tipc_group_find_node(grp, m->node))
323 		tipc_nlist_del(&grp->dests, m->node);
324 
325 	kfree(m);
326 }
327 
328 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
329 {
330 	return &grp->dests;
331 }
332 
333 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
334 		     int *scope)
335 {
336 	seq->type = grp->type;
337 	seq->lower = grp->instance;
338 	seq->upper = grp->instance;
339 	*scope = grp->scope;
340 }
341 
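/* tipc_group_update_member() - charge 'len' blocks against the member's send
 * window; if the window drops below ADV_IDLE, (re)insert the member into the
 * group's congested list, which is kept sorted by ascending window.
 */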
void tipc_group_update_member(struct tipc_member *m, int len)
{
	struct tipc_group *grp = m->group;
	struct tipc_member *_m, *tmp;

	if (!tipc_group_is_enabled(m))
		return;

	m->window -= len;

	if (m->window >= ADV_IDLE)
		return;

	list_del_init(&m->congested);

	/* Sort member into congested members' list */
	list_for_each_entry_safe(_m, tmp, &grp->congested, congested) {
		if (m->window > _m->window)
			continue;
		list_add_tail(&m->congested, &_m->congested);
		return;
	}
	list_add_tail(&m->congested, &grp->congested);
}

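/* tipc_group_update_bc_members() - charge a sent broadcast against every
 * enabled member's send window and, if an explicit ack was requested, record
 * how many acks must arrive before the next broadcast may be sent.
 */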
void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
{
	u16 prev = grp->bc_snd_nxt - 1;
	struct tipc_member *m;
	struct rb_node *n;
	u16 ackers = 0;

	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
		m = container_of(n, struct tipc_member, tree_node);
		if (tipc_group_is_enabled(m)) {
			tipc_group_update_member(m, len);
			m->bc_acked = prev;
			ackers++;
		}
	}

	/* Mark number of acknowledgments to expect, if any */
	if (ack)
		grp->bc_ackers = ackers;
	grp->bc_snd_nxt++;
}

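/* tipc_group_cong() - check whether a message of 'len' blocks to the given
 * member must be held back; if so, and the member is not fully advertised,
 * send an advertisement right away so the two peers cannot block each other.
 */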
bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
		     int len, struct tipc_member **mbr)
{
	struct sk_buff_head xmitq;
	struct tipc_member *m;
	int adv, state;

	m = tipc_group_find_dest(grp, dnode, dport);
	*mbr = m;
	if (!m)
		return false;
	if (m->usr_pending)
		return true;
	if (m->window >= len)
		return false;
	m->usr_pending = true;

	/* If not fully advertised, do it now to prevent mutual blocking */
	adv = m->advertised;
	state = m->state;
	if (state < MBR_JOINED)
		return true;
	if (state == MBR_JOINED && adv == ADV_IDLE)
		return true;
	if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
		return true;
	if (state == MBR_PENDING && adv == ADV_IDLE)
		return true;
	skb_queue_head_init(&xmitq);
	tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
	tipc_node_distr_xmit(grp->net, &xmitq);
	return true;
}


bool tipc_group_bc_cong(struct tipc_group *grp, int len)
{
	struct tipc_member *m = NULL;

	/* If prev bcast was replicast, reject until all receivers have acked */
	if (grp->bc_ackers)
		return true;

	if (list_empty(&grp->congested))
		return false;

	m = list_first_entry(&grp->congested, struct tipc_member, congested);
	if (m->window >= len)
		return false;

	return tipc_group_cong(grp, m->node, m->port, len, &m);
}

/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number
 */
static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
{
	struct tipc_msg *_hdr, *hdr = buf_msg(skb);
	u16 bc_seqno = msg_grp_bc_seqno(hdr);
	struct sk_buff *_skb, *tmp;
	int mtyp = msg_type(hdr);

	/* Bcast/mcast may be bypassed by ucast or other bcast; sort it in */
	if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
		skb_queue_walk_safe(defq, _skb, tmp) {
			_hdr = buf_msg(_skb);
			if (!less(bc_seqno, msg_grp_bc_seqno(_hdr)))
				continue;
			__skb_queue_before(defq, _skb, skb);
			return;
		}
		/* Bcast was not bypassed; add to tail */
	}
	/* Unicasts are never bypassed; always add to tail */
	__skb_queue_tail(defq, skb);
}


/* tipc_group_filter_msg() - determine if we should accept arriving message
 */
void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
			   struct sk_buff_head *xmitq)
{
	struct sk_buff *skb = __skb_dequeue(inputq);
	bool ack, deliver, update, leave = false;
	struct sk_buff_head *defq;
	struct tipc_member *m;
	struct tipc_msg *hdr;
	u32 node, port;
	int mtyp, blks;

	if (!skb)
		return;

	hdr = buf_msg(skb);
	node = msg_orignode(hdr);
	port = msg_origport(hdr);

	if (!msg_in_group(hdr))
		goto drop;

	m = tipc_group_find_member(grp, node, port);
	if (!tipc_group_is_receiver(m))
		goto drop;

	if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
		goto drop;

	TIPC_SKB_CB(skb)->orig_member = m->instance;
	defq = &m->deferredq;
	tipc_group_sort_msg(skb, defq);

	while ((skb = skb_peek(defq))) {
		hdr = buf_msg(skb);
		mtyp = msg_type(hdr);
		blks = msg_blocks(hdr);
		deliver = true;
		ack = false;
		update = false;

		if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
			break;

		/* Decide what to do with message */
		switch (mtyp) {
		case TIPC_GRP_MCAST_MSG:
			if (msg_nameinst(hdr) != grp->instance) {
				update = true;
				deliver = false;
			}
			/* Fall through */
		case TIPC_GRP_BCAST_MSG:
			m->bc_rcv_nxt++;
			ack = msg_grp_bc_ack_req(hdr);
			break;
		case TIPC_GRP_UCAST_MSG:
			break;
		case TIPC_GRP_MEMBER_EVT:
			if (m->state == MBR_LEAVING)
				leave = true;
			if (!grp->events)
				deliver = false;
			break;
		default:
			break;
		}

		/* Execute decisions */
		__skb_dequeue(defq);
		if (deliver)
			__skb_queue_tail(inputq, skb);
		else
			kfree_skb(skb);

		if (ack)
			tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);

		if (leave) {
			__skb_queue_purge(defq);
			tipc_group_delete_member(grp, m);
			break;
		}
		if (!update)
			continue;

		tipc_group_update_rcv_win(grp, blks, node, port, xmitq);
	}
	return;
drop:
	kfree_skb(skb);
}

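/* tipc_group_update_rcv_win() - update a member's receive-side state after
 * 'blks' blocks have been consumed from it. JOINED members become ACTIVE
 * while there is room, or PENDING when the active set is full; once the
 * active set reaches 3/4 of its limit, space is reclaimed from the least
 * recently active member. A REMITTED member falls back to JOINED when its
 * in-flight messages have been read, and the oldest PENDING member then
 * takes over the freed active slot.
 */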
void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
			       u32 port, struct sk_buff_head *xmitq)
{
	struct list_head *active = &grp->active;
	int max_active = grp->max_active;
	int reclaim_limit = max_active * 3 / 4;
	int active_cnt = grp->active_cnt;
	struct tipc_member *m, *rm, *pm;

	m = tipc_group_find_member(grp, node, port);
	if (!m)
		return;

	m->advertised -= blks;

	switch (m->state) {
	case MBR_JOINED:
		/* Reclaim advertised space from least active member */
		if (!list_empty(active) && active_cnt >= reclaim_limit) {
			rm = list_first_entry(active, struct tipc_member, list);
			rm->state = MBR_RECLAIMING;
			list_move_tail(&rm->list, &grp->reclaiming);
			tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
		}
		/* If max active, become pending and wait for reclaimed space */
		if (active_cnt >= max_active) {
			m->state = MBR_PENDING;
			list_add_tail(&m->list, &grp->pending);
			break;
		}
		/* Otherwise become active */
		m->state = MBR_ACTIVE;
		list_add_tail(&m->list, &grp->active);
		grp->active_cnt++;
		/* Fall through */
	case MBR_ACTIVE:
		if (!list_is_last(&m->list, &grp->active))
			list_move_tail(&m->list, &grp->active);
		if (m->advertised > (ADV_ACTIVE * 3 / 4))
			break;
		tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
		break;
	case MBR_REMITTED:
		if (m->advertised > ADV_IDLE)
			break;
		m->state = MBR_JOINED;
		if (m->advertised < ADV_IDLE) {
			pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
		}
		grp->active_cnt--;
		list_del_init(&m->list);
		if (list_empty(&grp->pending))
			return;

		/* Set oldest pending member to active and advertise */
		pm = list_first_entry(&grp->pending, struct tipc_member, list);
		pm->state = MBR_ACTIVE;
		list_move_tail(&pm->list, &grp->active);
		grp->active_cnt++;
		tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
		break;
	case MBR_RECLAIMING:
	case MBR_DISCOVERED:
	case MBR_JOINING:
	case MBR_LEAVING:
	default:
		break;
	}
}

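/* tipc_group_proto_xmit() - build a GROUP_PROTOCOL message of type 'mtyp'
 * for the given member and queue it on 'xmitq'. JOIN and ADV messages carry
 * a window advertisement topping the member up to ADV_IDLE or ADV_ACTIVE;
 * JOIN and LEAVE carry the sender's broadcast sync point, ACK the received
 * broadcast sequence number, and REMIT the remitted window.
 */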
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
				  int mtyp, struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr;
	struct sk_buff *skb;
	int adv = 0;

	skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
			      m->node, tipc_own_addr(grp->net),
			      m->port, grp->portid, 0);
	if (!skb)
		return;

	if (m->state == MBR_ACTIVE)
		adv = ADV_ACTIVE - m->advertised;
	else if (m->state == MBR_JOINED || m->state == MBR_PENDING)
		adv = ADV_IDLE - m->advertised;

	hdr = buf_msg(skb);

	if (mtyp == GRP_JOIN_MSG) {
		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
		msg_set_adv_win(hdr, adv);
		m->advertised += adv;
	} else if (mtyp == GRP_LEAVE_MSG) {
		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
	} else if (mtyp == GRP_ADV_MSG) {
		msg_set_adv_win(hdr, adv);
		m->advertised += adv;
	} else if (mtyp == GRP_ACK_MSG) {
		msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
	} else if (mtyp == GRP_REMIT_MSG) {
		msg_set_grp_remitted(hdr, m->window);
	}
	msg_set_dest_droppable(hdr, true);
	__skb_queue_tail(xmitq, skb);
}

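/* tipc_group_proto_rcv() - dispatch an incoming GROUP_PROTOCOL message.
 * JOIN and LEAVE synchronize broadcast sequence numbers and may release a
 * held-back membership event to 'inputq'; ADV opens the member's send
 * window; ACK counts down the replicast acks we are waiting for; RECLAIM
 * and REMIT implement the advertisement reclaim handshake.
 */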
void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
			  struct tipc_msg *hdr, struct sk_buff_head *inputq,
			  struct sk_buff_head *xmitq)
{
	u32 node = msg_orignode(hdr);
	u32 port = msg_origport(hdr);
	struct tipc_member *m, *pm;
	struct tipc_msg *ehdr;
	u16 remitted, in_flight;

	if (!grp)
		return;

	m = tipc_group_find_member(grp, node, port);

	switch (msg_type(hdr)) {
	case GRP_JOIN_MSG:
		if (!m)
			m = tipc_group_create_member(grp, node, port,
						     MBR_QUARANTINED);
		if (!m)
			return;
		m->bc_syncpt = msg_grp_bc_syncpt(hdr);
		m->bc_rcv_nxt = m->bc_syncpt;
		m->window += msg_adv_win(hdr);

		/* Wait until PUBLISH event is received */
		if (m->state == MBR_DISCOVERED) {
			m->state = MBR_JOINING;
		} else if (m->state == MBR_PUBLISHED) {
			m->state = MBR_JOINED;
			*usr_wakeup = true;
			m->usr_pending = false;
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
			ehdr = buf_msg(m->event_msg);
			msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
			__skb_queue_tail(inputq, m->event_msg);
		}
		list_del_init(&m->congested);
		tipc_group_update_member(m, 0);
		return;
	case GRP_LEAVE_MSG:
		if (!m)
			return;
		m->bc_syncpt = msg_grp_bc_syncpt(hdr);
		list_del_init(&m->list);
		list_del_init(&m->congested);
		*usr_wakeup = true;

		/* Wait until WITHDRAW event is received */
		if (m->state != MBR_LEAVING) {
			tipc_group_decr_active(grp, m);
			m->state = MBR_LEAVING;
			return;
		}
		/* Otherwise deliver already received WITHDRAW event */
		ehdr = buf_msg(m->event_msg);
		msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
		__skb_queue_tail(inputq, m->event_msg);
		return;
	case GRP_ADV_MSG:
		if (!m)
			return;
		m->window += msg_adv_win(hdr);
		*usr_wakeup = m->usr_pending;
		m->usr_pending = false;
		list_del_init(&m->congested);
		return;
	case GRP_ACK_MSG:
		if (!m)
			return;
		m->bc_acked = msg_grp_bc_acked(hdr);
		if (--grp->bc_ackers)
			break;
		*usr_wakeup = true;
		m->usr_pending = false;
		return;
	case GRP_RECLAIM_MSG:
		if (!m)
			return;
		*usr_wakeup = m->usr_pending;
		m->usr_pending = false;
		tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
		m->window = ADV_IDLE;
		return;
	case GRP_REMIT_MSG:
		if (!m || m->state != MBR_RECLAIMING)
			return;

		remitted = msg_grp_remitted(hdr);

		/* Messages preceding the REMIT are still in the receive queue */
		if (m->advertised > remitted) {
			m->state = MBR_REMITTED;
			in_flight = m->advertised - remitted;
			m->advertised = ADV_IDLE + in_flight;
			return;
		}
		/* All messages preceding the REMIT have been read */
		if (m->advertised <= remitted) {
			m->state = MBR_JOINED;
			in_flight = 0;
		}
		/* ...and if the REMIT was overtaken by more messages, re-advertise */
		if (m->advertised < remitted)
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);

		m->advertised = ADV_IDLE + in_flight;
		grp->active_cnt--;
		list_del_init(&m->list);

		/* Set oldest pending member to active and advertise */
		if (list_empty(&grp->pending))
			return;
		pm = list_first_entry(&grp->pending, struct tipc_member, list);
		pm->state = MBR_ACTIVE;
		list_move_tail(&pm->list, &grp->active);
		grp->active_cnt++;
		if (pm->advertised <= (ADV_ACTIVE * 3 / 4))
			tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
		return;
	default:
		pr_warn("Received unknown GROUP_PROTO message\n");
	}
}


/* tipc_group_member_evt() - receive and handle a member up/down event
 */
void tipc_group_member_evt(struct tipc_group *grp,
			   bool *usr_wakeup,
			   int *sk_rcvbuf,
			   struct sk_buff *skb,
			   struct sk_buff_head *inputq,
			   struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb);
	struct tipc_event *evt = (void *)msg_data(hdr);
	u32 instance = evt->found_lower;
	u32 node = evt->port.node;
	u32 port = evt->port.ref;
	int event = evt->event;
	struct tipc_member *m;
	struct net *net;
	bool node_up;
	u32 self;

	if (!grp)
		goto drop;

	net = grp->net;
	self = tipc_own_addr(net);
	if (!grp->loopback && node == self && port == grp->portid)
		goto drop;

	/* Convert message before delivery to user */
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
	msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
	msg_set_origport(hdr, port);
	msg_set_orignode(hdr, node);
	msg_set_nametype(hdr, grp->type);
	msg_set_grp_evt(hdr, event);

	m = tipc_group_find_member(grp, node, port);

	if (event == TIPC_PUBLISHED) {
		if (!m)
			m = tipc_group_create_member(grp, node, port,
						     MBR_DISCOVERED);
		if (!m)
			goto drop;

		/* Hold back event if JOIN message not yet received */
		if (m->state == MBR_DISCOVERED) {
			m->event_msg = skb;
			m->state = MBR_PUBLISHED;
		} else {
			msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
			__skb_queue_tail(inputq, skb);
			m->state = MBR_JOINED;
			*usr_wakeup = true;
			m->usr_pending = false;
		}
		m->instance = instance;
		TIPC_SKB_CB(skb)->orig_member = m->instance;
		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
		if (m->window < ADV_IDLE)
			tipc_group_update_member(m, 0);
		else
			list_del_init(&m->congested);
	} else if (event == TIPC_WITHDRAWN) {
		if (!m)
			goto drop;

		TIPC_SKB_CB(skb)->orig_member = m->instance;

		*usr_wakeup = true;
		m->usr_pending = false;
		node_up = tipc_node_is_up(net, node);
		m->event_msg = NULL;

		if (node_up) {
			/* Hold back event if a LEAVE msg should be expected */
			if (m->state != MBR_LEAVING) {
				m->event_msg = skb;
				tipc_group_decr_active(grp, m);
				m->state = MBR_LEAVING;
			} else {
				msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
				__skb_queue_tail(inputq, skb);
			}
		} else {
			if (m->state != MBR_LEAVING) {
				tipc_group_decr_active(grp, m);
				m->state = MBR_LEAVING;
				msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
			} else {
				msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
			}
			__skb_queue_tail(inputq, skb);
		}
		list_del_init(&m->list);
		list_del_init(&m->congested);
	}
	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
	return;
drop:
	kfree_skb(skb);
}
897