1 /* 2 * net/tipc/name_distr.c: TIPC name distribution code 3 * 4 * Copyright (c) 2000-2006, 2014, Ericsson AB 5 * Copyright (c) 2005, 2010-2011, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37 #include "core.h" 38 #include "link.h" 39 #include "name_distr.h" 40 41 int sysctl_tipc_named_timeout __read_mostly = 2000; 42 43 /** 44 * struct tipc_dist_queue - queue holding deferred name table updates 45 */ 46 static struct list_head tipc_dist_queue = LIST_HEAD_INIT(tipc_dist_queue); 47 48 struct distr_queue_item { 49 struct distr_item i; 50 u32 dtype; 51 u32 node; 52 unsigned long expires; 53 struct list_head next; 54 }; 55 56 /** 57 * publ_to_item - add publication info to a publication message 58 */ 59 static void publ_to_item(struct distr_item *i, struct publication *p) 60 { 61 i->type = htonl(p->type); 62 i->lower = htonl(p->lower); 63 i->upper = htonl(p->upper); 64 i->ref = htonl(p->ref); 65 i->key = htonl(p->key); 66 } 67 68 /** 69 * named_prepare_buf - allocate & initialize a publication message 70 */ 71 static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest) 72 { 73 struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size); 74 struct tipc_msg *msg; 75 76 if (buf != NULL) { 77 msg = buf_msg(buf); 78 tipc_msg_init(msg, NAME_DISTRIBUTOR, type, INT_H_SIZE, dest); 79 msg_set_size(msg, INT_H_SIZE + size); 80 } 81 return buf; 82 } 83 84 void named_cluster_distribute(struct sk_buff *skb) 85 { 86 struct sk_buff *oskb; 87 struct tipc_node *node; 88 u32 dnode; 89 90 rcu_read_lock(); 91 list_for_each_entry_rcu(node, &tipc_node_list, list) { 92 dnode = node->addr; 93 if (in_own_node(dnode)) 94 continue; 95 if (!tipc_node_active_links(node)) 96 continue; 97 oskb = skb_copy(skb, GFP_ATOMIC); 98 if (!oskb) 99 break; 100 msg_set_destnode(buf_msg(oskb), dnode); 101 tipc_link_xmit_skb(oskb, dnode, dnode); 102 } 103 rcu_read_unlock(); 104 105 kfree_skb(skb); 106 } 107 108 /** 109 * tipc_named_publish - tell other nodes about a new publication by this node 110 */ 111 struct sk_buff *tipc_named_publish(struct publication *publ) 112 { 113 struct sk_buff *buf; 114 struct distr_item *item; 115 116 list_add_tail_rcu(&publ->local_list, 117 &tipc_nametbl->publ_list[publ->scope]); 118 119 if (publ->scope == TIPC_NODE_SCOPE) 120 return NULL; 121 122 buf = named_prepare_buf(PUBLICATION, ITEM_SIZE, 0); 123 if (!buf) { 124 pr_warn("Publication distribution failure\n"); 125 return NULL; 126 } 127 128 item = (struct distr_item *)msg_data(buf_msg(buf)); 129 publ_to_item(item, publ); 130 return buf; 131 } 132 133 /** 134 * tipc_named_withdraw - tell other nodes about a withdrawn publication by this node 135 */ 136 struct sk_buff *tipc_named_withdraw(struct publication *publ) 137 { 138 struct sk_buff *buf; 139 struct distr_item *item; 140 141 list_del(&publ->local_list); 142 143 if (publ->scope == TIPC_NODE_SCOPE) 144 return NULL; 145 146 buf = named_prepare_buf(WITHDRAWAL, ITEM_SIZE, 0); 147 if (!buf) { 148 pr_warn("Withdrawal distribution failure\n"); 149 return NULL; 150 } 151 152 item = (struct distr_item *)msg_data(buf_msg(buf)); 153 publ_to_item(item, publ); 154 return buf; 155 } 156 157 /** 158 * named_distribute - prepare name info for bulk distribution to another node 159 * @list: list of messages (buffers) to be returned from this function 160 * @dnode: node to be updated 161 * @pls: linked list of publication items to be packed into buffer chain 162 */ 163 static void named_distribute(struct sk_buff_head *list, u32 dnode, 164 struct list_head *pls) 165 { 166 struct publication *publ; 167 struct sk_buff *skb = NULL; 168 struct distr_item *item = NULL; 169 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE; 170 uint msg_rem = msg_dsz; 171 172 list_for_each_entry(publ, pls, local_list) { 173 /* Prepare next buffer: */ 174 if (!skb) { 175 skb = named_prepare_buf(PUBLICATION, msg_rem, dnode); 176 if (!skb) { 177 pr_warn("Bulk publication failure\n"); 178 return; 179 } 180 item = (struct distr_item *)msg_data(buf_msg(skb)); 181 } 182 183 /* Pack publication into message: */ 184 publ_to_item(item, publ); 185 item++; 186 msg_rem -= ITEM_SIZE; 187 188 /* Append full buffer to list: */ 189 if (!msg_rem) { 190 __skb_queue_tail(list, skb); 191 skb = NULL; 192 msg_rem = msg_dsz; 193 } 194 } 195 if (skb) { 196 msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem)); 197 skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem)); 198 __skb_queue_tail(list, skb); 199 } 200 } 201 202 /** 203 * tipc_named_node_up - tell specified node about all publications by this node 204 */ 205 void tipc_named_node_up(u32 dnode) 206 { 207 struct sk_buff_head head; 208 209 __skb_queue_head_init(&head); 210 211 rcu_read_lock(); 212 named_distribute(&head, dnode, 213 &tipc_nametbl->publ_list[TIPC_CLUSTER_SCOPE]); 214 named_distribute(&head, dnode, 215 &tipc_nametbl->publ_list[TIPC_ZONE_SCOPE]); 216 rcu_read_unlock(); 217 218 tipc_link_xmit(&head, dnode, dnode); 219 } 220 221 static void tipc_publ_subscribe(struct publication *publ, u32 addr) 222 { 223 struct tipc_node *node; 224 225 if (in_own_node(addr)) 226 return; 227 228 node = tipc_node_find(addr); 229 if (!node) { 230 pr_warn("Node subscription rejected, unknown node 0x%x\n", 231 addr); 232 return; 233 } 234 235 tipc_node_lock(node); 236 list_add_tail(&publ->nodesub_list, &node->publ_list); 237 tipc_node_unlock(node); 238 } 239 240 static void tipc_publ_unsubscribe(struct publication *publ, u32 addr) 241 { 242 struct tipc_node *node; 243 244 node = tipc_node_find(addr); 245 if (!node) 246 return; 247 248 tipc_node_lock(node); 249 list_del_init(&publ->nodesub_list); 250 tipc_node_unlock(node); 251 } 252 253 /** 254 * tipc_publ_purge - remove publication associated with a failed node 255 * 256 * Invoked for each publication issued by a newly failed node. 257 * Removes publication structure from name table & deletes it. 258 */ 259 static void tipc_publ_purge(struct publication *publ, u32 addr) 260 { 261 struct publication *p; 262 263 spin_lock_bh(&tipc_nametbl_lock); 264 p = tipc_nametbl_remove_publ(publ->type, publ->lower, 265 publ->node, publ->ref, publ->key); 266 if (p) 267 tipc_publ_unsubscribe(p, addr); 268 spin_unlock_bh(&tipc_nametbl_lock); 269 270 if (p != publ) { 271 pr_err("Unable to remove publication from failed node\n" 272 " (type=%u, lower=%u, node=0x%x, ref=%u, key=%u)\n", 273 publ->type, publ->lower, publ->node, publ->ref, 274 publ->key); 275 } 276 277 kfree_rcu(p, rcu); 278 } 279 280 void tipc_publ_notify(struct list_head *nsub_list, u32 addr) 281 { 282 struct publication *publ, *tmp; 283 284 list_for_each_entry_safe(publ, tmp, nsub_list, nodesub_list) 285 tipc_publ_purge(publ, addr); 286 } 287 288 /** 289 * tipc_update_nametbl - try to process a nametable update and notify 290 * subscribers 291 * 292 * tipc_nametbl_lock must be held. 293 * Returns the publication item if successful, otherwise NULL. 294 */ 295 static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype) 296 { 297 struct publication *publ = NULL; 298 299 if (dtype == PUBLICATION) { 300 publ = tipc_nametbl_insert_publ(ntohl(i->type), ntohl(i->lower), 301 ntohl(i->upper), 302 TIPC_CLUSTER_SCOPE, node, 303 ntohl(i->ref), ntohl(i->key)); 304 if (publ) { 305 tipc_publ_subscribe(publ, node); 306 return true; 307 } 308 } else if (dtype == WITHDRAWAL) { 309 publ = tipc_nametbl_remove_publ(ntohl(i->type), ntohl(i->lower), 310 node, ntohl(i->ref), 311 ntohl(i->key)); 312 if (publ) { 313 tipc_publ_unsubscribe(publ, node); 314 kfree_rcu(publ, rcu); 315 return true; 316 } 317 } else { 318 pr_warn("Unrecognized name table message received\n"); 319 } 320 return false; 321 } 322 323 /** 324 * tipc_named_add_backlog - add a failed name table update to the backlog 325 * 326 */ 327 static void tipc_named_add_backlog(struct distr_item *i, u32 type, u32 node) 328 { 329 struct distr_queue_item *e; 330 unsigned long now = get_jiffies_64(); 331 332 e = kzalloc(sizeof(*e), GFP_ATOMIC); 333 if (!e) 334 return; 335 e->dtype = type; 336 e->node = node; 337 e->expires = now + msecs_to_jiffies(sysctl_tipc_named_timeout); 338 memcpy(e, i, sizeof(*i)); 339 list_add_tail(&e->next, &tipc_dist_queue); 340 } 341 342 /** 343 * tipc_named_process_backlog - try to process any pending name table updates 344 * from the network. 345 */ 346 void tipc_named_process_backlog(void) 347 { 348 struct distr_queue_item *e, *tmp; 349 char addr[16]; 350 unsigned long now = get_jiffies_64(); 351 352 list_for_each_entry_safe(e, tmp, &tipc_dist_queue, next) { 353 if (time_after(e->expires, now)) { 354 if (!tipc_update_nametbl(&e->i, e->node, e->dtype)) 355 continue; 356 } else { 357 tipc_addr_string_fill(addr, e->node); 358 pr_warn_ratelimited("Dropping name table update (%d) of {%u, %u, %u} from %s key=%u\n", 359 e->dtype, ntohl(e->i.type), 360 ntohl(e->i.lower), 361 ntohl(e->i.upper), 362 addr, ntohl(e->i.key)); 363 } 364 list_del(&e->next); 365 kfree(e); 366 } 367 } 368 369 /** 370 * tipc_named_rcv - process name table update message sent by another node 371 */ 372 void tipc_named_rcv(struct sk_buff *buf) 373 { 374 struct tipc_msg *msg = buf_msg(buf); 375 struct distr_item *item = (struct distr_item *)msg_data(msg); 376 u32 count = msg_data_sz(msg) / ITEM_SIZE; 377 u32 node = msg_orignode(msg); 378 379 spin_lock_bh(&tipc_nametbl_lock); 380 while (count--) { 381 if (!tipc_update_nametbl(item, node, msg_type(msg))) 382 tipc_named_add_backlog(item, msg_type(msg), node); 383 item++; 384 } 385 tipc_named_process_backlog(); 386 spin_unlock_bh(&tipc_nametbl_lock); 387 kfree_skb(buf); 388 } 389 390 /** 391 * tipc_named_reinit - re-initialize local publications 392 * 393 * This routine is called whenever TIPC networking is enabled. 394 * All name table entries published by this node are updated to reflect 395 * the node's new network address. 396 */ 397 void tipc_named_reinit(void) 398 { 399 struct publication *publ; 400 int scope; 401 402 spin_lock_bh(&tipc_nametbl_lock); 403 404 for (scope = TIPC_ZONE_SCOPE; scope <= TIPC_NODE_SCOPE; scope++) 405 list_for_each_entry_rcu(publ, &tipc_nametbl->publ_list[scope], 406 local_list) 407 publ->node = tipc_own_addr; 408 409 spin_unlock_bh(&tipc_nametbl_lock); 410 } 411