/*
 * net/tipc/node.c: TIPC node management routines
 *
 * Copyright (c) 2000-2006, 2012-2015, Ericsson AB
 * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "link.h"
#include "node.h"
#include "name_distr.h"
#include "socket.h"
#include "bcast.h"
#include "discover.h"

/* Node FSM states and events:
 */
enum {
	SELF_DOWN_PEER_DOWN    = 0xdd,
	SELF_UP_PEER_UP        = 0xaa,
	SELF_DOWN_PEER_LEAVING = 0xd1,
	SELF_UP_PEER_COMING    = 0xac,
	SELF_COMING_PEER_UP    = 0xca,
	SELF_LEAVING_PEER_DOWN = 0x1d,
	NODE_FAILINGOVER       = 0xf0,
	NODE_SYNCHING          = 0xcc
};

enum {
	SELF_ESTABL_CONTACT_EVT = 0xece,
	SELF_LOST_CONTACT_EVT   = 0x1ce,
	PEER_ESTABL_CONTACT_EVT = 0x9ece,
	PEER_LOST_CONTACT_EVT   = 0x91ce,
	NODE_FAILOVER_BEGIN_EVT = 0xfbe,
	NODE_FAILOVER_END_EVT   = 0xfee,
	NODE_SYNCH_BEGIN_EVT    = 0xcbe,
	NODE_SYNCH_END_EVT      = 0xcee
};
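
/* The hex values above appear mnemonic rather than arbitrary: in the state
 * constants each nibble describes one side (d = down, a = up, c = coming,
 * 1 = leaving), e.g. 0xac = self up / peer coming, while 0xf0 and 0xcc tag
 * the failover and synch states. This reading is an editorial observation;
 * nothing in the code depends on the numeric values.
 */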

static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
				  struct sk_buff_head *xmitq,
				  struct tipc_media_addr **maddr);
static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
				bool delete);
static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
static void node_established_contact(struct tipc_node *n_ptr);
static void tipc_node_delete(struct tipc_node *node);
static void tipc_node_timeout(unsigned long data);
static void tipc_node_fsm_evt(struct tipc_node *n, int evt);

struct tipc_sock_conn {
	u32 port;
	u32 peer_port;
	u32 peer_node;
	struct list_head list;
};

static const struct nla_policy tipc_nl_node_policy[TIPC_NLA_NODE_MAX + 1] = {
	[TIPC_NLA_NODE_UNSPEC]		= { .type = NLA_UNSPEC },
	[TIPC_NLA_NODE_ADDR]		= { .type = NLA_U32 },
	[TIPC_NLA_NODE_UP]		= { .type = NLA_FLAG }
};

/*
 * A trivial power-of-two bitmask technique is used for speed, since this
 * operation is done for every incoming TIPC packet. The number of hash table
 * entries has been chosen so that no hash chain exceeds 8 nodes and will
 * usually be much smaller (typically only a single node).
 */
static unsigned int tipc_hashfn(u32 addr)
{
	return addr & (NODE_HTABLE_SIZE - 1);
}
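
/* Worked example (editorial illustration only; the real NODE_HTABLE_SIZE is
 * defined in node.h): if NODE_HTABLE_SIZE were 1024, the mask would be 0x3ff
 * and
 *
 *	tipc_hashfn(0x01001001) == 0x001
 *	tipc_hashfn(0x01002001) == 0x001
 *
 * i.e. only the low-order bits of the node address select the bucket, which
 * costs a single AND instruction on the receive fast path.
 */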

static void tipc_node_kref_release(struct kref *kref)
{
	struct tipc_node *node = container_of(kref, struct tipc_node, kref);

	tipc_node_delete(node);
}

void tipc_node_put(struct tipc_node *node)
{
	kref_put(&node->kref, tipc_node_kref_release);
}

static void tipc_node_get(struct tipc_node *node)
{
	kref_get(&node->kref);
}

/*
 * tipc_node_find - locate specified node object, if it exists
 */
struct tipc_node *tipc_node_find(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_node *node;

	if (unlikely(!in_own_cluster_exact(net, addr)))
		return NULL;

	rcu_read_lock();
	hlist_for_each_entry_rcu(node, &tn->node_htable[tipc_hashfn(addr)],
				 hash) {
		if (node->addr == addr) {
			tipc_node_get(node);
			rcu_read_unlock();
			return node;
		}
	}
	rcu_read_unlock();
	return NULL;
}

struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_node *n_ptr, *temp_node;

	spin_lock_bh(&tn->node_list_lock);
	n_ptr = tipc_node_find(net, addr);
	if (n_ptr)
		goto exit;
	n_ptr = kzalloc(sizeof(*n_ptr), GFP_ATOMIC);
	if (!n_ptr) {
		pr_warn("Node creation failed, no memory\n");
		goto exit;
	}
	n_ptr->addr = addr;
	n_ptr->net = net;
	n_ptr->capabilities = capabilities;
	kref_init(&n_ptr->kref);
	spin_lock_init(&n_ptr->lock);
	INIT_HLIST_NODE(&n_ptr->hash);
	INIT_LIST_HEAD(&n_ptr->list);
	INIT_LIST_HEAD(&n_ptr->publ_list);
	INIT_LIST_HEAD(&n_ptr->conn_sks);
	skb_queue_head_init(&n_ptr->bclink.namedq);
	__skb_queue_head_init(&n_ptr->bclink.deferdq);
	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
		if (n_ptr->addr < temp_node->addr)
			break;
	}
	list_add_tail_rcu(&n_ptr->list, &temp_node->list);
	n_ptr->state = SELF_DOWN_PEER_LEAVING;
	n_ptr->signature = INVALID_NODE_SIG;
	n_ptr->active_links[0] = INVALID_BEARER_ID;
	n_ptr->active_links[1] = INVALID_BEARER_ID;
	tipc_node_get(n_ptr);
	setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
	n_ptr->keepalive_intv = U32_MAX;
exit:
	spin_unlock_bh(&tn->node_list_lock);
	return n_ptr;
}

static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
{
	unsigned long tol = l->tolerance;
	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
	unsigned long keepalive_intv = msecs_to_jiffies(intv);

	/* Link with lowest tolerance determines timer interval */
	if (keepalive_intv < n->keepalive_intv)
		n->keepalive_intv = keepalive_intv;

	/* Ensure link's abort limit corresponds to current interval */
	l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv);
}
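
/* Worked example (editorial illustration): with l->tolerance = 1500 ms,
 * tol / 4 = 375 <= 500, so the timer candidate is msecs_to_jiffies(375);
 * any tolerance of 2000 ms or more is capped at the 500 ms maximum. With a
 * node-wide keepalive_intv of ~375 ms the abort limit becomes
 * 1500 / 375 = 4, i.e. four silent timer intervals before the link is
 * declared failed (subject to jiffy rounding).
 */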

static void tipc_node_delete(struct tipc_node *node)
{
	list_del_rcu(&node->list);
	hlist_del_rcu(&node->hash);
	kfree_rcu(node, rcu);
}

void tipc_node_stop(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_node *node, *t_node;

	spin_lock_bh(&tn->node_list_lock);
	list_for_each_entry_safe(node, t_node, &tn->node_list, list) {
		if (del_timer(&node->timer))
			tipc_node_put(node);
		tipc_node_put(node);
	}
	spin_unlock_bh(&tn->node_list_lock);
}

int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
{
	struct tipc_node *node;
	struct tipc_sock_conn *conn;
	int err = 0;

	if (in_own_node(net, dnode))
		return 0;

	node = tipc_node_find(net, dnode);
	if (!node) {
		pr_warn("Connecting sock to node 0x%x failed\n", dnode);
		return -EHOSTUNREACH;
	}
	conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
	if (!conn) {
		err = -EHOSTUNREACH;
		goto exit;
	}
	conn->peer_node = dnode;
	conn->port = port;
	conn->peer_port = peer_port;

	tipc_node_lock(node);
	list_add_tail(&conn->list, &node->conn_sks);
	tipc_node_unlock(node);
exit:
	tipc_node_put(node);
	return err;
}
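
/* Illustrative call sequence (a sketch; the real callers live in the socket
 * layer): when a socket connects to a port on a remote node it registers the
 * connection, so that node_lost_contact() can generate an abort message for
 * it if the node disappears:
 *
 *	err = tipc_node_add_conn(net, peer_node, own_port, peer_port);
 *	...
 *	tipc_node_remove_conn(net, peer_node, own_port);   (on teardown)
 */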

void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
{
	struct tipc_node *node;
	struct tipc_sock_conn *conn, *safe;

	if (in_own_node(net, dnode))
		return;

	node = tipc_node_find(net, dnode);
	if (!node)
		return;

	tipc_node_lock(node);
	list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
		if (port != conn->port)
			continue;
		list_del(&conn->list);
		kfree(conn);
	}
	tipc_node_unlock(node);
	tipc_node_put(node);
}

/* tipc_node_timeout - handle expiration of node timer
 */
static void tipc_node_timeout(unsigned long data)
{
	struct tipc_node *n = (struct tipc_node *)data;
	struct tipc_link_entry *le;
	struct sk_buff_head xmitq;
	int bearer_id;
	int rc = 0;

	__skb_queue_head_init(&xmitq);

	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
		tipc_node_lock(n);
		le = &n->links[bearer_id];
		if (le->link) {
			/* Link tolerance may change asynchronously: */
			tipc_node_calculate_timer(n, le->link);
			rc = tipc_link_timeout(le->link, &xmitq);
		}
		tipc_node_unlock(n);
		tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
		if (rc & TIPC_LINK_DOWN_EVT)
			tipc_node_link_down(n, bearer_id, false);
	}
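	/* Re-arm the timer. The node holds one reference per armed timer:
	 * mod_timer() returns 0 when the timer was inactive, in which case
	 * arming it takes a fresh reference; the reference backing this
	 * expired run is then dropped unconditionally below.
	 */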
	if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
		tipc_node_get(n);
	tipc_node_put(n);
}

/**
 * __tipc_node_link_up - handle addition of link
 * Node lock must be held by caller
 * Link becomes active (alone or shared) or standby, depending on its priority.
 */
static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
				struct sk_buff_head *xmitq)
{
	int *slot0 = &n->active_links[0];
	int *slot1 = &n->active_links[1];
	struct tipc_link *ol = node_active_link(n, 0);
	struct tipc_link *nl = n->links[bearer_id].link;

	if (!nl || !tipc_link_is_up(nl))
		return;

	n->working_links++;
	n->action_flags |= TIPC_NOTIFY_LINK_UP;
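	/* 32-bit link id: peer's bearer id in the upper halfword, own
	 * bearer id in the lower one
	 */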
	n->link_id = nl->peer_bearer_id << 16 | bearer_id;

	/* Leave room for tunnel header when returning 'mtu' to users: */
	n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;

	tipc_bearer_add_dest(n->net, bearer_id, n->addr);

	pr_debug("Established link <%s> on network plane %c\n",
		 nl->name, nl->net_plane);

	/* First link? => give it both slots */
	if (!ol) {
		*slot0 = bearer_id;
		*slot1 = bearer_id;
		tipc_link_build_bcast_sync_msg(nl, xmitq);
		node_established_contact(n);
		return;
	}

	/* Second link => redistribute slots */
	if (nl->priority > ol->priority) {
		pr_debug("Old link <%s> becomes standby\n", ol->name);
		*slot0 = bearer_id;
		*slot1 = bearer_id;
	} else if (nl->priority == ol->priority) {
		*slot0 = bearer_id;
	} else {
		pr_debug("New link <%s> is standby\n", nl->name);
	}

	/* Prepare synchronization with first link */
	tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);
}

/**
 * tipc_node_link_up - handle addition of link
 *
 * Link becomes active (alone or shared) or standby, depending on its priority.
 */
static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
			      struct sk_buff_head *xmitq)
{
	tipc_node_lock(n);
	__tipc_node_link_up(n, bearer_id, xmitq);
	tipc_node_unlock(n);
}

/**
 * __tipc_node_link_down - handle loss of link
 */
static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
				  struct sk_buff_head *xmitq,
				  struct tipc_media_addr **maddr)
{
	struct tipc_link_entry *le = &n->links[*bearer_id];
	int *slot0 = &n->active_links[0];
	int *slot1 = &n->active_links[1];
	int i, highest = 0;
	struct tipc_link *l, *_l, *tnl;

	l = n->links[*bearer_id].link;
	if (!l || tipc_link_is_reset(l))
		return;

	n->working_links--;
	n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
	n->link_id = l->peer_bearer_id << 16 | *bearer_id;

	tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);

	pr_debug("Lost link <%s> on network plane %c\n",
		 l->name, l->net_plane);

	/* Select new active link if any available */
	*slot0 = INVALID_BEARER_ID;
	*slot1 = INVALID_BEARER_ID;
	for (i = 0; i < MAX_BEARERS; i++) {
		_l = n->links[i].link;
		if (!_l || !tipc_link_is_up(_l))
			continue;
		if (_l == l)
			continue;
		if (_l->priority < highest)
			continue;
		if (_l->priority > highest) {
			highest = _l->priority;
			*slot0 = i;
			*slot1 = i;
			continue;
		}
		*slot1 = i;
	}

	if (!tipc_node_is_up(n)) {
		tipc_link_reset(l);
		node_lost_contact(n, &le->inputq);
		return;
	}

	/* There is still a working link => initiate failover */
	tnl = node_active_link(n, 0);
	tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
	tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
	n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
	tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
	tipc_link_reset(l);
	tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
	tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
	*maddr = &n->links[tnl->bearer_id].maddr;
	*bearer_id = tnl->bearer_id;
}

static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
{
	struct tipc_link_entry *le = &n->links[bearer_id];
	struct tipc_media_addr *maddr;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	tipc_node_lock(n);
	__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
	if (delete && le->link) {
		kfree(le->link);
		le->link = NULL;
		n->link_cnt--;
	}
	tipc_node_unlock(n);

	tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
	tipc_sk_rcv(n->net, &le->inputq);
}

bool tipc_node_is_up(struct tipc_node *n)
{
	return n->active_links[0] != INVALID_BEARER_ID;
}

void tipc_node_check_dest(struct net *net, u32 onode,
			  struct tipc_bearer *b,
			  u16 capabilities, u32 signature,
			  struct tipc_media_addr *maddr,
			  bool *respond, bool *dupl_addr)
{
	struct tipc_node *n;
	struct tipc_link *l;
	struct tipc_link_entry *le;
	bool addr_match = false;
	bool sign_match = false;
	bool link_up = false;
	bool accept_addr = false;
	bool reset = true;

	*dupl_addr = false;
	*respond = false;

	n = tipc_node_create(net, onode, capabilities);
	if (!n)
		return;

	tipc_node_lock(n);

	le = &n->links[b->identity];

	/* Prepare to validate requesting node's signature and media address */
	l = le->link;
	link_up = l && tipc_link_is_up(l);
	addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr));
	sign_match = (signature == n->signature);

	/* These three flags give us eight permutations: */

	if (sign_match && addr_match && link_up) {
		/* All is fine. Do nothing. */
		reset = false;
	} else if (sign_match && addr_match && !link_up) {
		/* Respond. The link will come up in due time */
		*respond = true;
	} else if (sign_match && !addr_match && link_up) {
		/* Peer has changed i/f address without rebooting.
		 * If so, the link will reset soon, and the next
		 * discovery will be accepted. So we can ignore it.
		 * It may also be a cloned or malicious peer having
		 * chosen the same node address and signature as an
		 * existing one.
		 * Ignore requests until the link goes down, if ever.
		 */
		*dupl_addr = true;
	} else if (sign_match && !addr_match && !link_up) {
		/* Peer link has changed i/f address without rebooting.
		 * It may also be a cloned or malicious peer; we can't
		 * distinguish between the two.
		 * The signature is correct, so we must accept.
		 */
		accept_addr = true;
		*respond = true;
	} else if (!sign_match && addr_match && link_up) {
		/* Peer node rebooted. Two possibilities:
		 *  - Delayed re-discovery; this link endpoint has already
		 *    reset and re-established contact with the peer, before
		 *    receiving a discovery message from that node.
		 *    (The peer happened to receive one from this node first).
		 *  - The peer came back so fast that our side has not
		 *    discovered it yet. Probing from this side will soon
		 *    reset the link, since there can be no working link
		 *    endpoint at the peer end, and the link will re-establish.
		 *  Accept the signature, since it comes from a known peer.
		 */
		n->signature = signature;
	} else if (!sign_match && addr_match && !link_up) {
		/*  The peer node has rebooted.
		 *  Accept signature, since it is a known peer.
		 */
		n->signature = signature;
		*respond = true;
	} else if (!sign_match && !addr_match && link_up) {
		/* Peer rebooted with new address, or a new/duplicate peer.
		 * Ignore until the link goes down, if ever.
		 */
		*dupl_addr = true;
	} else if (!sign_match && !addr_match && !link_up) {
		/* Peer rebooted with new address, or it is a new peer.
		 * Accept signature and address.
		 */
		n->signature = signature;
		accept_addr = true;
		*respond = true;
	}

	if (!accept_addr)
		goto exit;

	/* Now create new link if not already existing */
	if (!l) {
		if (n->link_cnt == 2) {
			pr_warn("Cannot establish 3rd link to %x\n", n->addr);
			goto exit;
		}
		if (!tipc_link_create(n, b, mod(tipc_net(net)->random),
				      tipc_own_addr(net), onode, &le->maddr,
				      &le->inputq, &n->bclink.namedq, &l)) {
			*respond = false;
			goto exit;
		}
		tipc_link_reset(l);
		if (n->state == NODE_FAILINGOVER)
			tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
		le->link = l;
		n->link_cnt++;
		tipc_node_calculate_timer(n, l);
		if (n->link_cnt == 1)
			if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
				tipc_node_get(n);
	}
	memcpy(&le->maddr, maddr, sizeof(*maddr));
exit:
	tipc_node_unlock(n);
	if (reset)
		tipc_node_link_down(n, b->identity, false);
	tipc_node_put(n);
}

void tipc_node_delete_links(struct net *net, int bearer_id)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_node *n;

	rcu_read_lock();
	list_for_each_entry_rcu(n, &tn->node_list, list) {
		tipc_node_link_down(n, bearer_id, true);
	}
	rcu_read_unlock();
}

static void tipc_node_reset_links(struct tipc_node *n)
{
	char addr_string[16];
	int i;

	pr_warn("Resetting all links to %s\n",
		tipc_addr_string_fill(addr_string, n->addr));

	for (i = 0; i < MAX_BEARERS; i++) {
		tipc_node_link_down(n, i, false);
	}
}

/* tipc_node_fsm_evt - node finite state machine
 * Determines when contact is allowed with peer node
 */
static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
{
	int state = n->state;

	switch (state) {
	case SELF_DOWN_PEER_DOWN:
		switch (evt) {
		case SELF_ESTABL_CONTACT_EVT:
			state = SELF_UP_PEER_COMING;
			break;
		case PEER_ESTABL_CONTACT_EVT:
			state = SELF_COMING_PEER_UP;
			break;
		case SELF_LOST_CONTACT_EVT:
		case PEER_LOST_CONTACT_EVT:
			break;
		case NODE_SYNCH_END_EVT:
		case NODE_SYNCH_BEGIN_EVT:
		case NODE_FAILOVER_BEGIN_EVT:
		case NODE_FAILOVER_END_EVT:
		default:
			goto illegal_evt;
		}
		break;
	case SELF_UP_PEER_UP:
		switch (evt) {
		case SELF_LOST_CONTACT_EVT:
			state = SELF_DOWN_PEER_LEAVING;
			break;
		case PEER_LOST_CONTACT_EVT:
			state = SELF_LEAVING_PEER_DOWN;
			break;
		case NODE_SYNCH_BEGIN_EVT:
			state = NODE_SYNCHING;
			break;
		case NODE_FAILOVER_BEGIN_EVT:
			state = NODE_FAILINGOVER;
			break;
		case SELF_ESTABL_CONTACT_EVT:
		case PEER_ESTABL_CONTACT_EVT:
		case NODE_SYNCH_END_EVT:
		case NODE_FAILOVER_END_EVT:
			break;
		default:
			goto illegal_evt;
		}
		break;
	case SELF_DOWN_PEER_LEAVING:
		switch (evt) {
		case PEER_LOST_CONTACT_EVT:
			state = SELF_DOWN_PEER_DOWN;
			break;
		case SELF_ESTABL_CONTACT_EVT:
		case PEER_ESTABL_CONTACT_EVT:
		case SELF_LOST_CONTACT_EVT:
			break;
		case NODE_SYNCH_END_EVT:
		case NODE_SYNCH_BEGIN_EVT:
		case NODE_FAILOVER_BEGIN_EVT:
		case NODE_FAILOVER_END_EVT:
		default:
			goto illegal_evt;
		}
		break;
	case SELF_UP_PEER_COMING:
		switch (evt) {
		case PEER_ESTABL_CONTACT_EVT:
			state = SELF_UP_PEER_UP;
			break;
		case SELF_LOST_CONTACT_EVT:
			state = SELF_DOWN_PEER_LEAVING;
			break;
		case SELF_ESTABL_CONTACT_EVT:
		case PEER_LOST_CONTACT_EVT:
			break;
		case NODE_SYNCH_END_EVT:
		case NODE_SYNCH_BEGIN_EVT:
		case NODE_FAILOVER_BEGIN_EVT:
		case NODE_FAILOVER_END_EVT:
		default:
			goto illegal_evt;
		}
		break;
	case SELF_COMING_PEER_UP:
		switch (evt) {
		case SELF_ESTABL_CONTACT_EVT:
			state = SELF_UP_PEER_UP;
			break;
		case PEER_LOST_CONTACT_EVT:
			state = SELF_LEAVING_PEER_DOWN;
			break;
		case SELF_LOST_CONTACT_EVT:
		case PEER_ESTABL_CONTACT_EVT:
			break;
		case NODE_SYNCH_END_EVT:
		case NODE_SYNCH_BEGIN_EVT:
		case NODE_FAILOVER_BEGIN_EVT:
		case NODE_FAILOVER_END_EVT:
		default:
			goto illegal_evt;
		}
		break;
	case SELF_LEAVING_PEER_DOWN:
		switch (evt) {
		case SELF_LOST_CONTACT_EVT:
			state = SELF_DOWN_PEER_DOWN;
			break;
		case SELF_ESTABL_CONTACT_EVT:
		case PEER_ESTABL_CONTACT_EVT:
		case PEER_LOST_CONTACT_EVT:
			break;
		case NODE_SYNCH_END_EVT:
		case NODE_SYNCH_BEGIN_EVT:
		case NODE_FAILOVER_BEGIN_EVT:
		case NODE_FAILOVER_END_EVT:
		default:
			goto illegal_evt;
		}
		break;
	case NODE_FAILINGOVER:
		switch (evt) {
		case SELF_LOST_CONTACT_EVT:
			state = SELF_DOWN_PEER_LEAVING;
			break;
		case PEER_LOST_CONTACT_EVT:
			state = SELF_LEAVING_PEER_DOWN;
			break;
		case NODE_FAILOVER_END_EVT:
			state = SELF_UP_PEER_UP;
			break;
		case NODE_FAILOVER_BEGIN_EVT:
		case SELF_ESTABL_CONTACT_EVT:
		case PEER_ESTABL_CONTACT_EVT:
			break;
		case NODE_SYNCH_BEGIN_EVT:
		case NODE_SYNCH_END_EVT:
		default:
			goto illegal_evt;
		}
		break;
	case NODE_SYNCHING:
		switch (evt) {
		case SELF_LOST_CONTACT_EVT:
			state = SELF_DOWN_PEER_LEAVING;
			break;
		case PEER_LOST_CONTACT_EVT:
			state = SELF_LEAVING_PEER_DOWN;
			break;
		case NODE_SYNCH_END_EVT:
			state = SELF_UP_PEER_UP;
			break;
		case NODE_FAILOVER_BEGIN_EVT:
			state = NODE_FAILINGOVER;
			break;
		case NODE_SYNCH_BEGIN_EVT:
		case SELF_ESTABL_CONTACT_EVT:
		case PEER_ESTABL_CONTACT_EVT:
			break;
		case NODE_FAILOVER_END_EVT:
		default:
			goto illegal_evt;
		}
		break;
	default:
		pr_err("Unknown node fsm state %x\n", state);
		break;
	}
	n->state = state;
	return;

illegal_evt:
	pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
}
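
/* Example walk through the tables above: a node starts in
 * SELF_DOWN_PEER_DOWN; SELF_ESTABL_CONTACT_EVT takes it to
 * SELF_UP_PEER_COMING, and a following PEER_ESTABL_CONTACT_EVT to
 * SELF_UP_PEER_UP. If the last link then fails, SELF_LOST_CONTACT_EVT
 * yields SELF_DOWN_PEER_LEAVING, and PEER_LOST_CONTACT_EVT finally
 * returns the node to SELF_DOWN_PEER_DOWN.
 */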

bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
{
	int state = n->state;

	if (likely(state == SELF_UP_PEER_UP))
		return true;

	if (state == SELF_LEAVING_PEER_DOWN)
		return false;

	if (state == SELF_DOWN_PEER_LEAVING) {
		if (msg_peer_node_is_up(hdr))
			return false;
	}

	return true;
}

static void node_established_contact(struct tipc_node *n_ptr)
{
	tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
	n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
	n_ptr->bclink.oos_state = 0;
	n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
	tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
}

static void node_lost_contact(struct tipc_node *n_ptr,
			      struct sk_buff_head *inputq)
{
	char addr_string[16];
	struct tipc_sock_conn *conn, *safe;
	struct tipc_link *l;
	struct list_head *conns = &n_ptr->conn_sks;
	struct sk_buff *skb;
	struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
	uint i;

	pr_debug("Lost contact with %s\n",
		 tipc_addr_string_fill(addr_string, n_ptr->addr));

	/* Flush broadcast link info associated with lost node */
	if (n_ptr->bclink.recv_permitted) {
		__skb_queue_purge(&n_ptr->bclink.deferdq);

		if (n_ptr->bclink.reasm_buf) {
			kfree_skb(n_ptr->bclink.reasm_buf);
			n_ptr->bclink.reasm_buf = NULL;
		}

		tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
		tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);

		n_ptr->bclink.recv_permitted = false;
	}

	/* Abort any ongoing link failover */
	for (i = 0; i < MAX_BEARERS; i++) {
		l = n_ptr->links[i].link;
		if (l)
			tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
	}

	/* Prevent re-contact with node until cleanup is done */
	tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);

	/* Notify publications from this node */
	n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;

	/* Notify sockets connected to node */
	list_for_each_entry_safe(conn, safe, conns, list) {
		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
				      SHORT_H_SIZE, 0, tn->own_addr,
				      conn->peer_node, conn->port,
				      conn->peer_port, TIPC_ERR_NO_NODE);
		if (likely(skb))
			skb_queue_tail(inputq, skb);
		list_del(&conn->list);
		kfree(conn);
	}
}

/**
 * tipc_node_get_linkname - get the name of a link
 *
 * @net: the applicable net namespace
 * @bearer_id: id of the bearer
 * @addr: peer node address
 * @linkname: link name output buffer
 * @len: size of the output buffer
 *
 * Returns 0 on success
 */
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
			   char *linkname, size_t len)
{
	struct tipc_link *link;
	int err = -EINVAL;
	struct tipc_node *node = tipc_node_find(net, addr);

	if (!node)
		return err;

	if (bearer_id >= MAX_BEARERS)
		goto exit;

	tipc_node_lock(node);
	link = node->links[bearer_id].link;
	if (link) {
		strncpy(linkname, link->name, len);
		err = 0;
	}
exit:
	tipc_node_unlock(node);
	tipc_node_put(node);
	return err;
}

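/* tipc_node_unlock - release the node spinlock and execute deferred work
 *
 * Editorial note: action flags raised while the lock was held are
 * snapshotted and cleared first, and the corresponding notifications
 * (name table, socket and broadcast events) are issued only after the
 * spinlock has been released, so none of those subsystems is ever
 * entered with the node lock held.
 */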
void tipc_node_unlock(struct tipc_node *node)
{
	struct net *net = node->net;
	u32 addr = 0;
	u32 flags = node->action_flags;
	u32 link_id = 0;
	struct list_head *publ_list;

	if (likely(!flags)) {
		spin_unlock_bh(&node->lock);
		return;
	}

	addr = node->addr;
	link_id = node->link_id;
	publ_list = &node->publ_list;

	node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
				TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
				TIPC_BCAST_RESET);

	spin_unlock_bh(&node->lock);

	if (flags & TIPC_NOTIFY_NODE_DOWN)
		tipc_publ_notify(net, publ_list, addr);

	if (flags & TIPC_WAKEUP_BCAST_USERS)
		tipc_bclink_wakeup_users(net);

	if (flags & TIPC_NOTIFY_NODE_UP)
		tipc_named_node_up(net, addr);

	if (flags & TIPC_NOTIFY_LINK_UP)
		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
				     TIPC_NODE_SCOPE, link_id, addr);

	if (flags & TIPC_NOTIFY_LINK_DOWN)
		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
				      link_id, addr);

	if (flags & TIPC_BCAST_MSG_EVT)
		tipc_bclink_input(net);

	if (flags & TIPC_BCAST_RESET)
		tipc_node_reset_links(node);
}

/* Caller should hold node lock for the passed node */
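/* Sketch of the resulting netlink payload (cf. tipc_nl_node_policy above):
 * each node is reported as one TIPC_NL_NODE_GET message carrying
 *
 *	TIPC_NLA_NODE
 *	    TIPC_NLA_NODE_ADDR	(u32, node address)
 *	    TIPC_NLA_NODE_UP	(flag, present only if the node is up)
 */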
static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
{
	void *hdr;
	struct nlattr *attrs;

	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
			  NLM_F_MULTI, TIPC_NL_NODE_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_NODE);
	if (!attrs)
		goto msg_full;

	if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr))
		goto attr_msg_full;
	if (tipc_node_is_up(node))
		if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP))
			goto attr_msg_full;

	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);

	return 0;

attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}

static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
					       int *bearer_id,
					       struct tipc_media_addr **maddr)
{
	int id = n->active_links[sel & 1];

	if (unlikely(id < 0))
		return NULL;

	*bearer_id = id;
	*maddr = &n->links[id].maddr;
	return n->links[id].link;
}
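
/* An even selector picks active_links[0], an odd one active_links[1].
 * When two links share the highest priority the two slots refer to
 * different bearers (see __tipc_node_link_up()), so per-flow selectors
 * spread load across both links while keeping each flow on one link.
 */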

/**
 * tipc_node_xmit() is the general link level function for message sending
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
 * @dnode: address of destination node
 * @selector: a number used for deterministic link selection
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG, -EHOSTUNREACH, -EMSGSIZE
 */
int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
		   u32 dnode, int selector)
{
	struct tipc_link *l = NULL;
	struct tipc_node *n;
	struct sk_buff_head xmitq;
	struct tipc_media_addr *maddr;
	int bearer_id;
	int rc = -EHOSTUNREACH;

	__skb_queue_head_init(&xmitq);
	n = tipc_node_find(net, dnode);
	if (likely(n)) {
		tipc_node_lock(n);
		l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
		if (likely(l))
			rc = tipc_link_xmit(l, list, &xmitq);
		tipc_node_unlock(n);
		if (unlikely(rc == -ENOBUFS))
			tipc_node_link_down(n, bearer_id, false);
		tipc_node_put(n);
	}
	if (likely(!rc)) {
		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
		return 0;
	}
	if (likely(in_own_node(net, dnode))) {
		tipc_sk_rcv(net, list);
		return 0;
	}
	return rc;
}

/* tipc_node_xmit_skb(): send single buffer to destination
 * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE
 * messages, which will not be rejected
 * The only exception is datagram messages rerouted after secondary
 * lookup, which are rare and safe to dispose of anyway.
 * TODO: Return real return value, and let callers use
 * tipc_wait_for_sendpkt() where applicable
 */
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
		       u32 selector)
{
	struct sk_buff_head head;
	int rc;

	skb_queue_head_init(&head);
	__skb_queue_tail(&head, skb);
	rc = tipc_node_xmit(net, &head, dnode, selector);
	if (rc == -ELINKCONG)
		kfree_skb(skb);
	return 0;
}
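
/* Usage sketch (editorial; the variable names are hypothetical), mirroring
 * how this file itself builds single-buffer messages with tipc_msg_create()
 * in node_lost_contact():
 *
 *	skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
 *			      SHORT_H_SIZE, 0, own_addr, peer_node,
 *			      own_port, peer_port, TIPC_ERR_NO_NODE);
 *	if (skb)
 *		tipc_node_xmit_skb(net, skb, peer_node, own_port);
 */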

/**
 * tipc_node_check_state - check and if necessary update node state
 * @n: node to be checked
 * @skb: TIPC packet
 * @bearer_id: identity of bearer delivering the packet
 * @xmitq: queue for packets to be sent on this bearer
 * Returns true if state is ok, otherwise consumes buffer and returns false
 */
static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
				  int bearer_id, struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb);
	int usr = msg_user(hdr);
	int mtyp = msg_type(hdr);
	u16 oseqno = msg_seqno(hdr);
	u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
	u16 exp_pkts = msg_msgcnt(hdr);
	u16 rcv_nxt, syncpt, dlv_nxt;
	int state = n->state;
	struct tipc_link *l, *tnl, *pl = NULL;
	struct tipc_media_addr *maddr;
	int i, pb_id;

	l = n->links[bearer_id].link;
	if (!l)
		return false;
	rcv_nxt = l->rcv_nxt;

	if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
		return true;

	/* Find parallel link, if any */
	for (i = 0; i < MAX_BEARERS; i++) {
		if ((i != bearer_id) && n->links[i].link) {
			pl = n->links[i].link;
			break;
		}
	}

	/* Update node accessibility if applicable */
	if (state == SELF_UP_PEER_COMING) {
		if (!tipc_link_is_up(l))
			return true;
		if (!msg_peer_link_is_up(hdr))
			return true;
		tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
	}

	if (state == SELF_DOWN_PEER_LEAVING) {
		if (msg_peer_node_is_up(hdr))
			return false;
		tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
	}

	/* Ignore duplicate packets */
	if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
		return true;

	/* Initiate or update failover mode if applicable */
	if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
		syncpt = oseqno + exp_pkts - 1;
		if (pl && tipc_link_is_up(pl)) {
			pb_id = pl->bearer_id;
			__tipc_node_link_down(n, &pb_id, xmitq, &maddr);
			tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq);
		}
		/* If pkts arrive out of order, use lowest calculated syncpt */
		if (less(syncpt, n->sync_point))
			n->sync_point = syncpt;
	}

	/* Open parallel link when tunnel link reaches synch point */
	if ((n->state == NODE_FAILINGOVER) && tipc_link_is_up(l)) {
		if (!more(rcv_nxt, n->sync_point))
			return true;
		tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT);
		if (pl)
			tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT);
		return true;
	}

	/* No synching needed if only one link */
	if (!pl || !tipc_link_is_up(pl))
		return true;

	/* Initiate synch mode if applicable */
	if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) {
		syncpt = iseqno + exp_pkts - 1;
		if (!tipc_link_is_up(l)) {
			tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
			__tipc_node_link_up(n, bearer_id, xmitq);
		}
		if (n->state == SELF_UP_PEER_UP) {
			n->sync_point = syncpt;
			tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT);
			tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
		}
		if (less(syncpt, n->sync_point))
			n->sync_point = syncpt;
	}

	/* Open tunnel link when parallel link reaches synch point */
	if (n->state == NODE_SYNCHING) {
		if (tipc_link_is_synching(l)) {
			tnl = l;
		} else {
			tnl = pl;
			pl = l;
		}
		dlv_nxt = pl->rcv_nxt - mod(skb_queue_len(pl->inputq));
		if (more(dlv_nxt, n->sync_point)) {
			tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
			tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
			return true;
		}
		if (l == pl)
			return true;
		if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG))
			return true;
		if (usr == LINK_PROTOCOL)
			return true;
		return false;
	}
	return true;
}

/**
 * tipc_rcv - process TIPC packets/messages arriving from off-node
 * @net: the applicable net namespace
 * @skb: TIPC packet
 * @b: pointer to bearer message arrived on
 *
 * Invoked with no locks held. Bearer pointer must point to a valid bearer
 * structure (i.e. cannot be NULL), but bearer can be inactive.
 */
void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
{
	struct sk_buff_head xmitq;
	struct tipc_node *n;
	struct tipc_msg *hdr = buf_msg(skb);
	int usr = msg_user(hdr);
	int bearer_id = b->identity;
	struct tipc_link_entry *le;
	int rc = 0;

	__skb_queue_head_init(&xmitq);

	/* Ensure message is well-formed */
	if (unlikely(!tipc_msg_validate(skb)))
		goto discard;

	/* Handle arrival of a non-unicast link packet */
	if (unlikely(msg_non_seq(hdr))) {
		if (usr == LINK_CONFIG)
			tipc_disc_rcv(net, skb, b);
		else
			tipc_bclink_rcv(net, skb);
		return;
	}

	/* Locate neighboring node that sent packet */
	n = tipc_node_find(net, msg_prevnode(hdr));
	if (unlikely(!n))
		goto discard;
	le = &n->links[bearer_id];

	tipc_node_lock(n);

	/* Is reception permitted at the moment? */
	if (!tipc_node_filter_pkt(n, hdr))
		goto unlock;

	if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
		tipc_bclink_sync_state(n, hdr);

	/* Release acked broadcast packets */
	if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
		tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));

	/* Check and if necessary update node state */
	if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
		rc = tipc_link_rcv(le->link, skb, &xmitq);
		skb = NULL;
	}
unlock:
	tipc_node_unlock(n);

	if (unlikely(rc & TIPC_LINK_UP_EVT))
		tipc_node_link_up(n, bearer_id, &xmitq);

	if (unlikely(rc & TIPC_LINK_DOWN_EVT))
		tipc_node_link_down(n, bearer_id, false);

	if (unlikely(!skb_queue_empty(&n->bclink.namedq)))
		tipc_named_rcv(net, &n->bclink.namedq);

	if (!skb_queue_empty(&le->inputq))
		tipc_sk_rcv(net, &le->inputq);

	if (!skb_queue_empty(&xmitq))
		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);

	tipc_node_put(n);
discard:
	kfree_skb(skb);
}

int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
	int err;
	struct net *net = sock_net(skb->sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	int done = cb->args[0];
	int last_addr = cb->args[1];
	struct tipc_node *node;
	struct tipc_nl_msg msg;

	if (done)
		return 0;

	msg.skb = skb;
	msg.portid = NETLINK_CB(cb->skb).portid;
	msg.seq = cb->nlh->nlmsg_seq;

	rcu_read_lock();
	if (last_addr) {
		node = tipc_node_find(net, last_addr);
		if (!node) {
			rcu_read_unlock();
			/* We never set seq or call nl_dump_check_consistent();
			 * this means that setting prev_seq here will cause the
			 * consistency check to fail in the netlink callback
			 * handler, resulting in the NLMSG_DONE message having
			 * the NLM_F_DUMP_INTR flag set if the node state
			 * changed while we released the lock.
			 */
			cb->prev_seq = 1;
			return -EPIPE;
		}
		tipc_node_put(node);
	}

	list_for_each_entry_rcu(node, &tn->node_list, list) {
		if (last_addr) {
			if (node->addr == last_addr)
				last_addr = 0;
			else
				continue;
		}

		tipc_node_lock(node);
		err = __tipc_nl_add_node(&msg, node);
		if (err) {
			last_addr = node->addr;
			tipc_node_unlock(node);
			goto out;
		}

		tipc_node_unlock(node);
	}
	done = 1;
out:
	cb->args[0] = done;
	cb->args[1] = last_addr;
	rcu_read_unlock();

	return skb->len;
}