xref: /linux/net/tipc/name_distr.c (revision 32d7e03d26fd93187c87ed0fbf59ec7023a61404)
1 /*
2  * net/tipc/name_distr.c: TIPC name distribution code
3  *
4  * Copyright (c) 2000-2006, 2014-2019, Ericsson AB
5  * Copyright (c) 2005, 2010-2011, Wind River Systems
6  * Copyright (c) 2020-2021, Red Hat Inc
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in the
16  *    documentation and/or other materials provided with the distribution.
17  * 3. Neither the names of the copyright holders nor the names of its
18  *    contributors may be used to endorse or promote products derived from
19  *    this software without specific prior written permission.
20  *
21  * Alternatively, this software may be distributed under the terms of the
22  * GNU General Public License ("GPL") version 2 as published by the Free
23  * Software Foundation.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35  * POSSIBILITY OF SUCH DAMAGE.
36  */
37 
38 #include "core.h"
39 #include "link.h"
40 #include "name_distr.h"
41 
42 int sysctl_tipc_named_timeout __read_mostly = 2000;
43 
44 struct distr_queue_item {
45 	struct distr_item i;
46 	u32 dtype;
47 	u32 node;
48 	unsigned long expires;
49 	struct list_head next;
50 };
51 
52 /**
53  * publ_to_item - add publication info to a publication message
54  * @p: publication info
55  * @i: location of item in the message
56  */
57 static void publ_to_item(struct distr_item *i, struct publication *p)
58 {
59 	i->type = htonl(p->sr.type);
60 	i->lower = htonl(p->sr.lower);
61 	i->upper = htonl(p->sr.upper);
62 	i->port = htonl(p->sk.ref);
63 	i->key = htonl(p->key);
64 }
65 
66 /**
67  * named_prepare_buf - allocate & initialize a publication message
68  * @net: the associated network namespace
69  * @type: message type
70  * @size: payload size
71  * @dest: destination node
72  *
73  * The buffer returned is of size INT_H_SIZE + payload size
74  */
75 static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size,
76 					 u32 dest)
77 {
78 	struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size, GFP_ATOMIC);
79 	u32 self = tipc_own_addr(net);
80 	struct tipc_msg *msg;
81 
82 	if (buf != NULL) {
83 		msg = buf_msg(buf);
84 		tipc_msg_init(self, msg, NAME_DISTRIBUTOR,
85 			      type, INT_H_SIZE, dest);
86 		msg_set_size(msg, INT_H_SIZE + size);
87 	}
88 	return buf;
89 }
90 
91 /**
92  * tipc_named_publish - tell other nodes about a new publication by this node
93  * @net: the associated network namespace
94  * @p: the new publication
95  */
96 struct sk_buff *tipc_named_publish(struct net *net, struct publication *p)
97 {
98 	struct name_table *nt = tipc_name_table(net);
99 	struct distr_item *item;
100 	struct sk_buff *skb;
101 
102 	if (p->scope == TIPC_NODE_SCOPE) {
103 		list_add_tail_rcu(&p->binding_node, &nt->node_scope);
104 		return NULL;
105 	}
106 	write_lock_bh(&nt->cluster_scope_lock);
107 	list_add_tail(&p->binding_node, &nt->cluster_scope);
108 	write_unlock_bh(&nt->cluster_scope_lock);
109 	skb = named_prepare_buf(net, PUBLICATION, ITEM_SIZE, 0);
110 	if (!skb) {
111 		pr_warn("Publication distribution failure\n");
112 		return NULL;
113 	}
114 	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
115 	msg_set_non_legacy(buf_msg(skb));
116 	item = (struct distr_item *)msg_data(buf_msg(skb));
117 	publ_to_item(item, p);
118 	return skb;
119 }
120 
121 /**
122  * tipc_named_withdraw - tell other nodes about a withdrawn publication by this node
123  * @net: the associated network namespace
124  * @p: the withdrawn publication
125  */
126 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *p)
127 {
128 	struct name_table *nt = tipc_name_table(net);
129 	struct distr_item *item;
130 	struct sk_buff *skb;
131 
132 	write_lock_bh(&nt->cluster_scope_lock);
133 	list_del(&p->binding_node);
134 	write_unlock_bh(&nt->cluster_scope_lock);
135 	if (p->scope == TIPC_NODE_SCOPE)
136 		return NULL;
137 
138 	skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
139 	if (!skb) {
140 		pr_warn("Withdrawal distribution failure\n");
141 		return NULL;
142 	}
143 	msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
144 	msg_set_non_legacy(buf_msg(skb));
145 	item = (struct distr_item *)msg_data(buf_msg(skb));
146 	publ_to_item(item, p);
147 	return skb;
148 }
149 
150 /**
151  * named_distribute - prepare name info for bulk distribution to another node
152  * @net: the associated network namespace
153  * @list: list of messages (buffers) to be returned from this function
154  * @dnode: node to be updated
155  * @pls: linked list of publication items to be packed into buffer chain
156  * @seqno: sequence number for this message
157  */
158 static void named_distribute(struct net *net, struct sk_buff_head *list,
159 			     u32 dnode, struct list_head *pls, u16 seqno)
160 {
161 	struct publication *publ;
162 	struct sk_buff *skb = NULL;
163 	struct distr_item *item = NULL;
164 	u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
165 			ITEM_SIZE) * ITEM_SIZE;
166 	u32 msg_rem = msg_dsz;
167 	struct tipc_msg *hdr;
168 
169 	list_for_each_entry(publ, pls, binding_node) {
170 		/* Prepare next buffer: */
171 		if (!skb) {
172 			skb = named_prepare_buf(net, PUBLICATION, msg_rem,
173 						dnode);
174 			if (!skb) {
175 				pr_warn("Bulk publication failure\n");
176 				return;
177 			}
178 			hdr = buf_msg(skb);
179 			msg_set_bc_ack_invalid(hdr, true);
180 			msg_set_bulk(hdr);
181 			msg_set_non_legacy(hdr);
182 			item = (struct distr_item *)msg_data(hdr);
183 		}
184 
185 		/* Pack publication into message: */
186 		publ_to_item(item, publ);
187 		item++;
188 		msg_rem -= ITEM_SIZE;
189 
190 		/* Append full buffer to list: */
191 		if (!msg_rem) {
192 			__skb_queue_tail(list, skb);
193 			skb = NULL;
194 			msg_rem = msg_dsz;
195 		}
196 	}
197 	if (skb) {
198 		hdr = buf_msg(skb);
199 		msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
200 		skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
201 		__skb_queue_tail(list, skb);
202 	}
203 	hdr = buf_msg(skb_peek_tail(list));
204 	msg_set_last_bulk(hdr);
205 	msg_set_named_seqno(hdr, seqno);
206 }
207 
208 /**
209  * tipc_named_node_up - tell specified node about all publications by this node
210  * @net: the associated network namespace
211  * @dnode: destination node
212  * @capabilities: peer node's capabilities
213  */
214 void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
215 {
216 	struct name_table *nt = tipc_name_table(net);
217 	struct tipc_net *tn = tipc_net(net);
218 	struct sk_buff_head head;
219 	u16 seqno;
220 
221 	__skb_queue_head_init(&head);
222 	spin_lock_bh(&tn->nametbl_lock);
223 	if (!(capabilities & TIPC_NAMED_BCAST))
224 		nt->rc_dests++;
225 	seqno = nt->snd_nxt;
226 	spin_unlock_bh(&tn->nametbl_lock);
227 
228 	read_lock_bh(&nt->cluster_scope_lock);
229 	named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
230 	tipc_node_xmit(net, &head, dnode, 0);
231 	read_unlock_bh(&nt->cluster_scope_lock);
232 }
233 
234 /**
235  * tipc_publ_purge - remove publication associated with a failed node
236  * @net: the associated network namespace
237  * @p: the publication to remove
238  * @addr: failed node's address
239  *
240  * Invoked for each publication issued by a newly failed node.
241  * Removes publication structure from name table & deletes it.
242  */
243 static void tipc_publ_purge(struct net *net, struct publication *p, u32 addr)
244 {
245 	struct tipc_net *tn = tipc_net(net);
246 	struct publication *_p;
247 	struct tipc_uaddr ua;
248 
249 	tipc_uaddr(&ua, TIPC_SERVICE_RANGE, p->scope, p->sr.type,
250 		   p->sr.lower, p->sr.upper);
251 	spin_lock_bh(&tn->nametbl_lock);
252 	_p = tipc_nametbl_remove_publ(net, &ua, &p->sk, p->key);
253 	if (_p)
254 		tipc_node_unsubscribe(net, &_p->binding_node, addr);
255 	spin_unlock_bh(&tn->nametbl_lock);
256 	if (_p)
257 		kfree_rcu(_p, rcu);
258 }
259 
260 void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
261 		      u32 addr, u16 capabilities)
262 {
263 	struct name_table *nt = tipc_name_table(net);
264 	struct tipc_net *tn = tipc_net(net);
265 
266 	struct publication *publ, *tmp;
267 
268 	list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
269 		tipc_publ_purge(net, publ, addr);
270 	spin_lock_bh(&tn->nametbl_lock);
271 	if (!(capabilities & TIPC_NAMED_BCAST))
272 		nt->rc_dests--;
273 	spin_unlock_bh(&tn->nametbl_lock);
274 }
275 
276 /**
277  * tipc_update_nametbl - try to process a nametable update and notify
278  *			 subscribers
279  * @net: the associated network namespace
280  * @i: location of item in the message
281  * @node: node address
282  * @dtype: name distributor message type
283  *
284  * tipc_nametbl_lock must be held.
285  * Return: the publication item if successful, otherwise NULL.
286  */
287 static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
288 				u32 node, u32 dtype)
289 {
290 	struct publication *p = NULL;
291 	struct tipc_socket_addr sk;
292 	struct tipc_uaddr ua;
293 	u32 key = ntohl(i->key);
294 
295 	tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_CLUSTER_SCOPE,
296 		   ntohl(i->type), ntohl(i->lower), ntohl(i->upper));
297 	sk.ref = ntohl(i->port);
298 	sk.node = node;
299 
300 	if (dtype == PUBLICATION) {
301 		p = tipc_nametbl_insert_publ(net, &ua, &sk, key);
302 		if (p) {
303 			tipc_node_subscribe(net, &p->binding_node, node);
304 			return true;
305 		}
306 	} else if (dtype == WITHDRAWAL) {
307 		p = tipc_nametbl_remove_publ(net, &ua, &sk, key);
308 		if (p) {
309 			tipc_node_unsubscribe(net, &p->binding_node, node);
310 			kfree_rcu(p, rcu);
311 			return true;
312 		}
313 		pr_warn_ratelimited("Failed to remove binding %u,%u from %u\n",
314 				    ua.sr.type, ua.sr.lower, node);
315 	} else {
316 		pr_warn("Unrecognized name table message received\n");
317 	}
318 	return false;
319 }
320 
321 static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
322 					  u16 *rcv_nxt, bool *open)
323 {
324 	struct sk_buff *skb, *tmp;
325 	struct tipc_msg *hdr;
326 	u16 seqno;
327 
328 	spin_lock_bh(&namedq->lock);
329 	skb_queue_walk_safe(namedq, skb, tmp) {
330 		if (unlikely(skb_linearize(skb))) {
331 			__skb_unlink(skb, namedq);
332 			kfree_skb(skb);
333 			continue;
334 		}
335 		hdr = buf_msg(skb);
336 		seqno = msg_named_seqno(hdr);
337 		if (msg_is_last_bulk(hdr)) {
338 			*rcv_nxt = seqno;
339 			*open = true;
340 		}
341 
342 		if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
343 			__skb_unlink(skb, namedq);
344 			spin_unlock_bh(&namedq->lock);
345 			return skb;
346 		}
347 
348 		if (*open && (*rcv_nxt == seqno)) {
349 			(*rcv_nxt)++;
350 			__skb_unlink(skb, namedq);
351 			spin_unlock_bh(&namedq->lock);
352 			return skb;
353 		}
354 
355 		if (less(seqno, *rcv_nxt)) {
356 			__skb_unlink(skb, namedq);
357 			kfree_skb(skb);
358 			continue;
359 		}
360 	}
361 	spin_unlock_bh(&namedq->lock);
362 	return NULL;
363 }
364 
365 /**
366  * tipc_named_rcv - process name table update messages sent by another node
367  * @net: the associated network namespace
368  * @namedq: queue to receive from
369  * @rcv_nxt: store last received seqno here
370  * @open: last bulk msg was received (FIXME)
371  */
372 void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
373 		    u16 *rcv_nxt, bool *open)
374 {
375 	struct tipc_net *tn = tipc_net(net);
376 	struct distr_item *item;
377 	struct tipc_msg *hdr;
378 	struct sk_buff *skb;
379 	u32 count, node;
380 
381 	spin_lock_bh(&tn->nametbl_lock);
382 	while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
383 		hdr = buf_msg(skb);
384 		node = msg_orignode(hdr);
385 		item = (struct distr_item *)msg_data(hdr);
386 		count = msg_data_sz(hdr) / ITEM_SIZE;
387 		while (count--) {
388 			tipc_update_nametbl(net, item, node, msg_type(hdr));
389 			item++;
390 		}
391 		kfree_skb(skb);
392 	}
393 	spin_unlock_bh(&tn->nametbl_lock);
394 }
395 
396 /**
397  * tipc_named_reinit - re-initialize local publications
398  * @net: the associated network namespace
399  *
400  * This routine is called whenever TIPC networking is enabled.
401  * All name table entries published by this node are updated to reflect
402  * the node's new network address.
403  */
404 void tipc_named_reinit(struct net *net)
405 {
406 	struct name_table *nt = tipc_name_table(net);
407 	struct tipc_net *tn = tipc_net(net);
408 	struct publication *p;
409 	u32 self = tipc_own_addr(net);
410 
411 	spin_lock_bh(&tn->nametbl_lock);
412 
413 	list_for_each_entry_rcu(p, &nt->node_scope, binding_node)
414 		p->sk.node = self;
415 	list_for_each_entry_rcu(p, &nt->cluster_scope, binding_node)
416 		p->sk.node = self;
417 	nt->rc_dests = 0;
418 	spin_unlock_bh(&tn->nametbl_lock);
419 }
420