1 /* 2 * net/tipc/link.c: TIPC link code 3 * 4 * Copyright (c) 1996-2007, Ericsson AB 5 * Copyright (c) 2004-2007, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37 #include "core.h" 38 #include "dbg.h" 39 #include "link.h" 40 #include "net.h" 41 #include "node.h" 42 #include "port.h" 43 #include "addr.h" 44 #include "node_subscr.h" 45 #include "name_distr.h" 46 #include "bearer.h" 47 #include "name_table.h" 48 #include "discover.h" 49 #include "config.h" 50 #include "bcast.h" 51 52 53 /* 54 * Out-of-range value for link session numbers 55 */ 56 57 #define INVALID_SESSION 0x10000 58 59 /* 60 * Limit for deferred reception queue: 61 */ 62 63 #define DEF_QUEUE_LIMIT 256u 64 65 /* 66 * Link state events: 67 */ 68 69 #define STARTING_EVT 856384768 /* link processing trigger */ 70 #define TRAFFIC_MSG_EVT 560815u /* rx'd ??? */ 71 #define TIMEOUT_EVT 560817u /* link timer expired */ 72 73 /* 74 * The following two 'message types' is really just implementation 75 * data conveniently stored in the message header. 76 * They must not be considered part of the protocol 77 */ 78 #define OPEN_MSG 0 79 #define CLOSED_MSG 1 80 81 /* 82 * State value stored in 'exp_msg_count' 83 */ 84 85 #define START_CHANGEOVER 100000u 86 87 /** 88 * struct link_name - deconstructed link name 89 * @addr_local: network address of node at this end 90 * @if_local: name of interface at this end 91 * @addr_peer: network address of node at far end 92 * @if_peer: name of interface at far end 93 */ 94 95 struct link_name { 96 u32 addr_local; 97 char if_local[TIPC_MAX_IF_NAME]; 98 u32 addr_peer; 99 char if_peer[TIPC_MAX_IF_NAME]; 100 }; 101 102 static void link_handle_out_of_seq_msg(struct link *l_ptr, 103 struct sk_buff *buf); 104 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf); 105 static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf); 106 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance); 107 static int link_send_sections_long(struct port *sender, 108 struct iovec const *msg_sect, 109 u32 num_sect, u32 destnode); 110 static void link_check_defragm_bufs(struct link *l_ptr); 111 static void link_state_event(struct link *l_ptr, u32 event); 112 static void link_reset_statistics(struct link *l_ptr); 113 static void link_print(struct link *l_ptr, struct print_buf *buf, 114 const char *str); 115 static void link_start(struct link *l_ptr); 116 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf); 117 118 119 /* 120 * Debugging code used by link routines only 121 * 122 * When debugging link problems on a system that has multiple links, 123 * the standard TIPC debugging routines may not be useful since they 124 * allow the output from multiple links to be intermixed. For this reason 125 * routines of the form "dbg_link_XXX()" have been created that will capture 126 * debug info into a link's personal print buffer, which can then be dumped 127 * into the TIPC system log (TIPC_LOG) upon request. 128 * 129 * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size 130 * of the print buffer used by each link. If LINK_LOG_BUF_SIZE is set to 0, 131 * the dbg_link_XXX() routines simply send their output to the standard 132 * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful 133 * when there is only a single link in the system being debugged. 134 * 135 * Notes: 136 * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE 137 * - "l_ptr" must be valid when using dbg_link_XXX() macros 138 */ 139 140 #define LINK_LOG_BUF_SIZE 0 141 142 #define dbg_link(fmt, arg...) \ 143 do { \ 144 if (LINK_LOG_BUF_SIZE) \ 145 tipc_printf(&l_ptr->print_buf, fmt, ## arg); \ 146 } while (0) 147 #define dbg_link_msg(msg, txt) \ 148 do { \ 149 if (LINK_LOG_BUF_SIZE) \ 150 tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \ 151 } while (0) 152 #define dbg_link_state(txt) \ 153 do { \ 154 if (LINK_LOG_BUF_SIZE) \ 155 link_print(l_ptr, &l_ptr->print_buf, txt); \ 156 } while (0) 157 #define dbg_link_dump() do { \ 158 if (LINK_LOG_BUF_SIZE) { \ 159 tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \ 160 tipc_printbuf_move(LOG, &l_ptr->print_buf); \ 161 } \ 162 } while (0) 163 164 static void dbg_print_link(struct link *l_ptr, const char *str) 165 { 166 if (DBG_OUTPUT != TIPC_NULL) 167 link_print(l_ptr, DBG_OUTPUT, str); 168 } 169 170 static void dbg_print_buf_chain(struct sk_buff *root_buf) 171 { 172 if (DBG_OUTPUT != TIPC_NULL) { 173 struct sk_buff *buf = root_buf; 174 175 while (buf) { 176 msg_dbg(buf_msg(buf), "In chain: "); 177 buf = buf->next; 178 } 179 } 180 } 181 182 /* 183 * Simple link routines 184 */ 185 186 static unsigned int align(unsigned int i) 187 { 188 return (i + 3) & ~3u; 189 } 190 191 static void link_init_max_pkt(struct link *l_ptr) 192 { 193 u32 max_pkt; 194 195 max_pkt = (l_ptr->b_ptr->publ.mtu & ~3); 196 if (max_pkt > MAX_MSG_SIZE) 197 max_pkt = MAX_MSG_SIZE; 198 199 l_ptr->max_pkt_target = max_pkt; 200 if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT) 201 l_ptr->max_pkt = l_ptr->max_pkt_target; 202 else 203 l_ptr->max_pkt = MAX_PKT_DEFAULT; 204 205 l_ptr->max_pkt_probes = 0; 206 } 207 208 static u32 link_next_sent(struct link *l_ptr) 209 { 210 if (l_ptr->next_out) 211 return msg_seqno(buf_msg(l_ptr->next_out)); 212 return mod(l_ptr->next_out_no); 213 } 214 215 static u32 link_last_sent(struct link *l_ptr) 216 { 217 return mod(link_next_sent(l_ptr) - 1); 218 } 219 220 /* 221 * Simple non-static link routines (i.e. referenced outside this file) 222 */ 223 224 int tipc_link_is_up(struct link *l_ptr) 225 { 226 if (!l_ptr) 227 return 0; 228 return link_working_working(l_ptr) || link_working_unknown(l_ptr); 229 } 230 231 int tipc_link_is_active(struct link *l_ptr) 232 { 233 return (l_ptr->owner->active_links[0] == l_ptr) || 234 (l_ptr->owner->active_links[1] == l_ptr); 235 } 236 237 /** 238 * link_name_validate - validate & (optionally) deconstruct link name 239 * @name - ptr to link name string 240 * @name_parts - ptr to area for link name components (or NULL if not needed) 241 * 242 * Returns 1 if link name is valid, otherwise 0. 243 */ 244 245 static int link_name_validate(const char *name, struct link_name *name_parts) 246 { 247 char name_copy[TIPC_MAX_LINK_NAME]; 248 char *addr_local; 249 char *if_local; 250 char *addr_peer; 251 char *if_peer; 252 char dummy; 253 u32 z_local, c_local, n_local; 254 u32 z_peer, c_peer, n_peer; 255 u32 if_local_len; 256 u32 if_peer_len; 257 258 /* copy link name & ensure length is OK */ 259 260 name_copy[TIPC_MAX_LINK_NAME - 1] = 0; 261 /* need above in case non-Posix strncpy() doesn't pad with nulls */ 262 strncpy(name_copy, name, TIPC_MAX_LINK_NAME); 263 if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0) 264 return 0; 265 266 /* ensure all component parts of link name are present */ 267 268 addr_local = name_copy; 269 if ((if_local = strchr(addr_local, ':')) == NULL) 270 return 0; 271 *(if_local++) = 0; 272 if ((addr_peer = strchr(if_local, '-')) == NULL) 273 return 0; 274 *(addr_peer++) = 0; 275 if_local_len = addr_peer - if_local; 276 if ((if_peer = strchr(addr_peer, ':')) == NULL) 277 return 0; 278 *(if_peer++) = 0; 279 if_peer_len = strlen(if_peer) + 1; 280 281 /* validate component parts of link name */ 282 283 if ((sscanf(addr_local, "%u.%u.%u%c", 284 &z_local, &c_local, &n_local, &dummy) != 3) || 285 (sscanf(addr_peer, "%u.%u.%u%c", 286 &z_peer, &c_peer, &n_peer, &dummy) != 3) || 287 (z_local > 255) || (c_local > 4095) || (n_local > 4095) || 288 (z_peer > 255) || (c_peer > 4095) || (n_peer > 4095) || 289 (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) || 290 (if_peer_len <= 1) || (if_peer_len > TIPC_MAX_IF_NAME) || 291 (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) || 292 (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1))) 293 return 0; 294 295 /* return link name components, if necessary */ 296 297 if (name_parts) { 298 name_parts->addr_local = tipc_addr(z_local, c_local, n_local); 299 strcpy(name_parts->if_local, if_local); 300 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer); 301 strcpy(name_parts->if_peer, if_peer); 302 } 303 return 1; 304 } 305 306 /** 307 * link_timeout - handle expiration of link timer 308 * @l_ptr: pointer to link 309 * 310 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict 311 * with tipc_link_delete(). (There is no risk that the node will be deleted by 312 * another thread because tipc_link_delete() always cancels the link timer before 313 * tipc_node_delete() is called.) 314 */ 315 316 static void link_timeout(struct link *l_ptr) 317 { 318 tipc_node_lock(l_ptr->owner); 319 320 /* update counters used in statistical profiling of send traffic */ 321 322 l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; 323 l_ptr->stats.queue_sz_counts++; 324 325 if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz) 326 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; 327 328 if (l_ptr->first_out) { 329 struct tipc_msg *msg = buf_msg(l_ptr->first_out); 330 u32 length = msg_size(msg); 331 332 if ((msg_user(msg) == MSG_FRAGMENTER) && 333 (msg_type(msg) == FIRST_FRAGMENT)) { 334 length = msg_size(msg_get_wrapped(msg)); 335 } 336 if (length) { 337 l_ptr->stats.msg_lengths_total += length; 338 l_ptr->stats.msg_length_counts++; 339 if (length <= 64) 340 l_ptr->stats.msg_length_profile[0]++; 341 else if (length <= 256) 342 l_ptr->stats.msg_length_profile[1]++; 343 else if (length <= 1024) 344 l_ptr->stats.msg_length_profile[2]++; 345 else if (length <= 4096) 346 l_ptr->stats.msg_length_profile[3]++; 347 else if (length <= 16384) 348 l_ptr->stats.msg_length_profile[4]++; 349 else if (length <= 32768) 350 l_ptr->stats.msg_length_profile[5]++; 351 else 352 l_ptr->stats.msg_length_profile[6]++; 353 } 354 } 355 356 /* do all other link processing performed on a periodic basis */ 357 358 link_check_defragm_bufs(l_ptr); 359 360 link_state_event(l_ptr, TIMEOUT_EVT); 361 362 if (l_ptr->next_out) 363 tipc_link_push_queue(l_ptr); 364 365 tipc_node_unlock(l_ptr->owner); 366 } 367 368 static void link_set_timer(struct link *l_ptr, u32 time) 369 { 370 k_start_timer(&l_ptr->timer, time); 371 } 372 373 /** 374 * tipc_link_create - create a new link 375 * @b_ptr: pointer to associated bearer 376 * @peer: network address of node at other end of link 377 * @media_addr: media address to use when sending messages over link 378 * 379 * Returns pointer to link. 380 */ 381 382 struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer, 383 const struct tipc_media_addr *media_addr) 384 { 385 struct link *l_ptr; 386 struct tipc_msg *msg; 387 char *if_name; 388 389 l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC); 390 if (!l_ptr) { 391 warn("Link creation failed, no memory\n"); 392 return NULL; 393 } 394 395 if (LINK_LOG_BUF_SIZE) { 396 char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC); 397 398 if (!pb) { 399 kfree(l_ptr); 400 warn("Link creation failed, no memory for print buffer\n"); 401 return NULL; 402 } 403 tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE); 404 } 405 406 l_ptr->addr = peer; 407 if_name = strchr(b_ptr->publ.name, ':') + 1; 408 sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:", 409 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 410 tipc_node(tipc_own_addr), 411 if_name, 412 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); 413 /* note: peer i/f is appended to link name by reset/activate */ 414 memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); 415 l_ptr->checkpoint = 1; 416 l_ptr->b_ptr = b_ptr; 417 link_set_supervision_props(l_ptr, b_ptr->media->tolerance); 418 l_ptr->state = RESET_UNKNOWN; 419 420 l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg; 421 msg = l_ptr->pmsg; 422 tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr); 423 msg_set_size(msg, sizeof(l_ptr->proto_msg)); 424 msg_set_session(msg, (tipc_random & 0xffff)); 425 msg_set_bearer_id(msg, b_ptr->identity); 426 strcpy((char *)msg_data(msg), if_name); 427 428 l_ptr->priority = b_ptr->priority; 429 tipc_link_set_queue_limits(l_ptr, b_ptr->media->window); 430 431 link_init_max_pkt(l_ptr); 432 433 l_ptr->next_out_no = 1; 434 INIT_LIST_HEAD(&l_ptr->waiting_ports); 435 436 link_reset_statistics(l_ptr); 437 438 l_ptr->owner = tipc_node_attach_link(l_ptr); 439 if (!l_ptr->owner) { 440 if (LINK_LOG_BUF_SIZE) 441 kfree(l_ptr->print_buf.buf); 442 kfree(l_ptr); 443 return NULL; 444 } 445 446 k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); 447 list_add_tail(&l_ptr->link_list, &b_ptr->links); 448 tipc_k_signal((Handler)link_start, (unsigned long)l_ptr); 449 450 dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n", 451 l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit); 452 453 return l_ptr; 454 } 455 456 /** 457 * tipc_link_delete - delete a link 458 * @l_ptr: pointer to link 459 * 460 * Note: 'tipc_net_lock' is write_locked, bearer is locked. 461 * This routine must not grab the node lock until after link timer cancellation 462 * to avoid a potential deadlock situation. 463 */ 464 465 void tipc_link_delete(struct link *l_ptr) 466 { 467 if (!l_ptr) { 468 err("Attempt to delete non-existent link\n"); 469 return; 470 } 471 472 dbg("tipc_link_delete()\n"); 473 474 k_cancel_timer(&l_ptr->timer); 475 476 tipc_node_lock(l_ptr->owner); 477 tipc_link_reset(l_ptr); 478 tipc_node_detach_link(l_ptr->owner, l_ptr); 479 tipc_link_stop(l_ptr); 480 list_del_init(&l_ptr->link_list); 481 if (LINK_LOG_BUF_SIZE) 482 kfree(l_ptr->print_buf.buf); 483 tipc_node_unlock(l_ptr->owner); 484 k_term_timer(&l_ptr->timer); 485 kfree(l_ptr); 486 } 487 488 static void link_start(struct link *l_ptr) 489 { 490 dbg("link_start %x\n", l_ptr); 491 link_state_event(l_ptr, STARTING_EVT); 492 } 493 494 /** 495 * link_schedule_port - schedule port for deferred sending 496 * @l_ptr: pointer to link 497 * @origport: reference to sending port 498 * @sz: amount of data to be sent 499 * 500 * Schedules port for renewed sending of messages after link congestion 501 * has abated. 502 */ 503 504 static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz) 505 { 506 struct port *p_ptr; 507 508 spin_lock_bh(&tipc_port_list_lock); 509 p_ptr = tipc_port_lock(origport); 510 if (p_ptr) { 511 if (!p_ptr->wakeup) 512 goto exit; 513 if (!list_empty(&p_ptr->wait_list)) 514 goto exit; 515 p_ptr->publ.congested = 1; 516 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); 517 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 518 l_ptr->stats.link_congs++; 519 exit: 520 tipc_port_unlock(p_ptr); 521 } 522 spin_unlock_bh(&tipc_port_list_lock); 523 return -ELINKCONG; 524 } 525 526 void tipc_link_wakeup_ports(struct link *l_ptr, int all) 527 { 528 struct port *p_ptr; 529 struct port *temp_p_ptr; 530 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; 531 532 if (all) 533 win = 100000; 534 if (win <= 0) 535 return; 536 if (!spin_trylock_bh(&tipc_port_list_lock)) 537 return; 538 if (link_congested(l_ptr)) 539 goto exit; 540 list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports, 541 wait_list) { 542 if (win <= 0) 543 break; 544 list_del_init(&p_ptr->wait_list); 545 spin_lock_bh(p_ptr->publ.lock); 546 p_ptr->publ.congested = 0; 547 p_ptr->wakeup(&p_ptr->publ); 548 win -= p_ptr->waiting_pkts; 549 spin_unlock_bh(p_ptr->publ.lock); 550 } 551 552 exit: 553 spin_unlock_bh(&tipc_port_list_lock); 554 } 555 556 /** 557 * link_release_outqueue - purge link's outbound message queue 558 * @l_ptr: pointer to link 559 */ 560 561 static void link_release_outqueue(struct link *l_ptr) 562 { 563 struct sk_buff *buf = l_ptr->first_out; 564 struct sk_buff *next; 565 566 while (buf) { 567 next = buf->next; 568 buf_discard(buf); 569 buf = next; 570 } 571 l_ptr->first_out = NULL; 572 l_ptr->out_queue_size = 0; 573 } 574 575 /** 576 * tipc_link_reset_fragments - purge link's inbound message fragments queue 577 * @l_ptr: pointer to link 578 */ 579 580 void tipc_link_reset_fragments(struct link *l_ptr) 581 { 582 struct sk_buff *buf = l_ptr->defragm_buf; 583 struct sk_buff *next; 584 585 while (buf) { 586 next = buf->next; 587 buf_discard(buf); 588 buf = next; 589 } 590 l_ptr->defragm_buf = NULL; 591 } 592 593 /** 594 * tipc_link_stop - purge all inbound and outbound messages associated with link 595 * @l_ptr: pointer to link 596 */ 597 598 void tipc_link_stop(struct link *l_ptr) 599 { 600 struct sk_buff *buf; 601 struct sk_buff *next; 602 603 buf = l_ptr->oldest_deferred_in; 604 while (buf) { 605 next = buf->next; 606 buf_discard(buf); 607 buf = next; 608 } 609 610 buf = l_ptr->first_out; 611 while (buf) { 612 next = buf->next; 613 buf_discard(buf); 614 buf = next; 615 } 616 617 tipc_link_reset_fragments(l_ptr); 618 619 buf_discard(l_ptr->proto_msg_queue); 620 l_ptr->proto_msg_queue = NULL; 621 } 622 623 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */ 624 #define link_send_event(fcn, l_ptr, up) do { } while (0) 625 626 void tipc_link_reset(struct link *l_ptr) 627 { 628 struct sk_buff *buf; 629 u32 prev_state = l_ptr->state; 630 u32 checkpoint = l_ptr->next_in_no; 631 int was_active_link = tipc_link_is_active(l_ptr); 632 633 msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); 634 635 /* Link is down, accept any session */ 636 l_ptr->peer_session = INVALID_SESSION; 637 638 /* Prepare for max packet size negotiation */ 639 link_init_max_pkt(l_ptr); 640 641 l_ptr->state = RESET_UNKNOWN; 642 dbg_link_state("Resetting Link\n"); 643 644 if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET)) 645 return; 646 647 tipc_node_link_down(l_ptr->owner, l_ptr); 648 tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); 649 650 if (was_active_link && tipc_node_has_active_links(l_ptr->owner) && 651 l_ptr->owner->permit_changeover) { 652 l_ptr->reset_checkpoint = checkpoint; 653 l_ptr->exp_msg_count = START_CHANGEOVER; 654 } 655 656 /* Clean up all queues: */ 657 658 link_release_outqueue(l_ptr); 659 buf_discard(l_ptr->proto_msg_queue); 660 l_ptr->proto_msg_queue = NULL; 661 buf = l_ptr->oldest_deferred_in; 662 while (buf) { 663 struct sk_buff *next = buf->next; 664 buf_discard(buf); 665 buf = next; 666 } 667 if (!list_empty(&l_ptr->waiting_ports)) 668 tipc_link_wakeup_ports(l_ptr, 1); 669 670 l_ptr->retransm_queue_head = 0; 671 l_ptr->retransm_queue_size = 0; 672 l_ptr->last_out = NULL; 673 l_ptr->first_out = NULL; 674 l_ptr->next_out = NULL; 675 l_ptr->unacked_window = 0; 676 l_ptr->checkpoint = 1; 677 l_ptr->next_out_no = 1; 678 l_ptr->deferred_inqueue_sz = 0; 679 l_ptr->oldest_deferred_in = NULL; 680 l_ptr->newest_deferred_in = NULL; 681 l_ptr->fsm_msg_cnt = 0; 682 l_ptr->stale_count = 0; 683 link_reset_statistics(l_ptr); 684 685 link_send_event(tipc_cfg_link_event, l_ptr, 0); 686 if (!in_own_cluster(l_ptr->addr)) 687 link_send_event(tipc_disc_link_event, l_ptr, 0); 688 } 689 690 691 static void link_activate(struct link *l_ptr) 692 { 693 l_ptr->next_in_no = l_ptr->stats.recv_info = 1; 694 tipc_node_link_up(l_ptr->owner, l_ptr); 695 tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr); 696 link_send_event(tipc_cfg_link_event, l_ptr, 1); 697 if (!in_own_cluster(l_ptr->addr)) 698 link_send_event(tipc_disc_link_event, l_ptr, 1); 699 } 700 701 /** 702 * link_state_event - link finite state machine 703 * @l_ptr: pointer to link 704 * @event: state machine event to process 705 */ 706 707 static void link_state_event(struct link *l_ptr, unsigned event) 708 { 709 struct link *other; 710 u32 cont_intv = l_ptr->continuity_interval; 711 712 if (!l_ptr->started && (event != STARTING_EVT)) 713 return; /* Not yet. */ 714 715 if (link_blocked(l_ptr)) { 716 if (event == TIMEOUT_EVT) { 717 link_set_timer(l_ptr, cont_intv); 718 } 719 return; /* Changeover going on */ 720 } 721 dbg_link("STATE_EV: <%s> ", l_ptr->name); 722 723 switch (l_ptr->state) { 724 case WORKING_WORKING: 725 dbg_link("WW/"); 726 switch (event) { 727 case TRAFFIC_MSG_EVT: 728 dbg_link("TRF-"); 729 /* fall through */ 730 case ACTIVATE_MSG: 731 dbg_link("ACT\n"); 732 break; 733 case TIMEOUT_EVT: 734 dbg_link("TIM "); 735 if (l_ptr->next_in_no != l_ptr->checkpoint) { 736 l_ptr->checkpoint = l_ptr->next_in_no; 737 if (tipc_bclink_acks_missing(l_ptr->owner)) { 738 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 739 0, 0, 0, 0, 0); 740 l_ptr->fsm_msg_cnt++; 741 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { 742 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 743 1, 0, 0, 0, 0); 744 l_ptr->fsm_msg_cnt++; 745 } 746 link_set_timer(l_ptr, cont_intv); 747 break; 748 } 749 dbg_link(" -> WU\n"); 750 l_ptr->state = WORKING_UNKNOWN; 751 l_ptr->fsm_msg_cnt = 0; 752 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 753 l_ptr->fsm_msg_cnt++; 754 link_set_timer(l_ptr, cont_intv / 4); 755 break; 756 case RESET_MSG: 757 dbg_link("RES -> RR\n"); 758 info("Resetting link <%s>, requested by peer\n", 759 l_ptr->name); 760 tipc_link_reset(l_ptr); 761 l_ptr->state = RESET_RESET; 762 l_ptr->fsm_msg_cnt = 0; 763 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 764 l_ptr->fsm_msg_cnt++; 765 link_set_timer(l_ptr, cont_intv); 766 break; 767 default: 768 err("Unknown link event %u in WW state\n", event); 769 } 770 break; 771 case WORKING_UNKNOWN: 772 dbg_link("WU/"); 773 switch (event) { 774 case TRAFFIC_MSG_EVT: 775 dbg_link("TRF-"); 776 case ACTIVATE_MSG: 777 dbg_link("ACT -> WW\n"); 778 l_ptr->state = WORKING_WORKING; 779 l_ptr->fsm_msg_cnt = 0; 780 link_set_timer(l_ptr, cont_intv); 781 break; 782 case RESET_MSG: 783 dbg_link("RES -> RR\n"); 784 info("Resetting link <%s>, requested by peer " 785 "while probing\n", l_ptr->name); 786 tipc_link_reset(l_ptr); 787 l_ptr->state = RESET_RESET; 788 l_ptr->fsm_msg_cnt = 0; 789 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 790 l_ptr->fsm_msg_cnt++; 791 link_set_timer(l_ptr, cont_intv); 792 break; 793 case TIMEOUT_EVT: 794 dbg_link("TIM "); 795 if (l_ptr->next_in_no != l_ptr->checkpoint) { 796 dbg_link("-> WW\n"); 797 l_ptr->state = WORKING_WORKING; 798 l_ptr->fsm_msg_cnt = 0; 799 l_ptr->checkpoint = l_ptr->next_in_no; 800 if (tipc_bclink_acks_missing(l_ptr->owner)) { 801 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 802 0, 0, 0, 0, 0); 803 l_ptr->fsm_msg_cnt++; 804 } 805 link_set_timer(l_ptr, cont_intv); 806 } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) { 807 dbg_link("Probing %u/%u,timer = %u ms)\n", 808 l_ptr->fsm_msg_cnt, l_ptr->abort_limit, 809 cont_intv / 4); 810 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 811 1, 0, 0, 0, 0); 812 l_ptr->fsm_msg_cnt++; 813 link_set_timer(l_ptr, cont_intv / 4); 814 } else { /* Link has failed */ 815 dbg_link("-> RU (%u probes unanswered)\n", 816 l_ptr->fsm_msg_cnt); 817 warn("Resetting link <%s>, peer not responding\n", 818 l_ptr->name); 819 tipc_link_reset(l_ptr); 820 l_ptr->state = RESET_UNKNOWN; 821 l_ptr->fsm_msg_cnt = 0; 822 tipc_link_send_proto_msg(l_ptr, RESET_MSG, 823 0, 0, 0, 0, 0); 824 l_ptr->fsm_msg_cnt++; 825 link_set_timer(l_ptr, cont_intv); 826 } 827 break; 828 default: 829 err("Unknown link event %u in WU state\n", event); 830 } 831 break; 832 case RESET_UNKNOWN: 833 dbg_link("RU/"); 834 switch (event) { 835 case TRAFFIC_MSG_EVT: 836 dbg_link("TRF-\n"); 837 break; 838 case ACTIVATE_MSG: 839 other = l_ptr->owner->active_links[0]; 840 if (other && link_working_unknown(other)) { 841 dbg_link("ACT\n"); 842 break; 843 } 844 dbg_link("ACT -> WW\n"); 845 l_ptr->state = WORKING_WORKING; 846 l_ptr->fsm_msg_cnt = 0; 847 link_activate(l_ptr); 848 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 849 l_ptr->fsm_msg_cnt++; 850 link_set_timer(l_ptr, cont_intv); 851 break; 852 case RESET_MSG: 853 dbg_link("RES\n"); 854 dbg_link(" -> RR\n"); 855 l_ptr->state = RESET_RESET; 856 l_ptr->fsm_msg_cnt = 0; 857 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0); 858 l_ptr->fsm_msg_cnt++; 859 link_set_timer(l_ptr, cont_intv); 860 break; 861 case STARTING_EVT: 862 dbg_link("START-"); 863 l_ptr->started = 1; 864 /* fall through */ 865 case TIMEOUT_EVT: 866 dbg_link("TIM\n"); 867 tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); 868 l_ptr->fsm_msg_cnt++; 869 link_set_timer(l_ptr, cont_intv); 870 break; 871 default: 872 err("Unknown link event %u in RU state\n", event); 873 } 874 break; 875 case RESET_RESET: 876 dbg_link("RR/ "); 877 switch (event) { 878 case TRAFFIC_MSG_EVT: 879 dbg_link("TRF-"); 880 /* fall through */ 881 case ACTIVATE_MSG: 882 other = l_ptr->owner->active_links[0]; 883 if (other && link_working_unknown(other)) { 884 dbg_link("ACT\n"); 885 break; 886 } 887 dbg_link("ACT -> WW\n"); 888 l_ptr->state = WORKING_WORKING; 889 l_ptr->fsm_msg_cnt = 0; 890 link_activate(l_ptr); 891 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 892 l_ptr->fsm_msg_cnt++; 893 link_set_timer(l_ptr, cont_intv); 894 break; 895 case RESET_MSG: 896 dbg_link("RES\n"); 897 break; 898 case TIMEOUT_EVT: 899 dbg_link("TIM\n"); 900 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 901 l_ptr->fsm_msg_cnt++; 902 link_set_timer(l_ptr, cont_intv); 903 dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt); 904 break; 905 default: 906 err("Unknown link event %u in RR state\n", event); 907 } 908 break; 909 default: 910 err("Unknown link state %u/%u\n", l_ptr->state, event); 911 } 912 } 913 914 /* 915 * link_bundle_buf(): Append contents of a buffer to 916 * the tail of an existing one. 917 */ 918 919 static int link_bundle_buf(struct link *l_ptr, 920 struct sk_buff *bundler, 921 struct sk_buff *buf) 922 { 923 struct tipc_msg *bundler_msg = buf_msg(bundler); 924 struct tipc_msg *msg = buf_msg(buf); 925 u32 size = msg_size(msg); 926 u32 bundle_size = msg_size(bundler_msg); 927 u32 to_pos = align(bundle_size); 928 u32 pad = to_pos - bundle_size; 929 930 if (msg_user(bundler_msg) != MSG_BUNDLER) 931 return 0; 932 if (msg_type(bundler_msg) != OPEN_MSG) 933 return 0; 934 if (skb_tailroom(bundler) < (pad + size)) 935 return 0; 936 if (l_ptr->max_pkt < (to_pos + size)) 937 return 0; 938 939 skb_put(bundler, pad + size); 940 skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size); 941 msg_set_size(bundler_msg, to_pos + size); 942 msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1); 943 dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n", 944 msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg)); 945 msg_dbg(msg, "PACKD:"); 946 buf_discard(buf); 947 l_ptr->stats.sent_bundled++; 948 return 1; 949 } 950 951 static void link_add_to_outqueue(struct link *l_ptr, 952 struct sk_buff *buf, 953 struct tipc_msg *msg) 954 { 955 u32 ack = mod(l_ptr->next_in_no - 1); 956 u32 seqno = mod(l_ptr->next_out_no++); 957 958 msg_set_word(msg, 2, ((ack << 16) | seqno)); 959 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 960 buf->next = NULL; 961 if (l_ptr->first_out) { 962 l_ptr->last_out->next = buf; 963 l_ptr->last_out = buf; 964 } else 965 l_ptr->first_out = l_ptr->last_out = buf; 966 l_ptr->out_queue_size++; 967 } 968 969 /* 970 * tipc_link_send_buf() is the 'full path' for messages, called from 971 * inside TIPC when the 'fast path' in tipc_send_buf 972 * has failed, and from link_send() 973 */ 974 975 int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf) 976 { 977 struct tipc_msg *msg = buf_msg(buf); 978 u32 size = msg_size(msg); 979 u32 dsz = msg_data_sz(msg); 980 u32 queue_size = l_ptr->out_queue_size; 981 u32 imp = tipc_msg_tot_importance(msg); 982 u32 queue_limit = l_ptr->queue_limit[imp]; 983 u32 max_packet = l_ptr->max_pkt; 984 985 msg_set_prevnode(msg, tipc_own_addr); /* If routed message */ 986 987 /* Match msg importance against queue limits: */ 988 989 if (unlikely(queue_size >= queue_limit)) { 990 if (imp <= TIPC_CRITICAL_IMPORTANCE) { 991 return link_schedule_port(l_ptr, msg_origport(msg), 992 size); 993 } 994 msg_dbg(msg, "TIPC: Congestion, throwing away\n"); 995 buf_discard(buf); 996 if (imp > CONN_MANAGER) { 997 warn("Resetting link <%s>, send queue full", l_ptr->name); 998 tipc_link_reset(l_ptr); 999 } 1000 return dsz; 1001 } 1002 1003 /* Fragmentation needed ? */ 1004 1005 if (size > max_packet) 1006 return link_send_long_buf(l_ptr, buf); 1007 1008 /* Packet can be queued or sent: */ 1009 1010 if (queue_size > l_ptr->stats.max_queue_sz) 1011 l_ptr->stats.max_queue_sz = queue_size; 1012 1013 if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && 1014 !link_congested(l_ptr))) { 1015 link_add_to_outqueue(l_ptr, buf, msg); 1016 1017 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { 1018 l_ptr->unacked_window = 0; 1019 } else { 1020 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1021 l_ptr->stats.bearer_congs++; 1022 l_ptr->next_out = buf; 1023 } 1024 return dsz; 1025 } 1026 /* Congestion: can message be bundled ?: */ 1027 1028 if ((msg_user(msg) != CHANGEOVER_PROTOCOL) && 1029 (msg_user(msg) != MSG_FRAGMENTER)) { 1030 1031 /* Try adding message to an existing bundle */ 1032 1033 if (l_ptr->next_out && 1034 link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { 1035 tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); 1036 return dsz; 1037 } 1038 1039 /* Try creating a new bundle */ 1040 1041 if (size <= max_packet * 2 / 3) { 1042 struct sk_buff *bundler = tipc_buf_acquire(max_packet); 1043 struct tipc_msg bundler_hdr; 1044 1045 if (bundler) { 1046 tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG, 1047 INT_H_SIZE, l_ptr->addr); 1048 skb_copy_to_linear_data(bundler, &bundler_hdr, 1049 INT_H_SIZE); 1050 skb_trim(bundler, INT_H_SIZE); 1051 link_bundle_buf(l_ptr, bundler, buf); 1052 buf = bundler; 1053 msg = buf_msg(buf); 1054 l_ptr->stats.sent_bundles++; 1055 } 1056 } 1057 } 1058 if (!l_ptr->next_out) 1059 l_ptr->next_out = buf; 1060 link_add_to_outqueue(l_ptr, buf, msg); 1061 tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); 1062 return dsz; 1063 } 1064 1065 /* 1066 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has 1067 * not been selected yet, and the the owner node is not locked 1068 * Called by TIPC internal users, e.g. the name distributor 1069 */ 1070 1071 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) 1072 { 1073 struct link *l_ptr; 1074 struct tipc_node *n_ptr; 1075 int res = -ELINKCONG; 1076 1077 read_lock_bh(&tipc_net_lock); 1078 n_ptr = tipc_node_select(dest, selector); 1079 if (n_ptr) { 1080 tipc_node_lock(n_ptr); 1081 l_ptr = n_ptr->active_links[selector & 1]; 1082 if (l_ptr) { 1083 dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest); 1084 res = tipc_link_send_buf(l_ptr, buf); 1085 } else { 1086 dbg("Attempt to send msg to unreachable node:\n"); 1087 msg_dbg(buf_msg(buf),">>>"); 1088 buf_discard(buf); 1089 } 1090 tipc_node_unlock(n_ptr); 1091 } else { 1092 dbg("Attempt to send msg to unknown node:\n"); 1093 msg_dbg(buf_msg(buf),">>>"); 1094 buf_discard(buf); 1095 } 1096 read_unlock_bh(&tipc_net_lock); 1097 return res; 1098 } 1099 1100 /* 1101 * link_send_buf_fast: Entry for data messages where the 1102 * destination link is known and the header is complete, 1103 * inclusive total message length. Very time critical. 1104 * Link is locked. Returns user data length. 1105 */ 1106 1107 static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf, 1108 u32 *used_max_pkt) 1109 { 1110 struct tipc_msg *msg = buf_msg(buf); 1111 int res = msg_data_sz(msg); 1112 1113 if (likely(!link_congested(l_ptr))) { 1114 if (likely(msg_size(msg) <= l_ptr->max_pkt)) { 1115 if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { 1116 link_add_to_outqueue(l_ptr, buf, msg); 1117 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, 1118 &l_ptr->media_addr))) { 1119 l_ptr->unacked_window = 0; 1120 msg_dbg(msg,"SENT_FAST:"); 1121 return res; 1122 } 1123 dbg("failed sent fast...\n"); 1124 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1125 l_ptr->stats.bearer_congs++; 1126 l_ptr->next_out = buf; 1127 return res; 1128 } 1129 } 1130 else 1131 *used_max_pkt = l_ptr->max_pkt; 1132 } 1133 return tipc_link_send_buf(l_ptr, buf); /* All other cases */ 1134 } 1135 1136 /* 1137 * tipc_send_buf_fast: Entry for data messages where the 1138 * destination node is known and the header is complete, 1139 * inclusive total message length. 1140 * Returns user data length. 1141 */ 1142 int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode) 1143 { 1144 struct link *l_ptr; 1145 struct tipc_node *n_ptr; 1146 int res; 1147 u32 selector = msg_origport(buf_msg(buf)) & 1; 1148 u32 dummy; 1149 1150 if (destnode == tipc_own_addr) 1151 return tipc_port_recv_msg(buf); 1152 1153 read_lock_bh(&tipc_net_lock); 1154 n_ptr = tipc_node_select(destnode, selector); 1155 if (likely(n_ptr)) { 1156 tipc_node_lock(n_ptr); 1157 l_ptr = n_ptr->active_links[selector]; 1158 dbg("send_fast: buf %x selected %x, destnode = %x\n", 1159 buf, l_ptr, destnode); 1160 if (likely(l_ptr)) { 1161 res = link_send_buf_fast(l_ptr, buf, &dummy); 1162 tipc_node_unlock(n_ptr); 1163 read_unlock_bh(&tipc_net_lock); 1164 return res; 1165 } 1166 tipc_node_unlock(n_ptr); 1167 } 1168 read_unlock_bh(&tipc_net_lock); 1169 res = msg_data_sz(buf_msg(buf)); 1170 tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1171 return res; 1172 } 1173 1174 1175 /* 1176 * tipc_link_send_sections_fast: Entry for messages where the 1177 * destination processor is known and the header is complete, 1178 * except for total message length. 1179 * Returns user data length or errno. 1180 */ 1181 int tipc_link_send_sections_fast(struct port *sender, 1182 struct iovec const *msg_sect, 1183 const u32 num_sect, 1184 u32 destaddr) 1185 { 1186 struct tipc_msg *hdr = &sender->publ.phdr; 1187 struct link *l_ptr; 1188 struct sk_buff *buf; 1189 struct tipc_node *node; 1190 int res; 1191 u32 selector = msg_origport(hdr) & 1; 1192 1193 again: 1194 /* 1195 * Try building message using port's max_pkt hint. 1196 * (Must not hold any locks while building message.) 1197 */ 1198 1199 res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt, 1200 !sender->user_port, &buf); 1201 1202 read_lock_bh(&tipc_net_lock); 1203 node = tipc_node_select(destaddr, selector); 1204 if (likely(node)) { 1205 tipc_node_lock(node); 1206 l_ptr = node->active_links[selector]; 1207 if (likely(l_ptr)) { 1208 if (likely(buf)) { 1209 res = link_send_buf_fast(l_ptr, buf, 1210 &sender->publ.max_pkt); 1211 if (unlikely(res < 0)) 1212 buf_discard(buf); 1213 exit: 1214 tipc_node_unlock(node); 1215 read_unlock_bh(&tipc_net_lock); 1216 return res; 1217 } 1218 1219 /* Exit if build request was invalid */ 1220 1221 if (unlikely(res < 0)) 1222 goto exit; 1223 1224 /* Exit if link (or bearer) is congested */ 1225 1226 if (link_congested(l_ptr) || 1227 !list_empty(&l_ptr->b_ptr->cong_links)) { 1228 res = link_schedule_port(l_ptr, 1229 sender->publ.ref, res); 1230 goto exit; 1231 } 1232 1233 /* 1234 * Message size exceeds max_pkt hint; update hint, 1235 * then re-try fast path or fragment the message 1236 */ 1237 1238 sender->publ.max_pkt = l_ptr->max_pkt; 1239 tipc_node_unlock(node); 1240 read_unlock_bh(&tipc_net_lock); 1241 1242 1243 if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt) 1244 goto again; 1245 1246 return link_send_sections_long(sender, msg_sect, 1247 num_sect, destaddr); 1248 } 1249 tipc_node_unlock(node); 1250 } 1251 read_unlock_bh(&tipc_net_lock); 1252 1253 /* Couldn't find a link to the destination node */ 1254 1255 if (buf) 1256 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1257 if (res >= 0) 1258 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1259 TIPC_ERR_NO_NODE); 1260 return res; 1261 } 1262 1263 /* 1264 * link_send_sections_long(): Entry for long messages where the 1265 * destination node is known and the header is complete, 1266 * inclusive total message length. 1267 * Link and bearer congestion status have been checked to be ok, 1268 * and are ignored if they change. 1269 * 1270 * Note that fragments do not use the full link MTU so that they won't have 1271 * to undergo refragmentation if link changeover causes them to be sent 1272 * over another link with an additional tunnel header added as prefix. 1273 * (Refragmentation will still occur if the other link has a smaller MTU.) 1274 * 1275 * Returns user data length or errno. 1276 */ 1277 static int link_send_sections_long(struct port *sender, 1278 struct iovec const *msg_sect, 1279 u32 num_sect, 1280 u32 destaddr) 1281 { 1282 struct link *l_ptr; 1283 struct tipc_node *node; 1284 struct tipc_msg *hdr = &sender->publ.phdr; 1285 u32 dsz = msg_data_sz(hdr); 1286 u32 max_pkt,fragm_sz,rest; 1287 struct tipc_msg fragm_hdr; 1288 struct sk_buff *buf,*buf_chain,*prev; 1289 u32 fragm_crs,fragm_rest,hsz,sect_rest; 1290 const unchar *sect_crs; 1291 int curr_sect; 1292 u32 fragm_no; 1293 1294 again: 1295 fragm_no = 1; 1296 max_pkt = sender->publ.max_pkt - INT_H_SIZE; 1297 /* leave room for tunnel header in case of link changeover */ 1298 fragm_sz = max_pkt - INT_H_SIZE; 1299 /* leave room for fragmentation header in each fragment */ 1300 rest = dsz; 1301 fragm_crs = 0; 1302 fragm_rest = 0; 1303 sect_rest = 0; 1304 sect_crs = NULL; 1305 curr_sect = -1; 1306 1307 /* Prepare reusable fragment header: */ 1308 1309 msg_dbg(hdr, ">FRAGMENTING>"); 1310 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 1311 INT_H_SIZE, msg_destnode(hdr)); 1312 msg_set_link_selector(&fragm_hdr, sender->publ.ref); 1313 msg_set_size(&fragm_hdr, max_pkt); 1314 msg_set_fragm_no(&fragm_hdr, 1); 1315 1316 /* Prepare header of first fragment: */ 1317 1318 buf_chain = buf = tipc_buf_acquire(max_pkt); 1319 if (!buf) 1320 return -ENOMEM; 1321 buf->next = NULL; 1322 skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); 1323 hsz = msg_hdr_sz(hdr); 1324 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz); 1325 msg_dbg(buf_msg(buf), ">BUILD>"); 1326 1327 /* Chop up message: */ 1328 1329 fragm_crs = INT_H_SIZE + hsz; 1330 fragm_rest = fragm_sz - hsz; 1331 1332 do { /* For all sections */ 1333 u32 sz; 1334 1335 if (!sect_rest) { 1336 sect_rest = msg_sect[++curr_sect].iov_len; 1337 sect_crs = (const unchar *)msg_sect[curr_sect].iov_base; 1338 } 1339 1340 if (sect_rest < fragm_rest) 1341 sz = sect_rest; 1342 else 1343 sz = fragm_rest; 1344 1345 if (likely(!sender->user_port)) { 1346 if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { 1347 error: 1348 for (; buf_chain; buf_chain = buf) { 1349 buf = buf_chain->next; 1350 buf_discard(buf_chain); 1351 } 1352 return -EFAULT; 1353 } 1354 } else 1355 skb_copy_to_linear_data_offset(buf, fragm_crs, 1356 sect_crs, sz); 1357 sect_crs += sz; 1358 sect_rest -= sz; 1359 fragm_crs += sz; 1360 fragm_rest -= sz; 1361 rest -= sz; 1362 1363 if (!fragm_rest && rest) { 1364 1365 /* Initiate new fragment: */ 1366 if (rest <= fragm_sz) { 1367 fragm_sz = rest; 1368 msg_set_type(&fragm_hdr,LAST_FRAGMENT); 1369 } else { 1370 msg_set_type(&fragm_hdr, FRAGMENT); 1371 } 1372 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 1373 msg_set_fragm_no(&fragm_hdr, ++fragm_no); 1374 prev = buf; 1375 buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE); 1376 if (!buf) 1377 goto error; 1378 1379 buf->next = NULL; 1380 prev->next = buf; 1381 skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); 1382 fragm_crs = INT_H_SIZE; 1383 fragm_rest = fragm_sz; 1384 msg_dbg(buf_msg(buf)," >BUILD>"); 1385 } 1386 } 1387 while (rest > 0); 1388 1389 /* 1390 * Now we have a buffer chain. Select a link and check 1391 * that packet size is still OK 1392 */ 1393 node = tipc_node_select(destaddr, sender->publ.ref & 1); 1394 if (likely(node)) { 1395 tipc_node_lock(node); 1396 l_ptr = node->active_links[sender->publ.ref & 1]; 1397 if (!l_ptr) { 1398 tipc_node_unlock(node); 1399 goto reject; 1400 } 1401 if (l_ptr->max_pkt < max_pkt) { 1402 sender->publ.max_pkt = l_ptr->max_pkt; 1403 tipc_node_unlock(node); 1404 for (; buf_chain; buf_chain = buf) { 1405 buf = buf_chain->next; 1406 buf_discard(buf_chain); 1407 } 1408 goto again; 1409 } 1410 } else { 1411 reject: 1412 for (; buf_chain; buf_chain = buf) { 1413 buf = buf_chain->next; 1414 buf_discard(buf_chain); 1415 } 1416 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1417 TIPC_ERR_NO_NODE); 1418 } 1419 1420 /* Append whole chain to send queue: */ 1421 1422 buf = buf_chain; 1423 l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1); 1424 if (!l_ptr->next_out) 1425 l_ptr->next_out = buf_chain; 1426 l_ptr->stats.sent_fragmented++; 1427 while (buf) { 1428 struct sk_buff *next = buf->next; 1429 struct tipc_msg *msg = buf_msg(buf); 1430 1431 l_ptr->stats.sent_fragments++; 1432 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no); 1433 link_add_to_outqueue(l_ptr, buf, msg); 1434 msg_dbg(msg, ">ADD>"); 1435 buf = next; 1436 } 1437 1438 /* Send it, if possible: */ 1439 1440 tipc_link_push_queue(l_ptr); 1441 tipc_node_unlock(node); 1442 return dsz; 1443 } 1444 1445 /* 1446 * tipc_link_push_packet: Push one unsent packet to the media 1447 */ 1448 u32 tipc_link_push_packet(struct link *l_ptr) 1449 { 1450 struct sk_buff *buf = l_ptr->first_out; 1451 u32 r_q_size = l_ptr->retransm_queue_size; 1452 u32 r_q_head = l_ptr->retransm_queue_head; 1453 1454 /* Step to position where retransmission failed, if any, */ 1455 /* consider that buffers may have been released in meantime */ 1456 1457 if (r_q_size && buf) { 1458 u32 last = lesser(mod(r_q_head + r_q_size), 1459 link_last_sent(l_ptr)); 1460 u32 first = msg_seqno(buf_msg(buf)); 1461 1462 while (buf && less(first, r_q_head)) { 1463 first = mod(first + 1); 1464 buf = buf->next; 1465 } 1466 l_ptr->retransm_queue_head = r_q_head = first; 1467 l_ptr->retransm_queue_size = r_q_size = mod(last - first); 1468 } 1469 1470 /* Continue retransmission now, if there is anything: */ 1471 1472 if (r_q_size && buf) { 1473 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); 1474 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); 1475 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1476 msg_dbg(buf_msg(buf), ">DEF-RETR>"); 1477 l_ptr->retransm_queue_head = mod(++r_q_head); 1478 l_ptr->retransm_queue_size = --r_q_size; 1479 l_ptr->stats.retransmitted++; 1480 return 0; 1481 } else { 1482 l_ptr->stats.bearer_congs++; 1483 msg_dbg(buf_msg(buf), "|>DEF-RETR>"); 1484 return PUSH_FAILED; 1485 } 1486 } 1487 1488 /* Send deferred protocol message, if any: */ 1489 1490 buf = l_ptr->proto_msg_queue; 1491 if (buf) { 1492 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); 1493 msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in); 1494 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1495 msg_dbg(buf_msg(buf), ">DEF-PROT>"); 1496 l_ptr->unacked_window = 0; 1497 buf_discard(buf); 1498 l_ptr->proto_msg_queue = NULL; 1499 return 0; 1500 } else { 1501 msg_dbg(buf_msg(buf), "|>DEF-PROT>"); 1502 l_ptr->stats.bearer_congs++; 1503 return PUSH_FAILED; 1504 } 1505 } 1506 1507 /* Send one deferred data message, if send window not full: */ 1508 1509 buf = l_ptr->next_out; 1510 if (buf) { 1511 struct tipc_msg *msg = buf_msg(buf); 1512 u32 next = msg_seqno(msg); 1513 u32 first = msg_seqno(buf_msg(l_ptr->first_out)); 1514 1515 if (mod(next - first) < l_ptr->queue_limit[0]) { 1516 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 1517 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1518 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1519 if (msg_user(msg) == MSG_BUNDLER) 1520 msg_set_type(msg, CLOSED_MSG); 1521 msg_dbg(msg, ">PUSH-DATA>"); 1522 l_ptr->next_out = buf->next; 1523 return 0; 1524 } else { 1525 msg_dbg(msg, "|PUSH-DATA|"); 1526 l_ptr->stats.bearer_congs++; 1527 return PUSH_FAILED; 1528 } 1529 } 1530 } 1531 return PUSH_FINISHED; 1532 } 1533 1534 /* 1535 * push_queue(): push out the unsent messages of a link where 1536 * congestion has abated. Node is locked 1537 */ 1538 void tipc_link_push_queue(struct link *l_ptr) 1539 { 1540 u32 res; 1541 1542 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) 1543 return; 1544 1545 do { 1546 res = tipc_link_push_packet(l_ptr); 1547 } while (!res); 1548 1549 if (res == PUSH_FAILED) 1550 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1551 } 1552 1553 static void link_reset_all(unsigned long addr) 1554 { 1555 struct tipc_node *n_ptr; 1556 char addr_string[16]; 1557 u32 i; 1558 1559 read_lock_bh(&tipc_net_lock); 1560 n_ptr = tipc_node_find((u32)addr); 1561 if (!n_ptr) { 1562 read_unlock_bh(&tipc_net_lock); 1563 return; /* node no longer exists */ 1564 } 1565 1566 tipc_node_lock(n_ptr); 1567 1568 warn("Resetting all links to %s\n", 1569 tipc_addr_string_fill(addr_string, n_ptr->addr)); 1570 1571 for (i = 0; i < MAX_BEARERS; i++) { 1572 if (n_ptr->links[i]) { 1573 link_print(n_ptr->links[i], TIPC_OUTPUT, 1574 "Resetting link\n"); 1575 tipc_link_reset(n_ptr->links[i]); 1576 } 1577 } 1578 1579 tipc_node_unlock(n_ptr); 1580 read_unlock_bh(&tipc_net_lock); 1581 } 1582 1583 static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf) 1584 { 1585 struct tipc_msg *msg = buf_msg(buf); 1586 1587 warn("Retransmission failure on link <%s>\n", l_ptr->name); 1588 tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>"); 1589 1590 if (l_ptr->addr) { 1591 1592 /* Handle failure on standard link */ 1593 1594 link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n"); 1595 tipc_link_reset(l_ptr); 1596 1597 } else { 1598 1599 /* Handle failure on broadcast link */ 1600 1601 struct tipc_node *n_ptr; 1602 char addr_string[16]; 1603 1604 tipc_printf(TIPC_OUTPUT, "Msg seq number: %u, ", msg_seqno(msg)); 1605 tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n", 1606 (unsigned long) TIPC_SKB_CB(buf)->handle); 1607 1608 n_ptr = l_ptr->owner->next; 1609 tipc_node_lock(n_ptr); 1610 1611 tipc_addr_string_fill(addr_string, n_ptr->addr); 1612 tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string); 1613 tipc_printf(TIPC_OUTPUT, "Supported: %d, ", n_ptr->bclink.supported); 1614 tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked); 1615 tipc_printf(TIPC_OUTPUT, "Last in: %u, ", n_ptr->bclink.last_in); 1616 tipc_printf(TIPC_OUTPUT, "Gap after: %u, ", n_ptr->bclink.gap_after); 1617 tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to); 1618 tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync); 1619 1620 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); 1621 1622 tipc_node_unlock(n_ptr); 1623 1624 l_ptr->stale_count = 0; 1625 } 1626 } 1627 1628 void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf, 1629 u32 retransmits) 1630 { 1631 struct tipc_msg *msg; 1632 1633 if (!buf) 1634 return; 1635 1636 msg = buf_msg(buf); 1637 1638 dbg("Retransmitting %u in link %x\n", retransmits, l_ptr); 1639 1640 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { 1641 if (l_ptr->retransm_queue_size == 0) { 1642 msg_dbg(msg, ">NO_RETR->BCONG>"); 1643 dbg_print_link(l_ptr, " "); 1644 l_ptr->retransm_queue_head = msg_seqno(msg); 1645 l_ptr->retransm_queue_size = retransmits; 1646 } else { 1647 err("Unexpected retransmit on link %s (qsize=%d)\n", 1648 l_ptr->name, l_ptr->retransm_queue_size); 1649 } 1650 return; 1651 } else { 1652 /* Detect repeated retransmit failures on uncongested bearer */ 1653 1654 if (l_ptr->last_retransmitted == msg_seqno(msg)) { 1655 if (++l_ptr->stale_count > 100) { 1656 link_retransmit_failure(l_ptr, buf); 1657 return; 1658 } 1659 } else { 1660 l_ptr->last_retransmitted = msg_seqno(msg); 1661 l_ptr->stale_count = 1; 1662 } 1663 } 1664 1665 while (retransmits && (buf != l_ptr->next_out) && buf) { 1666 msg = buf_msg(buf); 1667 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 1668 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1669 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1670 msg_dbg(buf_msg(buf), ">RETR>"); 1671 buf = buf->next; 1672 retransmits--; 1673 l_ptr->stats.retransmitted++; 1674 } else { 1675 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1676 l_ptr->stats.bearer_congs++; 1677 l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf)); 1678 l_ptr->retransm_queue_size = retransmits; 1679 return; 1680 } 1681 } 1682 1683 l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; 1684 } 1685 1686 /** 1687 * link_insert_deferred_queue - insert deferred messages back into receive chain 1688 */ 1689 1690 static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr, 1691 struct sk_buff *buf) 1692 { 1693 u32 seq_no; 1694 1695 if (l_ptr->oldest_deferred_in == NULL) 1696 return buf; 1697 1698 seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 1699 if (seq_no == mod(l_ptr->next_in_no)) { 1700 l_ptr->newest_deferred_in->next = buf; 1701 buf = l_ptr->oldest_deferred_in; 1702 l_ptr->oldest_deferred_in = NULL; 1703 l_ptr->deferred_inqueue_sz = 0; 1704 } 1705 return buf; 1706 } 1707 1708 /** 1709 * link_recv_buf_validate - validate basic format of received message 1710 * 1711 * This routine ensures a TIPC message has an acceptable header, and at least 1712 * as much data as the header indicates it should. The routine also ensures 1713 * that the entire message header is stored in the main fragment of the message 1714 * buffer, to simplify future access to message header fields. 1715 * 1716 * Note: Having extra info present in the message header or data areas is OK. 1717 * TIPC will ignore the excess, under the assumption that it is optional info 1718 * introduced by a later release of the protocol. 1719 */ 1720 1721 static int link_recv_buf_validate(struct sk_buff *buf) 1722 { 1723 static u32 min_data_hdr_size[8] = { 1724 SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE, 1725 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE 1726 }; 1727 1728 struct tipc_msg *msg; 1729 u32 tipc_hdr[2]; 1730 u32 size; 1731 u32 hdr_size; 1732 u32 min_hdr_size; 1733 1734 if (unlikely(buf->len < MIN_H_SIZE)) 1735 return 0; 1736 1737 msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr); 1738 if (msg == NULL) 1739 return 0; 1740 1741 if (unlikely(msg_version(msg) != TIPC_VERSION)) 1742 return 0; 1743 1744 size = msg_size(msg); 1745 hdr_size = msg_hdr_sz(msg); 1746 min_hdr_size = msg_isdata(msg) ? 1747 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE; 1748 1749 if (unlikely((hdr_size < min_hdr_size) || 1750 (size < hdr_size) || 1751 (buf->len < size) || 1752 (size - hdr_size > TIPC_MAX_USER_MSG_SIZE))) 1753 return 0; 1754 1755 return pskb_may_pull(buf, hdr_size); 1756 } 1757 1758 /** 1759 * tipc_recv_msg - process TIPC messages arriving from off-node 1760 * @head: pointer to message buffer chain 1761 * @tb_ptr: pointer to bearer message arrived on 1762 * 1763 * Invoked with no locks held. Bearer pointer must point to a valid bearer 1764 * structure (i.e. cannot be NULL), but bearer can be inactive. 1765 */ 1766 1767 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr) 1768 { 1769 read_lock_bh(&tipc_net_lock); 1770 while (head) { 1771 struct bearer *b_ptr = (struct bearer *)tb_ptr; 1772 struct tipc_node *n_ptr; 1773 struct link *l_ptr; 1774 struct sk_buff *crs; 1775 struct sk_buff *buf = head; 1776 struct tipc_msg *msg; 1777 u32 seq_no; 1778 u32 ackd; 1779 u32 released = 0; 1780 int type; 1781 1782 head = head->next; 1783 1784 /* Ensure bearer is still enabled */ 1785 1786 if (unlikely(!b_ptr->active)) 1787 goto cont; 1788 1789 /* Ensure message is well-formed */ 1790 1791 if (unlikely(!link_recv_buf_validate(buf))) 1792 goto cont; 1793 1794 /* Ensure message data is a single contiguous unit */ 1795 1796 if (unlikely(buf_linearize(buf))) { 1797 goto cont; 1798 } 1799 1800 /* Handle arrival of a non-unicast link message */ 1801 1802 msg = buf_msg(buf); 1803 1804 if (unlikely(msg_non_seq(msg))) { 1805 if (msg_user(msg) == LINK_CONFIG) 1806 tipc_disc_recv_msg(buf, b_ptr); 1807 else 1808 tipc_bclink_recv_pkt(buf); 1809 continue; 1810 } 1811 1812 if (unlikely(!msg_short(msg) && 1813 (msg_destnode(msg) != tipc_own_addr))) 1814 goto cont; 1815 1816 /* Discard non-routeable messages destined for another node */ 1817 1818 if (unlikely(!msg_isdata(msg) && 1819 (msg_destnode(msg) != tipc_own_addr))) { 1820 if ((msg_user(msg) != CONN_MANAGER) && 1821 (msg_user(msg) != MSG_FRAGMENTER)) 1822 goto cont; 1823 } 1824 1825 /* Locate neighboring node that sent message */ 1826 1827 n_ptr = tipc_node_find(msg_prevnode(msg)); 1828 if (unlikely(!n_ptr)) 1829 goto cont; 1830 tipc_node_lock(n_ptr); 1831 1832 /* Don't talk to neighbor during cleanup after last session */ 1833 1834 if (n_ptr->cleanup_required) { 1835 tipc_node_unlock(n_ptr); 1836 goto cont; 1837 } 1838 1839 /* Locate unicast link endpoint that should handle message */ 1840 1841 l_ptr = n_ptr->links[b_ptr->identity]; 1842 if (unlikely(!l_ptr)) { 1843 tipc_node_unlock(n_ptr); 1844 goto cont; 1845 } 1846 1847 /* Validate message sequence number info */ 1848 1849 seq_no = msg_seqno(msg); 1850 ackd = msg_ack(msg); 1851 1852 /* Release acked messages */ 1853 1854 if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) { 1855 if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported) 1856 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); 1857 } 1858 1859 crs = l_ptr->first_out; 1860 while ((crs != l_ptr->next_out) && 1861 less_eq(msg_seqno(buf_msg(crs)), ackd)) { 1862 struct sk_buff *next = crs->next; 1863 1864 buf_discard(crs); 1865 crs = next; 1866 released++; 1867 } 1868 if (released) { 1869 l_ptr->first_out = crs; 1870 l_ptr->out_queue_size -= released; 1871 } 1872 1873 /* Try sending any messages link endpoint has pending */ 1874 1875 if (unlikely(l_ptr->next_out)) 1876 tipc_link_push_queue(l_ptr); 1877 if (unlikely(!list_empty(&l_ptr->waiting_ports))) 1878 tipc_link_wakeup_ports(l_ptr, 0); 1879 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { 1880 l_ptr->stats.sent_acks++; 1881 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1882 } 1883 1884 /* Now (finally!) process the incoming message */ 1885 1886 protocol_check: 1887 if (likely(link_working_working(l_ptr))) { 1888 if (likely(seq_no == mod(l_ptr->next_in_no))) { 1889 l_ptr->next_in_no++; 1890 if (unlikely(l_ptr->oldest_deferred_in)) 1891 head = link_insert_deferred_queue(l_ptr, 1892 head); 1893 if (likely(msg_is_dest(msg, tipc_own_addr))) { 1894 deliver: 1895 if (likely(msg_isdata(msg))) { 1896 tipc_node_unlock(n_ptr); 1897 tipc_port_recv_msg(buf); 1898 continue; 1899 } 1900 switch (msg_user(msg)) { 1901 case MSG_BUNDLER: 1902 l_ptr->stats.recv_bundles++; 1903 l_ptr->stats.recv_bundled += 1904 msg_msgcnt(msg); 1905 tipc_node_unlock(n_ptr); 1906 tipc_link_recv_bundle(buf); 1907 continue; 1908 case ROUTE_DISTRIBUTOR: 1909 tipc_node_unlock(n_ptr); 1910 tipc_cltr_recv_routing_table(buf); 1911 continue; 1912 case NAME_DISTRIBUTOR: 1913 tipc_node_unlock(n_ptr); 1914 tipc_named_recv(buf); 1915 continue; 1916 case CONN_MANAGER: 1917 tipc_node_unlock(n_ptr); 1918 tipc_port_recv_proto_msg(buf); 1919 continue; 1920 case MSG_FRAGMENTER: 1921 l_ptr->stats.recv_fragments++; 1922 if (tipc_link_recv_fragment(&l_ptr->defragm_buf, 1923 &buf, &msg)) { 1924 l_ptr->stats.recv_fragmented++; 1925 goto deliver; 1926 } 1927 break; 1928 case CHANGEOVER_PROTOCOL: 1929 type = msg_type(msg); 1930 if (link_recv_changeover_msg(&l_ptr, &buf)) { 1931 msg = buf_msg(buf); 1932 seq_no = msg_seqno(msg); 1933 if (type == ORIGINAL_MSG) 1934 goto deliver; 1935 goto protocol_check; 1936 } 1937 break; 1938 } 1939 } 1940 tipc_node_unlock(n_ptr); 1941 tipc_net_route_msg(buf); 1942 continue; 1943 } 1944 link_handle_out_of_seq_msg(l_ptr, buf); 1945 head = link_insert_deferred_queue(l_ptr, head); 1946 tipc_node_unlock(n_ptr); 1947 continue; 1948 } 1949 1950 if (msg_user(msg) == LINK_PROTOCOL) { 1951 link_recv_proto_msg(l_ptr, buf); 1952 head = link_insert_deferred_queue(l_ptr, head); 1953 tipc_node_unlock(n_ptr); 1954 continue; 1955 } 1956 msg_dbg(msg,"NSEQ<REC<"); 1957 link_state_event(l_ptr, TRAFFIC_MSG_EVT); 1958 1959 if (link_working_working(l_ptr)) { 1960 /* Re-insert in front of queue */ 1961 msg_dbg(msg,"RECV-REINS:"); 1962 buf->next = head; 1963 head = buf; 1964 tipc_node_unlock(n_ptr); 1965 continue; 1966 } 1967 tipc_node_unlock(n_ptr); 1968 cont: 1969 buf_discard(buf); 1970 } 1971 read_unlock_bh(&tipc_net_lock); 1972 } 1973 1974 /* 1975 * link_defer_buf(): Sort a received out-of-sequence packet 1976 * into the deferred reception queue. 1977 * Returns the increase of the queue length,i.e. 0 or 1 1978 */ 1979 1980 u32 tipc_link_defer_pkt(struct sk_buff **head, 1981 struct sk_buff **tail, 1982 struct sk_buff *buf) 1983 { 1984 struct sk_buff *prev = NULL; 1985 struct sk_buff *crs = *head; 1986 u32 seq_no = msg_seqno(buf_msg(buf)); 1987 1988 buf->next = NULL; 1989 1990 /* Empty queue ? */ 1991 if (*head == NULL) { 1992 *head = *tail = buf; 1993 return 1; 1994 } 1995 1996 /* Last ? */ 1997 if (less(msg_seqno(buf_msg(*tail)), seq_no)) { 1998 (*tail)->next = buf; 1999 *tail = buf; 2000 return 1; 2001 } 2002 2003 /* Scan through queue and sort it in */ 2004 do { 2005 struct tipc_msg *msg = buf_msg(crs); 2006 2007 if (less(seq_no, msg_seqno(msg))) { 2008 buf->next = crs; 2009 if (prev) 2010 prev->next = buf; 2011 else 2012 *head = buf; 2013 return 1; 2014 } 2015 if (seq_no == msg_seqno(msg)) { 2016 break; 2017 } 2018 prev = crs; 2019 crs = crs->next; 2020 } 2021 while (crs); 2022 2023 /* Message is a duplicate of an existing message */ 2024 2025 buf_discard(buf); 2026 return 0; 2027 } 2028 2029 /** 2030 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet 2031 */ 2032 2033 static void link_handle_out_of_seq_msg(struct link *l_ptr, 2034 struct sk_buff *buf) 2035 { 2036 u32 seq_no = msg_seqno(buf_msg(buf)); 2037 2038 if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { 2039 link_recv_proto_msg(l_ptr, buf); 2040 return; 2041 } 2042 2043 dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n", 2044 seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no); 2045 2046 /* Record OOS packet arrival (force mismatch on next timeout) */ 2047 2048 l_ptr->checkpoint--; 2049 2050 /* 2051 * Discard packet if a duplicate; otherwise add it to deferred queue 2052 * and notify peer of gap as per protocol specification 2053 */ 2054 2055 if (less(seq_no, mod(l_ptr->next_in_no))) { 2056 l_ptr->stats.duplicates++; 2057 buf_discard(buf); 2058 return; 2059 } 2060 2061 if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in, 2062 &l_ptr->newest_deferred_in, buf)) { 2063 l_ptr->deferred_inqueue_sz++; 2064 l_ptr->stats.deferred_recv++; 2065 if ((l_ptr->deferred_inqueue_sz % 16) == 1) 2066 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 2067 } else 2068 l_ptr->stats.duplicates++; 2069 } 2070 2071 /* 2072 * Send protocol message to the other endpoint. 2073 */ 2074 void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg, 2075 u32 gap, u32 tolerance, u32 priority, u32 ack_mtu) 2076 { 2077 struct sk_buff *buf = NULL; 2078 struct tipc_msg *msg = l_ptr->pmsg; 2079 u32 msg_size = sizeof(l_ptr->proto_msg); 2080 2081 if (link_blocked(l_ptr)) 2082 return; 2083 msg_set_type(msg, msg_typ); 2084 msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); 2085 msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); 2086 msg_set_last_bcast(msg, tipc_bclink_get_last_sent()); 2087 2088 if (msg_typ == STATE_MSG) { 2089 u32 next_sent = mod(l_ptr->next_out_no); 2090 2091 if (!tipc_link_is_up(l_ptr)) 2092 return; 2093 if (l_ptr->next_out) 2094 next_sent = msg_seqno(buf_msg(l_ptr->next_out)); 2095 msg_set_next_sent(msg, next_sent); 2096 if (l_ptr->oldest_deferred_in) { 2097 u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 2098 gap = mod(rec - mod(l_ptr->next_in_no)); 2099 } 2100 msg_set_seq_gap(msg, gap); 2101 if (gap) 2102 l_ptr->stats.sent_nacks++; 2103 msg_set_link_tolerance(msg, tolerance); 2104 msg_set_linkprio(msg, priority); 2105 msg_set_max_pkt(msg, ack_mtu); 2106 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 2107 msg_set_probe(msg, probe_msg != 0); 2108 if (probe_msg) { 2109 u32 mtu = l_ptr->max_pkt; 2110 2111 if ((mtu < l_ptr->max_pkt_target) && 2112 link_working_working(l_ptr) && 2113 l_ptr->fsm_msg_cnt) { 2114 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; 2115 if (l_ptr->max_pkt_probes == 10) { 2116 l_ptr->max_pkt_target = (msg_size - 4); 2117 l_ptr->max_pkt_probes = 0; 2118 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; 2119 } 2120 l_ptr->max_pkt_probes++; 2121 } 2122 2123 l_ptr->stats.sent_probes++; 2124 } 2125 l_ptr->stats.sent_states++; 2126 } else { /* RESET_MSG or ACTIVATE_MSG */ 2127 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1)); 2128 msg_set_seq_gap(msg, 0); 2129 msg_set_next_sent(msg, 1); 2130 msg_set_link_tolerance(msg, l_ptr->tolerance); 2131 msg_set_linkprio(msg, l_ptr->priority); 2132 msg_set_max_pkt(msg, l_ptr->max_pkt_target); 2133 } 2134 2135 if (tipc_node_has_redundant_links(l_ptr->owner)) { 2136 msg_set_redundant_link(msg); 2137 } else { 2138 msg_clear_redundant_link(msg); 2139 } 2140 msg_set_linkprio(msg, l_ptr->priority); 2141 2142 /* Ensure sequence number will not fit : */ 2143 2144 msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2))); 2145 2146 /* Congestion? */ 2147 2148 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { 2149 if (!l_ptr->proto_msg_queue) { 2150 l_ptr->proto_msg_queue = 2151 tipc_buf_acquire(sizeof(l_ptr->proto_msg)); 2152 } 2153 buf = l_ptr->proto_msg_queue; 2154 if (!buf) 2155 return; 2156 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 2157 return; 2158 } 2159 msg_set_timestamp(msg, jiffies_to_msecs(jiffies)); 2160 2161 /* Message can be sent */ 2162 2163 msg_dbg(msg, ">>"); 2164 2165 buf = tipc_buf_acquire(msg_size); 2166 if (!buf) 2167 return; 2168 2169 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 2170 msg_set_size(buf_msg(buf), msg_size); 2171 2172 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 2173 l_ptr->unacked_window = 0; 2174 buf_discard(buf); 2175 return; 2176 } 2177 2178 /* New congestion */ 2179 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 2180 l_ptr->proto_msg_queue = buf; 2181 l_ptr->stats.bearer_congs++; 2182 } 2183 2184 /* 2185 * Receive protocol message : 2186 * Note that network plane id propagates through the network, and may 2187 * change at any time. The node with lowest address rules 2188 */ 2189 2190 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf) 2191 { 2192 u32 rec_gap = 0; 2193 u32 max_pkt_info; 2194 u32 max_pkt_ack; 2195 u32 msg_tol; 2196 struct tipc_msg *msg = buf_msg(buf); 2197 2198 dbg("AT(%u):", jiffies_to_msecs(jiffies)); 2199 msg_dbg(msg, "<<"); 2200 if (link_blocked(l_ptr)) 2201 goto exit; 2202 2203 /* record unnumbered packet arrival (force mismatch on next timeout) */ 2204 2205 l_ptr->checkpoint--; 2206 2207 if (l_ptr->b_ptr->net_plane != msg_net_plane(msg)) 2208 if (tipc_own_addr > msg_prevnode(msg)) 2209 l_ptr->b_ptr->net_plane = msg_net_plane(msg); 2210 2211 l_ptr->owner->permit_changeover = msg_redundant_link(msg); 2212 2213 switch (msg_type(msg)) { 2214 2215 case RESET_MSG: 2216 if (!link_working_unknown(l_ptr) && 2217 (l_ptr->peer_session != INVALID_SESSION)) { 2218 if (msg_session(msg) == l_ptr->peer_session) { 2219 dbg("Duplicate RESET: %u<->%u\n", 2220 msg_session(msg), l_ptr->peer_session); 2221 break; /* duplicate: ignore */ 2222 } 2223 } 2224 /* fall thru' */ 2225 case ACTIVATE_MSG: 2226 /* Update link settings according other endpoint's values */ 2227 2228 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg)); 2229 2230 if ((msg_tol = msg_link_tolerance(msg)) && 2231 (msg_tol > l_ptr->tolerance)) 2232 link_set_supervision_props(l_ptr, msg_tol); 2233 2234 if (msg_linkprio(msg) > l_ptr->priority) 2235 l_ptr->priority = msg_linkprio(msg); 2236 2237 max_pkt_info = msg_max_pkt(msg); 2238 if (max_pkt_info) { 2239 if (max_pkt_info < l_ptr->max_pkt_target) 2240 l_ptr->max_pkt_target = max_pkt_info; 2241 if (l_ptr->max_pkt > l_ptr->max_pkt_target) 2242 l_ptr->max_pkt = l_ptr->max_pkt_target; 2243 } else { 2244 l_ptr->max_pkt = l_ptr->max_pkt_target; 2245 } 2246 l_ptr->owner->bclink.supported = (max_pkt_info != 0); 2247 2248 link_state_event(l_ptr, msg_type(msg)); 2249 2250 l_ptr->peer_session = msg_session(msg); 2251 l_ptr->peer_bearer_id = msg_bearer_id(msg); 2252 2253 /* Synchronize broadcast sequence numbers */ 2254 if (!tipc_node_has_redundant_links(l_ptr->owner)) { 2255 l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg)); 2256 } 2257 break; 2258 case STATE_MSG: 2259 2260 if ((msg_tol = msg_link_tolerance(msg))) 2261 link_set_supervision_props(l_ptr, msg_tol); 2262 2263 if (msg_linkprio(msg) && 2264 (msg_linkprio(msg) != l_ptr->priority)) { 2265 warn("Resetting link <%s>, priority change %u->%u\n", 2266 l_ptr->name, l_ptr->priority, msg_linkprio(msg)); 2267 l_ptr->priority = msg_linkprio(msg); 2268 tipc_link_reset(l_ptr); /* Enforce change to take effect */ 2269 break; 2270 } 2271 link_state_event(l_ptr, TRAFFIC_MSG_EVT); 2272 l_ptr->stats.recv_states++; 2273 if (link_reset_unknown(l_ptr)) 2274 break; 2275 2276 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) { 2277 rec_gap = mod(msg_next_sent(msg) - 2278 mod(l_ptr->next_in_no)); 2279 } 2280 2281 max_pkt_ack = msg_max_pkt(msg); 2282 if (max_pkt_ack > l_ptr->max_pkt) { 2283 dbg("Link <%s> updated MTU %u -> %u\n", 2284 l_ptr->name, l_ptr->max_pkt, max_pkt_ack); 2285 l_ptr->max_pkt = max_pkt_ack; 2286 l_ptr->max_pkt_probes = 0; 2287 } 2288 2289 max_pkt_ack = 0; 2290 if (msg_probe(msg)) { 2291 l_ptr->stats.recv_probes++; 2292 if (msg_size(msg) > sizeof(l_ptr->proto_msg)) { 2293 max_pkt_ack = msg_size(msg); 2294 } 2295 } 2296 2297 /* Protocol message before retransmits, reduce loss risk */ 2298 2299 tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg)); 2300 2301 if (rec_gap || (msg_probe(msg))) { 2302 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 2303 0, rec_gap, 0, 0, max_pkt_ack); 2304 } 2305 if (msg_seq_gap(msg)) { 2306 msg_dbg(msg, "With Gap:"); 2307 l_ptr->stats.recv_nacks++; 2308 tipc_link_retransmit(l_ptr, l_ptr->first_out, 2309 msg_seq_gap(msg)); 2310 } 2311 break; 2312 default: 2313 msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<"); 2314 } 2315 exit: 2316 buf_discard(buf); 2317 } 2318 2319 2320 /* 2321 * tipc_link_tunnel(): Send one message via a link belonging to 2322 * another bearer. Owner node is locked. 2323 */ 2324 static void tipc_link_tunnel(struct link *l_ptr, 2325 struct tipc_msg *tunnel_hdr, 2326 struct tipc_msg *msg, 2327 u32 selector) 2328 { 2329 struct link *tunnel; 2330 struct sk_buff *buf; 2331 u32 length = msg_size(msg); 2332 2333 tunnel = l_ptr->owner->active_links[selector & 1]; 2334 if (!tipc_link_is_up(tunnel)) { 2335 warn("Link changeover error, " 2336 "tunnel link no longer available\n"); 2337 return; 2338 } 2339 msg_set_size(tunnel_hdr, length + INT_H_SIZE); 2340 buf = tipc_buf_acquire(length + INT_H_SIZE); 2341 if (!buf) { 2342 warn("Link changeover error, " 2343 "unable to send tunnel msg\n"); 2344 return; 2345 } 2346 skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); 2347 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); 2348 dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane); 2349 msg_dbg(buf_msg(buf), ">SEND>"); 2350 tipc_link_send_buf(tunnel, buf); 2351 } 2352 2353 2354 2355 /* 2356 * changeover(): Send whole message queue via the remaining link 2357 * Owner node is locked. 2358 */ 2359 2360 void tipc_link_changeover(struct link *l_ptr) 2361 { 2362 u32 msgcount = l_ptr->out_queue_size; 2363 struct sk_buff *crs = l_ptr->first_out; 2364 struct link *tunnel = l_ptr->owner->active_links[0]; 2365 struct tipc_msg tunnel_hdr; 2366 int split_bundles; 2367 2368 if (!tunnel) 2369 return; 2370 2371 if (!l_ptr->owner->permit_changeover) { 2372 warn("Link changeover error, " 2373 "peer did not permit changeover\n"); 2374 return; 2375 } 2376 2377 tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 2378 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); 2379 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 2380 msg_set_msgcnt(&tunnel_hdr, msgcount); 2381 dbg("Link changeover requires %u tunnel messages\n", msgcount); 2382 2383 if (!l_ptr->first_out) { 2384 struct sk_buff *buf; 2385 2386 buf = tipc_buf_acquire(INT_H_SIZE); 2387 if (buf) { 2388 skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); 2389 msg_set_size(&tunnel_hdr, INT_H_SIZE); 2390 dbg("%c->%c:", l_ptr->b_ptr->net_plane, 2391 tunnel->b_ptr->net_plane); 2392 msg_dbg(&tunnel_hdr, "EMPTY>SEND>"); 2393 tipc_link_send_buf(tunnel, buf); 2394 } else { 2395 warn("Link changeover error, " 2396 "unable to send changeover msg\n"); 2397 } 2398 return; 2399 } 2400 2401 split_bundles = (l_ptr->owner->active_links[0] != 2402 l_ptr->owner->active_links[1]); 2403 2404 while (crs) { 2405 struct tipc_msg *msg = buf_msg(crs); 2406 2407 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { 2408 struct tipc_msg *m = msg_get_wrapped(msg); 2409 unchar* pos = (unchar*)m; 2410 2411 msgcount = msg_msgcnt(msg); 2412 while (msgcount--) { 2413 msg_set_seqno(m,msg_seqno(msg)); 2414 tipc_link_tunnel(l_ptr, &tunnel_hdr, m, 2415 msg_link_selector(m)); 2416 pos += align(msg_size(m)); 2417 m = (struct tipc_msg *)pos; 2418 } 2419 } else { 2420 tipc_link_tunnel(l_ptr, &tunnel_hdr, msg, 2421 msg_link_selector(msg)); 2422 } 2423 crs = crs->next; 2424 } 2425 } 2426 2427 void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel) 2428 { 2429 struct sk_buff *iter; 2430 struct tipc_msg tunnel_hdr; 2431 2432 tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 2433 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr); 2434 msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size); 2435 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 2436 iter = l_ptr->first_out; 2437 while (iter) { 2438 struct sk_buff *outbuf; 2439 struct tipc_msg *msg = buf_msg(iter); 2440 u32 length = msg_size(msg); 2441 2442 if (msg_user(msg) == MSG_BUNDLER) 2443 msg_set_type(msg, CLOSED_MSG); 2444 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ 2445 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 2446 msg_set_size(&tunnel_hdr, length + INT_H_SIZE); 2447 outbuf = tipc_buf_acquire(length + INT_H_SIZE); 2448 if (outbuf == NULL) { 2449 warn("Link changeover error, " 2450 "unable to send duplicate msg\n"); 2451 return; 2452 } 2453 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); 2454 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, 2455 length); 2456 dbg("%c->%c:", l_ptr->b_ptr->net_plane, 2457 tunnel->b_ptr->net_plane); 2458 msg_dbg(buf_msg(outbuf), ">SEND>"); 2459 tipc_link_send_buf(tunnel, outbuf); 2460 if (!tipc_link_is_up(l_ptr)) 2461 return; 2462 iter = iter->next; 2463 } 2464 } 2465 2466 2467 2468 /** 2469 * buf_extract - extracts embedded TIPC message from another message 2470 * @skb: encapsulating message buffer 2471 * @from_pos: offset to extract from 2472 * 2473 * Returns a new message buffer containing an embedded message. The 2474 * encapsulating message itself is left unchanged. 2475 */ 2476 2477 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) 2478 { 2479 struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos); 2480 u32 size = msg_size(msg); 2481 struct sk_buff *eb; 2482 2483 eb = tipc_buf_acquire(size); 2484 if (eb) 2485 skb_copy_to_linear_data(eb, msg, size); 2486 return eb; 2487 } 2488 2489 /* 2490 * link_recv_changeover_msg(): Receive tunneled packet sent 2491 * via other link. Node is locked. Return extracted buffer. 2492 */ 2493 2494 static int link_recv_changeover_msg(struct link **l_ptr, 2495 struct sk_buff **buf) 2496 { 2497 struct sk_buff *tunnel_buf = *buf; 2498 struct link *dest_link; 2499 struct tipc_msg *msg; 2500 struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf); 2501 u32 msg_typ = msg_type(tunnel_msg); 2502 u32 msg_count = msg_msgcnt(tunnel_msg); 2503 2504 dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)]; 2505 if (!dest_link) { 2506 msg_dbg(tunnel_msg, "NOLINK/<REC<"); 2507 goto exit; 2508 } 2509 if (dest_link == *l_ptr) { 2510 err("Unexpected changeover message on link <%s>\n", 2511 (*l_ptr)->name); 2512 goto exit; 2513 } 2514 dbg("%c<-%c:", dest_link->b_ptr->net_plane, 2515 (*l_ptr)->b_ptr->net_plane); 2516 *l_ptr = dest_link; 2517 msg = msg_get_wrapped(tunnel_msg); 2518 2519 if (msg_typ == DUPLICATE_MSG) { 2520 if (less(msg_seqno(msg), mod(dest_link->next_in_no))) { 2521 msg_dbg(tunnel_msg, "DROP/<REC<"); 2522 goto exit; 2523 } 2524 *buf = buf_extract(tunnel_buf,INT_H_SIZE); 2525 if (*buf == NULL) { 2526 warn("Link changeover error, duplicate msg dropped\n"); 2527 goto exit; 2528 } 2529 msg_dbg(tunnel_msg, "TNL<REC<"); 2530 buf_discard(tunnel_buf); 2531 return 1; 2532 } 2533 2534 /* First original message ?: */ 2535 2536 if (tipc_link_is_up(dest_link)) { 2537 msg_dbg(tunnel_msg, "UP/FIRST/<REC<"); 2538 info("Resetting link <%s>, changeover initiated by peer\n", 2539 dest_link->name); 2540 tipc_link_reset(dest_link); 2541 dest_link->exp_msg_count = msg_count; 2542 dbg("Expecting %u tunnelled messages\n", msg_count); 2543 if (!msg_count) 2544 goto exit; 2545 } else if (dest_link->exp_msg_count == START_CHANGEOVER) { 2546 msg_dbg(tunnel_msg, "BLK/FIRST/<REC<"); 2547 dest_link->exp_msg_count = msg_count; 2548 dbg("Expecting %u tunnelled messages\n", msg_count); 2549 if (!msg_count) 2550 goto exit; 2551 } 2552 2553 /* Receive original message */ 2554 2555 if (dest_link->exp_msg_count == 0) { 2556 warn("Link switchover error, " 2557 "got too many tunnelled messages\n"); 2558 msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<"); 2559 dbg_print_link(dest_link, "LINK:"); 2560 goto exit; 2561 } 2562 dest_link->exp_msg_count--; 2563 if (less(msg_seqno(msg), dest_link->reset_checkpoint)) { 2564 msg_dbg(tunnel_msg, "DROP/DUPL/<REC<"); 2565 goto exit; 2566 } else { 2567 *buf = buf_extract(tunnel_buf, INT_H_SIZE); 2568 if (*buf != NULL) { 2569 msg_dbg(tunnel_msg, "TNL<REC<"); 2570 buf_discard(tunnel_buf); 2571 return 1; 2572 } else { 2573 warn("Link changeover error, original msg dropped\n"); 2574 } 2575 } 2576 exit: 2577 *buf = NULL; 2578 buf_discard(tunnel_buf); 2579 return 0; 2580 } 2581 2582 /* 2583 * Bundler functionality: 2584 */ 2585 void tipc_link_recv_bundle(struct sk_buff *buf) 2586 { 2587 u32 msgcount = msg_msgcnt(buf_msg(buf)); 2588 u32 pos = INT_H_SIZE; 2589 struct sk_buff *obuf; 2590 2591 msg_dbg(buf_msg(buf), "<BNDL<: "); 2592 while (msgcount--) { 2593 obuf = buf_extract(buf, pos); 2594 if (obuf == NULL) { 2595 warn("Link unable to unbundle message(s)\n"); 2596 break; 2597 } 2598 pos += align(msg_size(buf_msg(obuf))); 2599 msg_dbg(buf_msg(obuf), " /"); 2600 tipc_net_route_msg(obuf); 2601 } 2602 buf_discard(buf); 2603 } 2604 2605 /* 2606 * Fragmentation/defragmentation: 2607 */ 2608 2609 2610 /* 2611 * link_send_long_buf: Entry for buffers needing fragmentation. 2612 * The buffer is complete, inclusive total message length. 2613 * Returns user data length. 2614 */ 2615 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) 2616 { 2617 struct tipc_msg *inmsg = buf_msg(buf); 2618 struct tipc_msg fragm_hdr; 2619 u32 insize = msg_size(inmsg); 2620 u32 dsz = msg_data_sz(inmsg); 2621 unchar *crs = buf->data; 2622 u32 rest = insize; 2623 u32 pack_sz = l_ptr->max_pkt; 2624 u32 fragm_sz = pack_sz - INT_H_SIZE; 2625 u32 fragm_no = 1; 2626 u32 destaddr; 2627 2628 if (msg_short(inmsg)) 2629 destaddr = l_ptr->addr; 2630 else 2631 destaddr = msg_destnode(inmsg); 2632 2633 if (msg_routed(inmsg)) 2634 msg_set_prevnode(inmsg, tipc_own_addr); 2635 2636 /* Prepare reusable fragment header: */ 2637 2638 tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 2639 INT_H_SIZE, destaddr); 2640 msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg)); 2641 msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++)); 2642 msg_set_fragm_no(&fragm_hdr, fragm_no); 2643 l_ptr->stats.sent_fragmented++; 2644 2645 /* Chop up message: */ 2646 2647 while (rest > 0) { 2648 struct sk_buff *fragm; 2649 2650 if (rest <= fragm_sz) { 2651 fragm_sz = rest; 2652 msg_set_type(&fragm_hdr, LAST_FRAGMENT); 2653 } 2654 fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); 2655 if (fragm == NULL) { 2656 warn("Link unable to fragment message\n"); 2657 dsz = -ENOMEM; 2658 goto exit; 2659 } 2660 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 2661 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); 2662 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, 2663 fragm_sz); 2664 /* Send queued messages first, if any: */ 2665 2666 l_ptr->stats.sent_fragments++; 2667 tipc_link_send_buf(l_ptr, fragm); 2668 if (!tipc_link_is_up(l_ptr)) 2669 return dsz; 2670 msg_set_fragm_no(&fragm_hdr, ++fragm_no); 2671 rest -= fragm_sz; 2672 crs += fragm_sz; 2673 msg_set_type(&fragm_hdr, FRAGMENT); 2674 } 2675 exit: 2676 buf_discard(buf); 2677 return dsz; 2678 } 2679 2680 /* 2681 * A pending message being re-assembled must store certain values 2682 * to handle subsequent fragments correctly. The following functions 2683 * help storing these values in unused, available fields in the 2684 * pending message. This makes dynamic memory allocation unecessary. 2685 */ 2686 2687 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno) 2688 { 2689 msg_set_seqno(buf_msg(buf), seqno); 2690 } 2691 2692 static u32 get_fragm_size(struct sk_buff *buf) 2693 { 2694 return msg_ack(buf_msg(buf)); 2695 } 2696 2697 static void set_fragm_size(struct sk_buff *buf, u32 sz) 2698 { 2699 msg_set_ack(buf_msg(buf), sz); 2700 } 2701 2702 static u32 get_expected_frags(struct sk_buff *buf) 2703 { 2704 return msg_bcast_ack(buf_msg(buf)); 2705 } 2706 2707 static void set_expected_frags(struct sk_buff *buf, u32 exp) 2708 { 2709 msg_set_bcast_ack(buf_msg(buf), exp); 2710 } 2711 2712 static u32 get_timer_cnt(struct sk_buff *buf) 2713 { 2714 return msg_reroute_cnt(buf_msg(buf)); 2715 } 2716 2717 static void incr_timer_cnt(struct sk_buff *buf) 2718 { 2719 msg_incr_reroute_cnt(buf_msg(buf)); 2720 } 2721 2722 /* 2723 * tipc_link_recv_fragment(): Called with node lock on. Returns 2724 * the reassembled buffer if message is complete. 2725 */ 2726 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, 2727 struct tipc_msg **m) 2728 { 2729 struct sk_buff *prev = NULL; 2730 struct sk_buff *fbuf = *fb; 2731 struct tipc_msg *fragm = buf_msg(fbuf); 2732 struct sk_buff *pbuf = *pending; 2733 u32 long_msg_seq_no = msg_long_msgno(fragm); 2734 2735 *fb = NULL; 2736 msg_dbg(fragm,"FRG<REC<"); 2737 2738 /* Is there an incomplete message waiting for this fragment? */ 2739 2740 while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) || 2741 (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) { 2742 prev = pbuf; 2743 pbuf = pbuf->next; 2744 } 2745 2746 if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) { 2747 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm); 2748 u32 msg_sz = msg_size(imsg); 2749 u32 fragm_sz = msg_data_sz(fragm); 2750 u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz); 2751 u32 max = TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE; 2752 if (msg_type(imsg) == TIPC_MCAST_MSG) 2753 max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE; 2754 if (msg_size(imsg) > max) { 2755 msg_dbg(fragm,"<REC<Oversized: "); 2756 buf_discard(fbuf); 2757 return 0; 2758 } 2759 pbuf = tipc_buf_acquire(msg_size(imsg)); 2760 if (pbuf != NULL) { 2761 pbuf->next = *pending; 2762 *pending = pbuf; 2763 skb_copy_to_linear_data(pbuf, imsg, 2764 msg_data_sz(fragm)); 2765 /* Prepare buffer for subsequent fragments. */ 2766 2767 set_long_msg_seqno(pbuf, long_msg_seq_no); 2768 set_fragm_size(pbuf,fragm_sz); 2769 set_expected_frags(pbuf,exp_fragm_cnt - 1); 2770 } else { 2771 warn("Link unable to reassemble fragmented message\n"); 2772 } 2773 buf_discard(fbuf); 2774 return 0; 2775 } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) { 2776 u32 dsz = msg_data_sz(fragm); 2777 u32 fsz = get_fragm_size(pbuf); 2778 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz); 2779 u32 exp_frags = get_expected_frags(pbuf) - 1; 2780 skb_copy_to_linear_data_offset(pbuf, crs, 2781 msg_data(fragm), dsz); 2782 buf_discard(fbuf); 2783 2784 /* Is message complete? */ 2785 2786 if (exp_frags == 0) { 2787 if (prev) 2788 prev->next = pbuf->next; 2789 else 2790 *pending = pbuf->next; 2791 msg_reset_reroute_cnt(buf_msg(pbuf)); 2792 *fb = pbuf; 2793 *m = buf_msg(pbuf); 2794 return 1; 2795 } 2796 set_expected_frags(pbuf,exp_frags); 2797 return 0; 2798 } 2799 dbg(" Discarding orphan fragment %x\n",fbuf); 2800 msg_dbg(fragm,"ORPHAN:"); 2801 dbg("Pending long buffers:\n"); 2802 dbg_print_buf_chain(*pending); 2803 buf_discard(fbuf); 2804 return 0; 2805 } 2806 2807 /** 2808 * link_check_defragm_bufs - flush stale incoming message fragments 2809 * @l_ptr: pointer to link 2810 */ 2811 2812 static void link_check_defragm_bufs(struct link *l_ptr) 2813 { 2814 struct sk_buff *prev = NULL; 2815 struct sk_buff *next = NULL; 2816 struct sk_buff *buf = l_ptr->defragm_buf; 2817 2818 if (!buf) 2819 return; 2820 if (!link_working_working(l_ptr)) 2821 return; 2822 while (buf) { 2823 u32 cnt = get_timer_cnt(buf); 2824 2825 next = buf->next; 2826 if (cnt < 4) { 2827 incr_timer_cnt(buf); 2828 prev = buf; 2829 } else { 2830 dbg(" Discarding incomplete long buffer\n"); 2831 msg_dbg(buf_msg(buf), "LONG:"); 2832 dbg_print_link(l_ptr, "curr:"); 2833 dbg("Pending long buffers:\n"); 2834 dbg_print_buf_chain(l_ptr->defragm_buf); 2835 if (prev) 2836 prev->next = buf->next; 2837 else 2838 l_ptr->defragm_buf = buf->next; 2839 buf_discard(buf); 2840 } 2841 buf = next; 2842 } 2843 } 2844 2845 2846 2847 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance) 2848 { 2849 l_ptr->tolerance = tolerance; 2850 l_ptr->continuity_interval = 2851 ((tolerance / 4) > 500) ? 500 : tolerance / 4; 2852 l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4); 2853 } 2854 2855 2856 void tipc_link_set_queue_limits(struct link *l_ptr, u32 window) 2857 { 2858 /* Data messages from this node, inclusive FIRST_FRAGM */ 2859 l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window; 2860 l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4; 2861 l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5; 2862 l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6; 2863 /* Transiting data messages,inclusive FIRST_FRAGM */ 2864 l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300; 2865 l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600; 2866 l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900; 2867 l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200; 2868 l_ptr->queue_limit[CONN_MANAGER] = 1200; 2869 l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200; 2870 l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500; 2871 l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000; 2872 /* FRAGMENT and LAST_FRAGMENT packets */ 2873 l_ptr->queue_limit[MSG_FRAGMENTER] = 4000; 2874 } 2875 2876 /** 2877 * link_find_link - locate link by name 2878 * @name - ptr to link name string 2879 * @node - ptr to area to be filled with ptr to associated node 2880 * 2881 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted; 2882 * this also prevents link deletion. 2883 * 2884 * Returns pointer to link (or 0 if invalid link name). 2885 */ 2886 2887 static struct link *link_find_link(const char *name, struct tipc_node **node) 2888 { 2889 struct link_name link_name_parts; 2890 struct bearer *b_ptr; 2891 struct link *l_ptr; 2892 2893 if (!link_name_validate(name, &link_name_parts)) 2894 return NULL; 2895 2896 b_ptr = tipc_bearer_find_interface(link_name_parts.if_local); 2897 if (!b_ptr) 2898 return NULL; 2899 2900 *node = tipc_node_find(link_name_parts.addr_peer); 2901 if (!*node) 2902 return NULL; 2903 2904 l_ptr = (*node)->links[b_ptr->identity]; 2905 if (!l_ptr || strcmp(l_ptr->name, name)) 2906 return NULL; 2907 2908 return l_ptr; 2909 } 2910 2911 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space, 2912 u16 cmd) 2913 { 2914 struct tipc_link_config *args; 2915 u32 new_value; 2916 struct link *l_ptr; 2917 struct tipc_node *node; 2918 int res; 2919 2920 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG)) 2921 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 2922 2923 args = (struct tipc_link_config *)TLV_DATA(req_tlv_area); 2924 new_value = ntohl(args->value); 2925 2926 if (!strcmp(args->name, tipc_bclink_name)) { 2927 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) && 2928 (tipc_bclink_set_queue_limits(new_value) == 0)) 2929 return tipc_cfg_reply_none(); 2930 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED 2931 " (cannot change setting on broadcast link)"); 2932 } 2933 2934 read_lock_bh(&tipc_net_lock); 2935 l_ptr = link_find_link(args->name, &node); 2936 if (!l_ptr) { 2937 read_unlock_bh(&tipc_net_lock); 2938 return tipc_cfg_reply_error_string("link not found"); 2939 } 2940 2941 tipc_node_lock(node); 2942 res = -EINVAL; 2943 switch (cmd) { 2944 case TIPC_CMD_SET_LINK_TOL: 2945 if ((new_value >= TIPC_MIN_LINK_TOL) && 2946 (new_value <= TIPC_MAX_LINK_TOL)) { 2947 link_set_supervision_props(l_ptr, new_value); 2948 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 2949 0, 0, new_value, 0, 0); 2950 res = 0; 2951 } 2952 break; 2953 case TIPC_CMD_SET_LINK_PRI: 2954 if ((new_value >= TIPC_MIN_LINK_PRI) && 2955 (new_value <= TIPC_MAX_LINK_PRI)) { 2956 l_ptr->priority = new_value; 2957 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 2958 0, 0, 0, new_value, 0); 2959 res = 0; 2960 } 2961 break; 2962 case TIPC_CMD_SET_LINK_WINDOW: 2963 if ((new_value >= TIPC_MIN_LINK_WIN) && 2964 (new_value <= TIPC_MAX_LINK_WIN)) { 2965 tipc_link_set_queue_limits(l_ptr, new_value); 2966 res = 0; 2967 } 2968 break; 2969 } 2970 tipc_node_unlock(node); 2971 2972 read_unlock_bh(&tipc_net_lock); 2973 if (res) 2974 return tipc_cfg_reply_error_string("cannot change link setting"); 2975 2976 return tipc_cfg_reply_none(); 2977 } 2978 2979 /** 2980 * link_reset_statistics - reset link statistics 2981 * @l_ptr: pointer to link 2982 */ 2983 2984 static void link_reset_statistics(struct link *l_ptr) 2985 { 2986 memset(&l_ptr->stats, 0, sizeof(l_ptr->stats)); 2987 l_ptr->stats.sent_info = l_ptr->next_out_no; 2988 l_ptr->stats.recv_info = l_ptr->next_in_no; 2989 } 2990 2991 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space) 2992 { 2993 char *link_name; 2994 struct link *l_ptr; 2995 struct tipc_node *node; 2996 2997 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) 2998 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 2999 3000 link_name = (char *)TLV_DATA(req_tlv_area); 3001 if (!strcmp(link_name, tipc_bclink_name)) { 3002 if (tipc_bclink_reset_stats()) 3003 return tipc_cfg_reply_error_string("link not found"); 3004 return tipc_cfg_reply_none(); 3005 } 3006 3007 read_lock_bh(&tipc_net_lock); 3008 l_ptr = link_find_link(link_name, &node); 3009 if (!l_ptr) { 3010 read_unlock_bh(&tipc_net_lock); 3011 return tipc_cfg_reply_error_string("link not found"); 3012 } 3013 3014 tipc_node_lock(node); 3015 link_reset_statistics(l_ptr); 3016 tipc_node_unlock(node); 3017 read_unlock_bh(&tipc_net_lock); 3018 return tipc_cfg_reply_none(); 3019 } 3020 3021 /** 3022 * percent - convert count to a percentage of total (rounding up or down) 3023 */ 3024 3025 static u32 percent(u32 count, u32 total) 3026 { 3027 return (count * 100 + (total / 2)) / total; 3028 } 3029 3030 /** 3031 * tipc_link_stats - print link statistics 3032 * @name: link name 3033 * @buf: print buffer area 3034 * @buf_size: size of print buffer area 3035 * 3036 * Returns length of print buffer data string (or 0 if error) 3037 */ 3038 3039 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size) 3040 { 3041 struct print_buf pb; 3042 struct link *l_ptr; 3043 struct tipc_node *node; 3044 char *status; 3045 u32 profile_total = 0; 3046 3047 if (!strcmp(name, tipc_bclink_name)) 3048 return tipc_bclink_stats(buf, buf_size); 3049 3050 tipc_printbuf_init(&pb, buf, buf_size); 3051 3052 read_lock_bh(&tipc_net_lock); 3053 l_ptr = link_find_link(name, &node); 3054 if (!l_ptr) { 3055 read_unlock_bh(&tipc_net_lock); 3056 return 0; 3057 } 3058 tipc_node_lock(node); 3059 3060 if (tipc_link_is_active(l_ptr)) 3061 status = "ACTIVE"; 3062 else if (tipc_link_is_up(l_ptr)) 3063 status = "STANDBY"; 3064 else 3065 status = "DEFUNCT"; 3066 tipc_printf(&pb, "Link <%s>\n" 3067 " %s MTU:%u Priority:%u Tolerance:%u ms" 3068 " Window:%u packets\n", 3069 l_ptr->name, status, l_ptr->max_pkt, 3070 l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]); 3071 tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n", 3072 l_ptr->next_in_no - l_ptr->stats.recv_info, 3073 l_ptr->stats.recv_fragments, 3074 l_ptr->stats.recv_fragmented, 3075 l_ptr->stats.recv_bundles, 3076 l_ptr->stats.recv_bundled); 3077 tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n", 3078 l_ptr->next_out_no - l_ptr->stats.sent_info, 3079 l_ptr->stats.sent_fragments, 3080 l_ptr->stats.sent_fragmented, 3081 l_ptr->stats.sent_bundles, 3082 l_ptr->stats.sent_bundled); 3083 profile_total = l_ptr->stats.msg_length_counts; 3084 if (!profile_total) 3085 profile_total = 1; 3086 tipc_printf(&pb, " TX profile sample:%u packets average:%u octets\n" 3087 " 0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% " 3088 "-16354:%u%% -32768:%u%% -66000:%u%%\n", 3089 l_ptr->stats.msg_length_counts, 3090 l_ptr->stats.msg_lengths_total / profile_total, 3091 percent(l_ptr->stats.msg_length_profile[0], profile_total), 3092 percent(l_ptr->stats.msg_length_profile[1], profile_total), 3093 percent(l_ptr->stats.msg_length_profile[2], profile_total), 3094 percent(l_ptr->stats.msg_length_profile[3], profile_total), 3095 percent(l_ptr->stats.msg_length_profile[4], profile_total), 3096 percent(l_ptr->stats.msg_length_profile[5], profile_total), 3097 percent(l_ptr->stats.msg_length_profile[6], profile_total)); 3098 tipc_printf(&pb, " RX states:%u probes:%u naks:%u defs:%u dups:%u\n", 3099 l_ptr->stats.recv_states, 3100 l_ptr->stats.recv_probes, 3101 l_ptr->stats.recv_nacks, 3102 l_ptr->stats.deferred_recv, 3103 l_ptr->stats.duplicates); 3104 tipc_printf(&pb, " TX states:%u probes:%u naks:%u acks:%u dups:%u\n", 3105 l_ptr->stats.sent_states, 3106 l_ptr->stats.sent_probes, 3107 l_ptr->stats.sent_nacks, 3108 l_ptr->stats.sent_acks, 3109 l_ptr->stats.retransmitted); 3110 tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", 3111 l_ptr->stats.bearer_congs, 3112 l_ptr->stats.link_congs, 3113 l_ptr->stats.max_queue_sz, 3114 l_ptr->stats.queue_sz_counts 3115 ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts) 3116 : 0); 3117 3118 tipc_node_unlock(node); 3119 read_unlock_bh(&tipc_net_lock); 3120 return tipc_printbuf_validate(&pb); 3121 } 3122 3123 #define MAX_LINK_STATS_INFO 2000 3124 3125 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space) 3126 { 3127 struct sk_buff *buf; 3128 struct tlv_desc *rep_tlv; 3129 int str_len; 3130 3131 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) 3132 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 3133 3134 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO)); 3135 if (!buf) 3136 return NULL; 3137 3138 rep_tlv = (struct tlv_desc *)buf->data; 3139 3140 str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area), 3141 (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO); 3142 if (!str_len) { 3143 buf_discard(buf); 3144 return tipc_cfg_reply_error_string("link not found"); 3145 } 3146 3147 skb_put(buf, TLV_SPACE(str_len)); 3148 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 3149 3150 return buf; 3151 } 3152 3153 /** 3154 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination 3155 * @dest: network address of destination node 3156 * @selector: used to select from set of active links 3157 * 3158 * If no active link can be found, uses default maximum packet size. 3159 */ 3160 3161 u32 tipc_link_get_max_pkt(u32 dest, u32 selector) 3162 { 3163 struct tipc_node *n_ptr; 3164 struct link *l_ptr; 3165 u32 res = MAX_PKT_DEFAULT; 3166 3167 if (dest == tipc_own_addr) 3168 return MAX_MSG_SIZE; 3169 3170 read_lock_bh(&tipc_net_lock); 3171 n_ptr = tipc_node_select(dest, selector); 3172 if (n_ptr) { 3173 tipc_node_lock(n_ptr); 3174 l_ptr = n_ptr->active_links[selector & 1]; 3175 if (l_ptr) 3176 res = l_ptr->max_pkt; 3177 tipc_node_unlock(n_ptr); 3178 } 3179 read_unlock_bh(&tipc_net_lock); 3180 return res; 3181 } 3182 3183 static void link_dump_send_queue(struct link *l_ptr) 3184 { 3185 if (l_ptr->next_out) { 3186 info("\nContents of unsent queue:\n"); 3187 dbg_print_buf_chain(l_ptr->next_out); 3188 } 3189 info("\nContents of send queue:\n"); 3190 if (l_ptr->first_out) { 3191 dbg_print_buf_chain(l_ptr->first_out); 3192 } 3193 info("Empty send queue\n"); 3194 } 3195 3196 static void link_print(struct link *l_ptr, struct print_buf *buf, 3197 const char *str) 3198 { 3199 tipc_printf(buf, str); 3200 if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr)) 3201 return; 3202 tipc_printf(buf, "Link %x<%s>:", 3203 l_ptr->addr, l_ptr->b_ptr->publ.name); 3204 tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no)); 3205 tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no)); 3206 tipc_printf(buf, "SQUE"); 3207 if (l_ptr->first_out) { 3208 tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out))); 3209 if (l_ptr->next_out) 3210 tipc_printf(buf, "%u..", 3211 msg_seqno(buf_msg(l_ptr->next_out))); 3212 tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out))); 3213 if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) - 3214 msg_seqno(buf_msg(l_ptr->first_out))) 3215 != (l_ptr->out_queue_size - 1)) || 3216 (l_ptr->last_out->next != NULL)) { 3217 tipc_printf(buf, "\nSend queue inconsistency\n"); 3218 tipc_printf(buf, "first_out= %x ", l_ptr->first_out); 3219 tipc_printf(buf, "next_out= %x ", l_ptr->next_out); 3220 tipc_printf(buf, "last_out= %x ", l_ptr->last_out); 3221 link_dump_send_queue(l_ptr); 3222 } 3223 } else 3224 tipc_printf(buf, "[]"); 3225 tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size); 3226 if (l_ptr->oldest_deferred_in) { 3227 u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 3228 u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in)); 3229 tipc_printf(buf, ":RQUE[%u..%u]", o, n); 3230 if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) { 3231 tipc_printf(buf, ":RQSIZ(%u)", 3232 l_ptr->deferred_inqueue_sz); 3233 } 3234 } 3235 if (link_working_unknown(l_ptr)) 3236 tipc_printf(buf, ":WU"); 3237 if (link_reset_reset(l_ptr)) 3238 tipc_printf(buf, ":RR"); 3239 if (link_reset_unknown(l_ptr)) 3240 tipc_printf(buf, ":RU"); 3241 if (link_working_working(l_ptr)) 3242 tipc_printf(buf, ":WW"); 3243 tipc_printf(buf, "\n"); 3244 } 3245 3246