1 /* 2 * net/tipc/link.c: TIPC link code 3 * 4 * Copyright (c) 1996-2007, Ericsson AB 5 * Copyright (c) 2004-2007, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37 #include "core.h" 38 #include "dbg.h" 39 #include "link.h" 40 #include "net.h" 41 #include "node.h" 42 #include "port.h" 43 #include "addr.h" 44 #include "node_subscr.h" 45 #include "name_distr.h" 46 #include "bearer.h" 47 #include "name_table.h" 48 #include "discover.h" 49 #include "config.h" 50 #include "bcast.h" 51 52 53 /* 54 * Out-of-range value for link session numbers 55 */ 56 57 #define INVALID_SESSION 0x10000 58 59 /* 60 * Limit for deferred reception queue: 61 */ 62 63 #define DEF_QUEUE_LIMIT 256u 64 65 /* 66 * Link state events: 67 */ 68 69 #define STARTING_EVT 856384768 /* link processing trigger */ 70 #define TRAFFIC_MSG_EVT 560815u /* rx'd ??? */ 71 #define TIMEOUT_EVT 560817u /* link timer expired */ 72 73 /* 74 * The following two 'message types' is really just implementation 75 * data conveniently stored in the message header. 76 * They must not be considered part of the protocol 77 */ 78 #define OPEN_MSG 0 79 #define CLOSED_MSG 1 80 81 /* 82 * State value stored in 'exp_msg_count' 83 */ 84 85 #define START_CHANGEOVER 100000u 86 87 /** 88 * struct link_name - deconstructed link name 89 * @addr_local: network address of node at this end 90 * @if_local: name of interface at this end 91 * @addr_peer: network address of node at far end 92 * @if_peer: name of interface at far end 93 */ 94 95 struct link_name { 96 u32 addr_local; 97 char if_local[TIPC_MAX_IF_NAME]; 98 u32 addr_peer; 99 char if_peer[TIPC_MAX_IF_NAME]; 100 }; 101 102 #if 0 103 104 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */ 105 106 /** 107 * struct link_event - link up/down event notification 108 */ 109 110 struct link_event { 111 u32 addr; 112 int up; 113 void (*fcn)(u32, char *, int); 114 char name[TIPC_MAX_LINK_NAME]; 115 }; 116 117 #endif 118 119 static void link_handle_out_of_seq_msg(struct link *l_ptr, 120 struct sk_buff *buf); 121 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf); 122 static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf); 123 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance); 124 static int link_send_sections_long(struct port *sender, 125 struct iovec const *msg_sect, 126 u32 num_sect, u32 destnode); 127 static void link_check_defragm_bufs(struct link *l_ptr); 128 static void link_state_event(struct link *l_ptr, u32 event); 129 static void link_reset_statistics(struct link *l_ptr); 130 static void link_print(struct link *l_ptr, struct print_buf *buf, 131 const char *str); 132 133 /* 134 * Debugging code used by link routines only 135 * 136 * When debugging link problems on a system that has multiple links, 137 * the standard TIPC debugging routines may not be useful since they 138 * allow the output from multiple links to be intermixed. For this reason 139 * routines of the form "dbg_link_XXX()" have been created that will capture 140 * debug info into a link's personal print buffer, which can then be dumped 141 * into the TIPC system log (TIPC_LOG) upon request. 142 * 143 * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size 144 * of the print buffer used by each link. If LINK_LOG_BUF_SIZE is set to 0, 145 * the dbg_link_XXX() routines simply send their output to the standard 146 * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful 147 * when there is only a single link in the system being debugged. 148 * 149 * Notes: 150 * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE 151 * - "l_ptr" must be valid when using dbg_link_XXX() macros 152 */ 153 154 #define LINK_LOG_BUF_SIZE 0 155 156 #define dbg_link(fmt, arg...) \ 157 do { \ 158 if (LINK_LOG_BUF_SIZE) \ 159 tipc_printf(&l_ptr->print_buf, fmt, ## arg); \ 160 } while (0) 161 #define dbg_link_msg(msg, txt) \ 162 do { \ 163 if (LINK_LOG_BUF_SIZE) \ 164 tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \ 165 } while (0) 166 #define dbg_link_state(txt) \ 167 do { \ 168 if (LINK_LOG_BUF_SIZE) \ 169 link_print(l_ptr, &l_ptr->print_buf, txt); \ 170 } while (0) 171 #define dbg_link_dump() do { \ 172 if (LINK_LOG_BUF_SIZE) { \ 173 tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \ 174 tipc_printbuf_move(LOG, &l_ptr->print_buf); \ 175 } \ 176 } while (0) 177 178 static void dbg_print_link(struct link *l_ptr, const char *str) 179 { 180 if (DBG_OUTPUT != TIPC_NULL) 181 link_print(l_ptr, DBG_OUTPUT, str); 182 } 183 184 static void dbg_print_buf_chain(struct sk_buff *root_buf) 185 { 186 if (DBG_OUTPUT != TIPC_NULL) { 187 struct sk_buff *buf = root_buf; 188 189 while (buf) { 190 msg_dbg(buf_msg(buf), "In chain: "); 191 buf = buf->next; 192 } 193 } 194 } 195 196 /* 197 * Simple link routines 198 */ 199 200 static unsigned int align(unsigned int i) 201 { 202 return (i + 3) & ~3u; 203 } 204 205 static int link_working_working(struct link *l_ptr) 206 { 207 return (l_ptr->state == WORKING_WORKING); 208 } 209 210 static int link_working_unknown(struct link *l_ptr) 211 { 212 return (l_ptr->state == WORKING_UNKNOWN); 213 } 214 215 static int link_reset_unknown(struct link *l_ptr) 216 { 217 return (l_ptr->state == RESET_UNKNOWN); 218 } 219 220 static int link_reset_reset(struct link *l_ptr) 221 { 222 return (l_ptr->state == RESET_RESET); 223 } 224 225 static int link_blocked(struct link *l_ptr) 226 { 227 return (l_ptr->exp_msg_count || l_ptr->blocked); 228 } 229 230 static int link_congested(struct link *l_ptr) 231 { 232 return (l_ptr->out_queue_size >= l_ptr->queue_limit[0]); 233 } 234 235 static u32 link_max_pkt(struct link *l_ptr) 236 { 237 return l_ptr->max_pkt; 238 } 239 240 static void link_init_max_pkt(struct link *l_ptr) 241 { 242 u32 max_pkt; 243 244 max_pkt = (l_ptr->b_ptr->publ.mtu & ~3); 245 if (max_pkt > MAX_MSG_SIZE) 246 max_pkt = MAX_MSG_SIZE; 247 248 l_ptr->max_pkt_target = max_pkt; 249 if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT) 250 l_ptr->max_pkt = l_ptr->max_pkt_target; 251 else 252 l_ptr->max_pkt = MAX_PKT_DEFAULT; 253 254 l_ptr->max_pkt_probes = 0; 255 } 256 257 static u32 link_next_sent(struct link *l_ptr) 258 { 259 if (l_ptr->next_out) 260 return msg_seqno(buf_msg(l_ptr->next_out)); 261 return mod(l_ptr->next_out_no); 262 } 263 264 static u32 link_last_sent(struct link *l_ptr) 265 { 266 return mod(link_next_sent(l_ptr) - 1); 267 } 268 269 /* 270 * Simple non-static link routines (i.e. referenced outside this file) 271 */ 272 273 int tipc_link_is_up(struct link *l_ptr) 274 { 275 if (!l_ptr) 276 return 0; 277 return (link_working_working(l_ptr) || link_working_unknown(l_ptr)); 278 } 279 280 int tipc_link_is_active(struct link *l_ptr) 281 { 282 return ((l_ptr->owner->active_links[0] == l_ptr) || 283 (l_ptr->owner->active_links[1] == l_ptr)); 284 } 285 286 /** 287 * link_name_validate - validate & (optionally) deconstruct link name 288 * @name - ptr to link name string 289 * @name_parts - ptr to area for link name components (or NULL if not needed) 290 * 291 * Returns 1 if link name is valid, otherwise 0. 292 */ 293 294 static int link_name_validate(const char *name, struct link_name *name_parts) 295 { 296 char name_copy[TIPC_MAX_LINK_NAME]; 297 char *addr_local; 298 char *if_local; 299 char *addr_peer; 300 char *if_peer; 301 char dummy; 302 u32 z_local, c_local, n_local; 303 u32 z_peer, c_peer, n_peer; 304 u32 if_local_len; 305 u32 if_peer_len; 306 307 /* copy link name & ensure length is OK */ 308 309 name_copy[TIPC_MAX_LINK_NAME - 1] = 0; 310 /* need above in case non-Posix strncpy() doesn't pad with nulls */ 311 strncpy(name_copy, name, TIPC_MAX_LINK_NAME); 312 if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0) 313 return 0; 314 315 /* ensure all component parts of link name are present */ 316 317 addr_local = name_copy; 318 if ((if_local = strchr(addr_local, ':')) == NULL) 319 return 0; 320 *(if_local++) = 0; 321 if ((addr_peer = strchr(if_local, '-')) == NULL) 322 return 0; 323 *(addr_peer++) = 0; 324 if_local_len = addr_peer - if_local; 325 if ((if_peer = strchr(addr_peer, ':')) == NULL) 326 return 0; 327 *(if_peer++) = 0; 328 if_peer_len = strlen(if_peer) + 1; 329 330 /* validate component parts of link name */ 331 332 if ((sscanf(addr_local, "%u.%u.%u%c", 333 &z_local, &c_local, &n_local, &dummy) != 3) || 334 (sscanf(addr_peer, "%u.%u.%u%c", 335 &z_peer, &c_peer, &n_peer, &dummy) != 3) || 336 (z_local > 255) || (c_local > 4095) || (n_local > 4095) || 337 (z_peer > 255) || (c_peer > 4095) || (n_peer > 4095) || 338 (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) || 339 (if_peer_len <= 1) || (if_peer_len > TIPC_MAX_IF_NAME) || 340 (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) || 341 (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1))) 342 return 0; 343 344 /* return link name components, if necessary */ 345 346 if (name_parts) { 347 name_parts->addr_local = tipc_addr(z_local, c_local, n_local); 348 strcpy(name_parts->if_local, if_local); 349 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer); 350 strcpy(name_parts->if_peer, if_peer); 351 } 352 return 1; 353 } 354 355 /** 356 * link_timeout - handle expiration of link timer 357 * @l_ptr: pointer to link 358 * 359 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict 360 * with tipc_link_delete(). (There is no risk that the node will be deleted by 361 * another thread because tipc_link_delete() always cancels the link timer before 362 * tipc_node_delete() is called.) 363 */ 364 365 static void link_timeout(struct link *l_ptr) 366 { 367 tipc_node_lock(l_ptr->owner); 368 369 /* update counters used in statistical profiling of send traffic */ 370 371 l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; 372 l_ptr->stats.queue_sz_counts++; 373 374 if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz) 375 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; 376 377 if (l_ptr->first_out) { 378 struct tipc_msg *msg = buf_msg(l_ptr->first_out); 379 u32 length = msg_size(msg); 380 381 if ((msg_user(msg) == MSG_FRAGMENTER) 382 && (msg_type(msg) == FIRST_FRAGMENT)) { 383 length = msg_size(msg_get_wrapped(msg)); 384 } 385 if (length) { 386 l_ptr->stats.msg_lengths_total += length; 387 l_ptr->stats.msg_length_counts++; 388 if (length <= 64) 389 l_ptr->stats.msg_length_profile[0]++; 390 else if (length <= 256) 391 l_ptr->stats.msg_length_profile[1]++; 392 else if (length <= 1024) 393 l_ptr->stats.msg_length_profile[2]++; 394 else if (length <= 4096) 395 l_ptr->stats.msg_length_profile[3]++; 396 else if (length <= 16384) 397 l_ptr->stats.msg_length_profile[4]++; 398 else if (length <= 32768) 399 l_ptr->stats.msg_length_profile[5]++; 400 else 401 l_ptr->stats.msg_length_profile[6]++; 402 } 403 } 404 405 /* do all other link processing performed on a periodic basis */ 406 407 link_check_defragm_bufs(l_ptr); 408 409 link_state_event(l_ptr, TIMEOUT_EVT); 410 411 if (l_ptr->next_out) 412 tipc_link_push_queue(l_ptr); 413 414 tipc_node_unlock(l_ptr->owner); 415 } 416 417 static void link_set_timer(struct link *l_ptr, u32 time) 418 { 419 k_start_timer(&l_ptr->timer, time); 420 } 421 422 /** 423 * tipc_link_create - create a new link 424 * @b_ptr: pointer to associated bearer 425 * @peer: network address of node at other end of link 426 * @media_addr: media address to use when sending messages over link 427 * 428 * Returns pointer to link. 429 */ 430 431 struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer, 432 const struct tipc_media_addr *media_addr) 433 { 434 struct link *l_ptr; 435 struct tipc_msg *msg; 436 char *if_name; 437 438 l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC); 439 if (!l_ptr) { 440 warn("Link creation failed, no memory\n"); 441 return NULL; 442 } 443 444 if (LINK_LOG_BUF_SIZE) { 445 char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC); 446 447 if (!pb) { 448 kfree(l_ptr); 449 warn("Link creation failed, no memory for print buffer\n"); 450 return NULL; 451 } 452 tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE); 453 } 454 455 l_ptr->addr = peer; 456 if_name = strchr(b_ptr->publ.name, ':') + 1; 457 sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:", 458 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 459 tipc_node(tipc_own_addr), 460 if_name, 461 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); 462 /* note: peer i/f is appended to link name by reset/activate */ 463 memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); 464 l_ptr->checkpoint = 1; 465 l_ptr->b_ptr = b_ptr; 466 link_set_supervision_props(l_ptr, b_ptr->media->tolerance); 467 l_ptr->state = RESET_UNKNOWN; 468 469 l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg; 470 msg = l_ptr->pmsg; 471 msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr); 472 msg_set_size(msg, sizeof(l_ptr->proto_msg)); 473 msg_set_session(msg, (tipc_random & 0xffff)); 474 msg_set_bearer_id(msg, b_ptr->identity); 475 strcpy((char *)msg_data(msg), if_name); 476 477 l_ptr->priority = b_ptr->priority; 478 tipc_link_set_queue_limits(l_ptr, b_ptr->media->window); 479 480 link_init_max_pkt(l_ptr); 481 482 l_ptr->next_out_no = 1; 483 INIT_LIST_HEAD(&l_ptr->waiting_ports); 484 485 link_reset_statistics(l_ptr); 486 487 l_ptr->owner = tipc_node_attach_link(l_ptr); 488 if (!l_ptr->owner) { 489 if (LINK_LOG_BUF_SIZE) 490 kfree(l_ptr->print_buf.buf); 491 kfree(l_ptr); 492 return NULL; 493 } 494 495 k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); 496 list_add_tail(&l_ptr->link_list, &b_ptr->links); 497 tipc_k_signal((Handler)tipc_link_start, (unsigned long)l_ptr); 498 499 dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n", 500 l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit); 501 502 return l_ptr; 503 } 504 505 /** 506 * tipc_link_delete - delete a link 507 * @l_ptr: pointer to link 508 * 509 * Note: 'tipc_net_lock' is write_locked, bearer is locked. 510 * This routine must not grab the node lock until after link timer cancellation 511 * to avoid a potential deadlock situation. 512 */ 513 514 void tipc_link_delete(struct link *l_ptr) 515 { 516 if (!l_ptr) { 517 err("Attempt to delete non-existent link\n"); 518 return; 519 } 520 521 dbg("tipc_link_delete()\n"); 522 523 k_cancel_timer(&l_ptr->timer); 524 525 tipc_node_lock(l_ptr->owner); 526 tipc_link_reset(l_ptr); 527 tipc_node_detach_link(l_ptr->owner, l_ptr); 528 tipc_link_stop(l_ptr); 529 list_del_init(&l_ptr->link_list); 530 if (LINK_LOG_BUF_SIZE) 531 kfree(l_ptr->print_buf.buf); 532 tipc_node_unlock(l_ptr->owner); 533 k_term_timer(&l_ptr->timer); 534 kfree(l_ptr); 535 } 536 537 void tipc_link_start(struct link *l_ptr) 538 { 539 dbg("tipc_link_start %x\n", l_ptr); 540 link_state_event(l_ptr, STARTING_EVT); 541 } 542 543 /** 544 * link_schedule_port - schedule port for deferred sending 545 * @l_ptr: pointer to link 546 * @origport: reference to sending port 547 * @sz: amount of data to be sent 548 * 549 * Schedules port for renewed sending of messages after link congestion 550 * has abated. 551 */ 552 553 static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz) 554 { 555 struct port *p_ptr; 556 557 spin_lock_bh(&tipc_port_list_lock); 558 p_ptr = tipc_port_lock(origport); 559 if (p_ptr) { 560 if (!p_ptr->wakeup) 561 goto exit; 562 if (!list_empty(&p_ptr->wait_list)) 563 goto exit; 564 p_ptr->congested_link = l_ptr; 565 p_ptr->publ.congested = 1; 566 p_ptr->waiting_pkts = 1 + ((sz - 1) / link_max_pkt(l_ptr)); 567 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 568 l_ptr->stats.link_congs++; 569 exit: 570 tipc_port_unlock(p_ptr); 571 } 572 spin_unlock_bh(&tipc_port_list_lock); 573 return -ELINKCONG; 574 } 575 576 void tipc_link_wakeup_ports(struct link *l_ptr, int all) 577 { 578 struct port *p_ptr; 579 struct port *temp_p_ptr; 580 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; 581 582 if (all) 583 win = 100000; 584 if (win <= 0) 585 return; 586 if (!spin_trylock_bh(&tipc_port_list_lock)) 587 return; 588 if (link_congested(l_ptr)) 589 goto exit; 590 list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports, 591 wait_list) { 592 if (win <= 0) 593 break; 594 list_del_init(&p_ptr->wait_list); 595 p_ptr->congested_link = NULL; 596 spin_lock_bh(p_ptr->publ.lock); 597 p_ptr->publ.congested = 0; 598 p_ptr->wakeup(&p_ptr->publ); 599 win -= p_ptr->waiting_pkts; 600 spin_unlock_bh(p_ptr->publ.lock); 601 } 602 603 exit: 604 spin_unlock_bh(&tipc_port_list_lock); 605 } 606 607 /** 608 * link_release_outqueue - purge link's outbound message queue 609 * @l_ptr: pointer to link 610 */ 611 612 static void link_release_outqueue(struct link *l_ptr) 613 { 614 struct sk_buff *buf = l_ptr->first_out; 615 struct sk_buff *next; 616 617 while (buf) { 618 next = buf->next; 619 buf_discard(buf); 620 buf = next; 621 } 622 l_ptr->first_out = NULL; 623 l_ptr->out_queue_size = 0; 624 } 625 626 /** 627 * tipc_link_reset_fragments - purge link's inbound message fragments queue 628 * @l_ptr: pointer to link 629 */ 630 631 void tipc_link_reset_fragments(struct link *l_ptr) 632 { 633 struct sk_buff *buf = l_ptr->defragm_buf; 634 struct sk_buff *next; 635 636 while (buf) { 637 next = buf->next; 638 buf_discard(buf); 639 buf = next; 640 } 641 l_ptr->defragm_buf = NULL; 642 } 643 644 /** 645 * tipc_link_stop - purge all inbound and outbound messages associated with link 646 * @l_ptr: pointer to link 647 */ 648 649 void tipc_link_stop(struct link *l_ptr) 650 { 651 struct sk_buff *buf; 652 struct sk_buff *next; 653 654 buf = l_ptr->oldest_deferred_in; 655 while (buf) { 656 next = buf->next; 657 buf_discard(buf); 658 buf = next; 659 } 660 661 buf = l_ptr->first_out; 662 while (buf) { 663 next = buf->next; 664 buf_discard(buf); 665 buf = next; 666 } 667 668 tipc_link_reset_fragments(l_ptr); 669 670 buf_discard(l_ptr->proto_msg_queue); 671 l_ptr->proto_msg_queue = NULL; 672 } 673 674 #if 0 675 676 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */ 677 678 static void link_recv_event(struct link_event *ev) 679 { 680 ev->fcn(ev->addr, ev->name, ev->up); 681 kfree(ev); 682 } 683 684 static void link_send_event(void (*fcn)(u32 a, char *n, int up), 685 struct link *l_ptr, int up) 686 { 687 struct link_event *ev; 688 689 ev = kmalloc(sizeof(*ev), GFP_ATOMIC); 690 if (!ev) { 691 warn("Link event allocation failure\n"); 692 return; 693 } 694 ev->addr = l_ptr->addr; 695 ev->up = up; 696 ev->fcn = fcn; 697 memcpy(ev->name, l_ptr->name, TIPC_MAX_LINK_NAME); 698 tipc_k_signal((Handler)link_recv_event, (unsigned long)ev); 699 } 700 701 #else 702 703 #define link_send_event(fcn, l_ptr, up) do { } while (0) 704 705 #endif 706 707 void tipc_link_reset(struct link *l_ptr) 708 { 709 struct sk_buff *buf; 710 u32 prev_state = l_ptr->state; 711 u32 checkpoint = l_ptr->next_in_no; 712 int was_active_link = tipc_link_is_active(l_ptr); 713 714 msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); 715 716 /* Link is down, accept any session */ 717 l_ptr->peer_session = INVALID_SESSION; 718 719 /* Prepare for max packet size negotiation */ 720 link_init_max_pkt(l_ptr); 721 722 l_ptr->state = RESET_UNKNOWN; 723 dbg_link_state("Resetting Link\n"); 724 725 if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET)) 726 return; 727 728 tipc_node_link_down(l_ptr->owner, l_ptr); 729 tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); 730 #if 0 731 tipc_printf(TIPC_CONS, "\nReset link <%s>\n", l_ptr->name); 732 dbg_link_dump(); 733 #endif 734 if (was_active_link && tipc_node_has_active_links(l_ptr->owner) && 735 l_ptr->owner->permit_changeover) { 736 l_ptr->reset_checkpoint = checkpoint; 737 l_ptr->exp_msg_count = START_CHANGEOVER; 738 } 739 740 /* Clean up all queues: */ 741 742 link_release_outqueue(l_ptr); 743 buf_discard(l_ptr->proto_msg_queue); 744 l_ptr->proto_msg_queue = NULL; 745 buf = l_ptr->oldest_deferred_in; 746 while (buf) { 747 struct sk_buff *next = buf->next; 748 buf_discard(buf); 749 buf = next; 750 } 751 if (!list_empty(&l_ptr->waiting_ports)) 752 tipc_link_wakeup_ports(l_ptr, 1); 753 754 l_ptr->retransm_queue_head = 0; 755 l_ptr->retransm_queue_size = 0; 756 l_ptr->last_out = NULL; 757 l_ptr->first_out = NULL; 758 l_ptr->next_out = NULL; 759 l_ptr->unacked_window = 0; 760 l_ptr->checkpoint = 1; 761 l_ptr->next_out_no = 1; 762 l_ptr->deferred_inqueue_sz = 0; 763 l_ptr->oldest_deferred_in = NULL; 764 l_ptr->newest_deferred_in = NULL; 765 l_ptr->fsm_msg_cnt = 0; 766 l_ptr->stale_count = 0; 767 link_reset_statistics(l_ptr); 768 769 link_send_event(tipc_cfg_link_event, l_ptr, 0); 770 if (!in_own_cluster(l_ptr->addr)) 771 link_send_event(tipc_disc_link_event, l_ptr, 0); 772 } 773 774 775 static void link_activate(struct link *l_ptr) 776 { 777 l_ptr->next_in_no = l_ptr->stats.recv_info = 1; 778 tipc_node_link_up(l_ptr->owner, l_ptr); 779 tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr); 780 link_send_event(tipc_cfg_link_event, l_ptr, 1); 781 if (!in_own_cluster(l_ptr->addr)) 782 link_send_event(tipc_disc_link_event, l_ptr, 1); 783 } 784 785 /** 786 * link_state_event - link finite state machine 787 * @l_ptr: pointer to link 788 * @event: state machine event to process 789 */ 790 791 static void link_state_event(struct link *l_ptr, unsigned event) 792 { 793 struct link *other; 794 u32 cont_intv = l_ptr->continuity_interval; 795 796 if (!l_ptr->started && (event != STARTING_EVT)) 797 return; /* Not yet. */ 798 799 if (link_blocked(l_ptr)) { 800 if (event == TIMEOUT_EVT) { 801 link_set_timer(l_ptr, cont_intv); 802 } 803 return; /* Changeover going on */ 804 } 805 dbg_link("STATE_EV: <%s> ", l_ptr->name); 806 807 switch (l_ptr->state) { 808 case WORKING_WORKING: 809 dbg_link("WW/"); 810 switch (event) { 811 case TRAFFIC_MSG_EVT: 812 dbg_link("TRF-"); 813 /* fall through */ 814 case ACTIVATE_MSG: 815 dbg_link("ACT\n"); 816 break; 817 case TIMEOUT_EVT: 818 dbg_link("TIM "); 819 if (l_ptr->next_in_no != l_ptr->checkpoint) { 820 l_ptr->checkpoint = l_ptr->next_in_no; 821 if (tipc_bclink_acks_missing(l_ptr->owner)) { 822 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 823 0, 0, 0, 0, 0); 824 l_ptr->fsm_msg_cnt++; 825 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { 826 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 827 1, 0, 0, 0, 0); 828 l_ptr->fsm_msg_cnt++; 829 } 830 link_set_timer(l_ptr, cont_intv); 831 break; 832 } 833 dbg_link(" -> WU\n"); 834 l_ptr->state = WORKING_UNKNOWN; 835 l_ptr->fsm_msg_cnt = 0; 836 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 837 l_ptr->fsm_msg_cnt++; 838 link_set_timer(l_ptr, cont_intv / 4); 839 break; 840 case RESET_MSG: 841 dbg_link("RES -> RR\n"); 842 info("Resetting link <%s>, requested by peer\n", 843 l_ptr->name); 844 tipc_link_reset(l_ptr); 845 l_ptr->state = RESET_RESET; 846 l_ptr->fsm_msg_cnt = 0; 847 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 848 l_ptr->fsm_msg_cnt++; 849 link_set_timer(l_ptr, cont_intv); 850 break; 851 default: 852 err("Unknown link event %u in WW state\n", event); 853 } 854 break; 855 case WORKING_UNKNOWN: 856 dbg_link("WU/"); 857 switch (event) { 858 case TRAFFIC_MSG_EVT: 859 dbg_link("TRF-"); 860 case ACTIVATE_MSG: 861 dbg_link("ACT -> WW\n"); 862 l_ptr->state = WORKING_WORKING; 863 l_ptr->fsm_msg_cnt = 0; 864 link_set_timer(l_ptr, cont_intv); 865 break; 866 case RESET_MSG: 867 dbg_link("RES -> RR\n"); 868 info("Resetting link <%s>, requested by peer " 869 "while probing\n", l_ptr->name); 870 tipc_link_reset(l_ptr); 871 l_ptr->state = RESET_RESET; 872 l_ptr->fsm_msg_cnt = 0; 873 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 874 l_ptr->fsm_msg_cnt++; 875 link_set_timer(l_ptr, cont_intv); 876 break; 877 case TIMEOUT_EVT: 878 dbg_link("TIM "); 879 if (l_ptr->next_in_no != l_ptr->checkpoint) { 880 dbg_link("-> WW \n"); 881 l_ptr->state = WORKING_WORKING; 882 l_ptr->fsm_msg_cnt = 0; 883 l_ptr->checkpoint = l_ptr->next_in_no; 884 if (tipc_bclink_acks_missing(l_ptr->owner)) { 885 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 886 0, 0, 0, 0, 0); 887 l_ptr->fsm_msg_cnt++; 888 } 889 link_set_timer(l_ptr, cont_intv); 890 } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) { 891 dbg_link("Probing %u/%u,timer = %u ms)\n", 892 l_ptr->fsm_msg_cnt, l_ptr->abort_limit, 893 cont_intv / 4); 894 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 895 1, 0, 0, 0, 0); 896 l_ptr->fsm_msg_cnt++; 897 link_set_timer(l_ptr, cont_intv / 4); 898 } else { /* Link has failed */ 899 dbg_link("-> RU (%u probes unanswered)\n", 900 l_ptr->fsm_msg_cnt); 901 warn("Resetting link <%s>, peer not responding\n", 902 l_ptr->name); 903 tipc_link_reset(l_ptr); 904 l_ptr->state = RESET_UNKNOWN; 905 l_ptr->fsm_msg_cnt = 0; 906 tipc_link_send_proto_msg(l_ptr, RESET_MSG, 907 0, 0, 0, 0, 0); 908 l_ptr->fsm_msg_cnt++; 909 link_set_timer(l_ptr, cont_intv); 910 } 911 break; 912 default: 913 err("Unknown link event %u in WU state\n", event); 914 } 915 break; 916 case RESET_UNKNOWN: 917 dbg_link("RU/"); 918 switch (event) { 919 case TRAFFIC_MSG_EVT: 920 dbg_link("TRF-\n"); 921 break; 922 case ACTIVATE_MSG: 923 other = l_ptr->owner->active_links[0]; 924 if (other && link_working_unknown(other)) { 925 dbg_link("ACT\n"); 926 break; 927 } 928 dbg_link("ACT -> WW\n"); 929 l_ptr->state = WORKING_WORKING; 930 l_ptr->fsm_msg_cnt = 0; 931 link_activate(l_ptr); 932 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 933 l_ptr->fsm_msg_cnt++; 934 link_set_timer(l_ptr, cont_intv); 935 break; 936 case RESET_MSG: 937 dbg_link("RES \n"); 938 dbg_link(" -> RR\n"); 939 l_ptr->state = RESET_RESET; 940 l_ptr->fsm_msg_cnt = 0; 941 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0); 942 l_ptr->fsm_msg_cnt++; 943 link_set_timer(l_ptr, cont_intv); 944 break; 945 case STARTING_EVT: 946 dbg_link("START-"); 947 l_ptr->started = 1; 948 /* fall through */ 949 case TIMEOUT_EVT: 950 dbg_link("TIM \n"); 951 tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); 952 l_ptr->fsm_msg_cnt++; 953 link_set_timer(l_ptr, cont_intv); 954 break; 955 default: 956 err("Unknown link event %u in RU state\n", event); 957 } 958 break; 959 case RESET_RESET: 960 dbg_link("RR/ "); 961 switch (event) { 962 case TRAFFIC_MSG_EVT: 963 dbg_link("TRF-"); 964 /* fall through */ 965 case ACTIVATE_MSG: 966 other = l_ptr->owner->active_links[0]; 967 if (other && link_working_unknown(other)) { 968 dbg_link("ACT\n"); 969 break; 970 } 971 dbg_link("ACT -> WW\n"); 972 l_ptr->state = WORKING_WORKING; 973 l_ptr->fsm_msg_cnt = 0; 974 link_activate(l_ptr); 975 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 976 l_ptr->fsm_msg_cnt++; 977 link_set_timer(l_ptr, cont_intv); 978 break; 979 case RESET_MSG: 980 dbg_link("RES\n"); 981 break; 982 case TIMEOUT_EVT: 983 dbg_link("TIM\n"); 984 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 985 l_ptr->fsm_msg_cnt++; 986 link_set_timer(l_ptr, cont_intv); 987 dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt); 988 break; 989 default: 990 err("Unknown link event %u in RR state\n", event); 991 } 992 break; 993 default: 994 err("Unknown link state %u/%u\n", l_ptr->state, event); 995 } 996 } 997 998 /* 999 * link_bundle_buf(): Append contents of a buffer to 1000 * the tail of an existing one. 1001 */ 1002 1003 static int link_bundle_buf(struct link *l_ptr, 1004 struct sk_buff *bundler, 1005 struct sk_buff *buf) 1006 { 1007 struct tipc_msg *bundler_msg = buf_msg(bundler); 1008 struct tipc_msg *msg = buf_msg(buf); 1009 u32 size = msg_size(msg); 1010 u32 bundle_size = msg_size(bundler_msg); 1011 u32 to_pos = align(bundle_size); 1012 u32 pad = to_pos - bundle_size; 1013 1014 if (msg_user(bundler_msg) != MSG_BUNDLER) 1015 return 0; 1016 if (msg_type(bundler_msg) != OPEN_MSG) 1017 return 0; 1018 if (skb_tailroom(bundler) < (pad + size)) 1019 return 0; 1020 if (link_max_pkt(l_ptr) < (to_pos + size)) 1021 return 0; 1022 1023 skb_put(bundler, pad + size); 1024 skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size); 1025 msg_set_size(bundler_msg, to_pos + size); 1026 msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1); 1027 dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n", 1028 msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg)); 1029 msg_dbg(msg, "PACKD:"); 1030 buf_discard(buf); 1031 l_ptr->stats.sent_bundled++; 1032 return 1; 1033 } 1034 1035 static void link_add_to_outqueue(struct link *l_ptr, 1036 struct sk_buff *buf, 1037 struct tipc_msg *msg) 1038 { 1039 u32 ack = mod(l_ptr->next_in_no - 1); 1040 u32 seqno = mod(l_ptr->next_out_no++); 1041 1042 msg_set_word(msg, 2, ((ack << 16) | seqno)); 1043 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1044 buf->next = NULL; 1045 if (l_ptr->first_out) { 1046 l_ptr->last_out->next = buf; 1047 l_ptr->last_out = buf; 1048 } else 1049 l_ptr->first_out = l_ptr->last_out = buf; 1050 l_ptr->out_queue_size++; 1051 } 1052 1053 /* 1054 * tipc_link_send_buf() is the 'full path' for messages, called from 1055 * inside TIPC when the 'fast path' in tipc_send_buf 1056 * has failed, and from link_send() 1057 */ 1058 1059 int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf) 1060 { 1061 struct tipc_msg *msg = buf_msg(buf); 1062 u32 size = msg_size(msg); 1063 u32 dsz = msg_data_sz(msg); 1064 u32 queue_size = l_ptr->out_queue_size; 1065 u32 imp = msg_tot_importance(msg); 1066 u32 queue_limit = l_ptr->queue_limit[imp]; 1067 u32 max_packet = link_max_pkt(l_ptr); 1068 1069 msg_set_prevnode(msg, tipc_own_addr); /* If routed message */ 1070 1071 /* Match msg importance against queue limits: */ 1072 1073 if (unlikely(queue_size >= queue_limit)) { 1074 if (imp <= TIPC_CRITICAL_IMPORTANCE) { 1075 return link_schedule_port(l_ptr, msg_origport(msg), 1076 size); 1077 } 1078 msg_dbg(msg, "TIPC: Congestion, throwing away\n"); 1079 buf_discard(buf); 1080 if (imp > CONN_MANAGER) { 1081 warn("Resetting link <%s>, send queue full", l_ptr->name); 1082 tipc_link_reset(l_ptr); 1083 } 1084 return dsz; 1085 } 1086 1087 /* Fragmentation needed ? */ 1088 1089 if (size > max_packet) 1090 return tipc_link_send_long_buf(l_ptr, buf); 1091 1092 /* Packet can be queued or sent: */ 1093 1094 if (queue_size > l_ptr->stats.max_queue_sz) 1095 l_ptr->stats.max_queue_sz = queue_size; 1096 1097 if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && 1098 !link_congested(l_ptr))) { 1099 link_add_to_outqueue(l_ptr, buf, msg); 1100 1101 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { 1102 l_ptr->unacked_window = 0; 1103 } else { 1104 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1105 l_ptr->stats.bearer_congs++; 1106 l_ptr->next_out = buf; 1107 } 1108 return dsz; 1109 } 1110 /* Congestion: can message be bundled ?: */ 1111 1112 if ((msg_user(msg) != CHANGEOVER_PROTOCOL) && 1113 (msg_user(msg) != MSG_FRAGMENTER)) { 1114 1115 /* Try adding message to an existing bundle */ 1116 1117 if (l_ptr->next_out && 1118 link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { 1119 tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); 1120 return dsz; 1121 } 1122 1123 /* Try creating a new bundle */ 1124 1125 if (size <= max_packet * 2 / 3) { 1126 struct sk_buff *bundler = buf_acquire(max_packet); 1127 struct tipc_msg bundler_hdr; 1128 1129 if (bundler) { 1130 msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG, 1131 INT_H_SIZE, l_ptr->addr); 1132 skb_copy_to_linear_data(bundler, &bundler_hdr, 1133 INT_H_SIZE); 1134 skb_trim(bundler, INT_H_SIZE); 1135 link_bundle_buf(l_ptr, bundler, buf); 1136 buf = bundler; 1137 msg = buf_msg(buf); 1138 l_ptr->stats.sent_bundles++; 1139 } 1140 } 1141 } 1142 if (!l_ptr->next_out) 1143 l_ptr->next_out = buf; 1144 link_add_to_outqueue(l_ptr, buf, msg); 1145 tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); 1146 return dsz; 1147 } 1148 1149 /* 1150 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has 1151 * not been selected yet, and the the owner node is not locked 1152 * Called by TIPC internal users, e.g. the name distributor 1153 */ 1154 1155 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) 1156 { 1157 struct link *l_ptr; 1158 struct tipc_node *n_ptr; 1159 int res = -ELINKCONG; 1160 1161 read_lock_bh(&tipc_net_lock); 1162 n_ptr = tipc_node_select(dest, selector); 1163 if (n_ptr) { 1164 tipc_node_lock(n_ptr); 1165 l_ptr = n_ptr->active_links[selector & 1]; 1166 if (l_ptr) { 1167 dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest); 1168 res = tipc_link_send_buf(l_ptr, buf); 1169 } else { 1170 dbg("Attempt to send msg to unreachable node:\n"); 1171 msg_dbg(buf_msg(buf),">>>"); 1172 buf_discard(buf); 1173 } 1174 tipc_node_unlock(n_ptr); 1175 } else { 1176 dbg("Attempt to send msg to unknown node:\n"); 1177 msg_dbg(buf_msg(buf),">>>"); 1178 buf_discard(buf); 1179 } 1180 read_unlock_bh(&tipc_net_lock); 1181 return res; 1182 } 1183 1184 /* 1185 * link_send_buf_fast: Entry for data messages where the 1186 * destination link is known and the header is complete, 1187 * inclusive total message length. Very time critical. 1188 * Link is locked. Returns user data length. 1189 */ 1190 1191 static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf, 1192 u32 *used_max_pkt) 1193 { 1194 struct tipc_msg *msg = buf_msg(buf); 1195 int res = msg_data_sz(msg); 1196 1197 if (likely(!link_congested(l_ptr))) { 1198 if (likely(msg_size(msg) <= link_max_pkt(l_ptr))) { 1199 if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { 1200 link_add_to_outqueue(l_ptr, buf, msg); 1201 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, 1202 &l_ptr->media_addr))) { 1203 l_ptr->unacked_window = 0; 1204 msg_dbg(msg,"SENT_FAST:"); 1205 return res; 1206 } 1207 dbg("failed sent fast...\n"); 1208 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1209 l_ptr->stats.bearer_congs++; 1210 l_ptr->next_out = buf; 1211 return res; 1212 } 1213 } 1214 else 1215 *used_max_pkt = link_max_pkt(l_ptr); 1216 } 1217 return tipc_link_send_buf(l_ptr, buf); /* All other cases */ 1218 } 1219 1220 /* 1221 * tipc_send_buf_fast: Entry for data messages where the 1222 * destination node is known and the header is complete, 1223 * inclusive total message length. 1224 * Returns user data length. 1225 */ 1226 int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode) 1227 { 1228 struct link *l_ptr; 1229 struct tipc_node *n_ptr; 1230 int res; 1231 u32 selector = msg_origport(buf_msg(buf)) & 1; 1232 u32 dummy; 1233 1234 if (destnode == tipc_own_addr) 1235 return tipc_port_recv_msg(buf); 1236 1237 read_lock_bh(&tipc_net_lock); 1238 n_ptr = tipc_node_select(destnode, selector); 1239 if (likely(n_ptr)) { 1240 tipc_node_lock(n_ptr); 1241 l_ptr = n_ptr->active_links[selector]; 1242 dbg("send_fast: buf %x selected %x, destnode = %x\n", 1243 buf, l_ptr, destnode); 1244 if (likely(l_ptr)) { 1245 res = link_send_buf_fast(l_ptr, buf, &dummy); 1246 tipc_node_unlock(n_ptr); 1247 read_unlock_bh(&tipc_net_lock); 1248 return res; 1249 } 1250 tipc_node_unlock(n_ptr); 1251 } 1252 read_unlock_bh(&tipc_net_lock); 1253 res = msg_data_sz(buf_msg(buf)); 1254 tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1255 return res; 1256 } 1257 1258 1259 /* 1260 * tipc_link_send_sections_fast: Entry for messages where the 1261 * destination processor is known and the header is complete, 1262 * except for total message length. 1263 * Returns user data length or errno. 1264 */ 1265 int tipc_link_send_sections_fast(struct port *sender, 1266 struct iovec const *msg_sect, 1267 const u32 num_sect, 1268 u32 destaddr) 1269 { 1270 struct tipc_msg *hdr = &sender->publ.phdr; 1271 struct link *l_ptr; 1272 struct sk_buff *buf; 1273 struct tipc_node *node; 1274 int res; 1275 u32 selector = msg_origport(hdr) & 1; 1276 1277 again: 1278 /* 1279 * Try building message using port's max_pkt hint. 1280 * (Must not hold any locks while building message.) 1281 */ 1282 1283 res = msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt, 1284 !sender->user_port, &buf); 1285 1286 read_lock_bh(&tipc_net_lock); 1287 node = tipc_node_select(destaddr, selector); 1288 if (likely(node)) { 1289 tipc_node_lock(node); 1290 l_ptr = node->active_links[selector]; 1291 if (likely(l_ptr)) { 1292 if (likely(buf)) { 1293 res = link_send_buf_fast(l_ptr, buf, 1294 &sender->publ.max_pkt); 1295 if (unlikely(res < 0)) 1296 buf_discard(buf); 1297 exit: 1298 tipc_node_unlock(node); 1299 read_unlock_bh(&tipc_net_lock); 1300 return res; 1301 } 1302 1303 /* Exit if build request was invalid */ 1304 1305 if (unlikely(res < 0)) 1306 goto exit; 1307 1308 /* Exit if link (or bearer) is congested */ 1309 1310 if (link_congested(l_ptr) || 1311 !list_empty(&l_ptr->b_ptr->cong_links)) { 1312 res = link_schedule_port(l_ptr, 1313 sender->publ.ref, res); 1314 goto exit; 1315 } 1316 1317 /* 1318 * Message size exceeds max_pkt hint; update hint, 1319 * then re-try fast path or fragment the message 1320 */ 1321 1322 sender->publ.max_pkt = link_max_pkt(l_ptr); 1323 tipc_node_unlock(node); 1324 read_unlock_bh(&tipc_net_lock); 1325 1326 1327 if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt) 1328 goto again; 1329 1330 return link_send_sections_long(sender, msg_sect, 1331 num_sect, destaddr); 1332 } 1333 tipc_node_unlock(node); 1334 } 1335 read_unlock_bh(&tipc_net_lock); 1336 1337 /* Couldn't find a link to the destination node */ 1338 1339 if (buf) 1340 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1341 if (res >= 0) 1342 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1343 TIPC_ERR_NO_NODE); 1344 return res; 1345 } 1346 1347 /* 1348 * link_send_sections_long(): Entry for long messages where the 1349 * destination node is known and the header is complete, 1350 * inclusive total message length. 1351 * Link and bearer congestion status have been checked to be ok, 1352 * and are ignored if they change. 1353 * 1354 * Note that fragments do not use the full link MTU so that they won't have 1355 * to undergo refragmentation if link changeover causes them to be sent 1356 * over another link with an additional tunnel header added as prefix. 1357 * (Refragmentation will still occur if the other link has a smaller MTU.) 1358 * 1359 * Returns user data length or errno. 1360 */ 1361 static int link_send_sections_long(struct port *sender, 1362 struct iovec const *msg_sect, 1363 u32 num_sect, 1364 u32 destaddr) 1365 { 1366 struct link *l_ptr; 1367 struct tipc_node *node; 1368 struct tipc_msg *hdr = &sender->publ.phdr; 1369 u32 dsz = msg_data_sz(hdr); 1370 u32 max_pkt,fragm_sz,rest; 1371 struct tipc_msg fragm_hdr; 1372 struct sk_buff *buf,*buf_chain,*prev; 1373 u32 fragm_crs,fragm_rest,hsz,sect_rest; 1374 const unchar *sect_crs; 1375 int curr_sect; 1376 u32 fragm_no; 1377 1378 again: 1379 fragm_no = 1; 1380 max_pkt = sender->publ.max_pkt - INT_H_SIZE; 1381 /* leave room for tunnel header in case of link changeover */ 1382 fragm_sz = max_pkt - INT_H_SIZE; 1383 /* leave room for fragmentation header in each fragment */ 1384 rest = dsz; 1385 fragm_crs = 0; 1386 fragm_rest = 0; 1387 sect_rest = 0; 1388 sect_crs = NULL; 1389 curr_sect = -1; 1390 1391 /* Prepare reusable fragment header: */ 1392 1393 msg_dbg(hdr, ">FRAGMENTING>"); 1394 msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 1395 INT_H_SIZE, msg_destnode(hdr)); 1396 msg_set_link_selector(&fragm_hdr, sender->publ.ref); 1397 msg_set_size(&fragm_hdr, max_pkt); 1398 msg_set_fragm_no(&fragm_hdr, 1); 1399 1400 /* Prepare header of first fragment: */ 1401 1402 buf_chain = buf = buf_acquire(max_pkt); 1403 if (!buf) 1404 return -ENOMEM; 1405 buf->next = NULL; 1406 skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); 1407 hsz = msg_hdr_sz(hdr); 1408 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz); 1409 msg_dbg(buf_msg(buf), ">BUILD>"); 1410 1411 /* Chop up message: */ 1412 1413 fragm_crs = INT_H_SIZE + hsz; 1414 fragm_rest = fragm_sz - hsz; 1415 1416 do { /* For all sections */ 1417 u32 sz; 1418 1419 if (!sect_rest) { 1420 sect_rest = msg_sect[++curr_sect].iov_len; 1421 sect_crs = (const unchar *)msg_sect[curr_sect].iov_base; 1422 } 1423 1424 if (sect_rest < fragm_rest) 1425 sz = sect_rest; 1426 else 1427 sz = fragm_rest; 1428 1429 if (likely(!sender->user_port)) { 1430 if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { 1431 error: 1432 for (; buf_chain; buf_chain = buf) { 1433 buf = buf_chain->next; 1434 buf_discard(buf_chain); 1435 } 1436 return -EFAULT; 1437 } 1438 } else 1439 skb_copy_to_linear_data_offset(buf, fragm_crs, 1440 sect_crs, sz); 1441 sect_crs += sz; 1442 sect_rest -= sz; 1443 fragm_crs += sz; 1444 fragm_rest -= sz; 1445 rest -= sz; 1446 1447 if (!fragm_rest && rest) { 1448 1449 /* Initiate new fragment: */ 1450 if (rest <= fragm_sz) { 1451 fragm_sz = rest; 1452 msg_set_type(&fragm_hdr,LAST_FRAGMENT); 1453 } else { 1454 msg_set_type(&fragm_hdr, FRAGMENT); 1455 } 1456 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 1457 msg_set_fragm_no(&fragm_hdr, ++fragm_no); 1458 prev = buf; 1459 buf = buf_acquire(fragm_sz + INT_H_SIZE); 1460 if (!buf) 1461 goto error; 1462 1463 buf->next = NULL; 1464 prev->next = buf; 1465 skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); 1466 fragm_crs = INT_H_SIZE; 1467 fragm_rest = fragm_sz; 1468 msg_dbg(buf_msg(buf)," >BUILD>"); 1469 } 1470 } 1471 while (rest > 0); 1472 1473 /* 1474 * Now we have a buffer chain. Select a link and check 1475 * that packet size is still OK 1476 */ 1477 node = tipc_node_select(destaddr, sender->publ.ref & 1); 1478 if (likely(node)) { 1479 tipc_node_lock(node); 1480 l_ptr = node->active_links[sender->publ.ref & 1]; 1481 if (!l_ptr) { 1482 tipc_node_unlock(node); 1483 goto reject; 1484 } 1485 if (link_max_pkt(l_ptr) < max_pkt) { 1486 sender->publ.max_pkt = link_max_pkt(l_ptr); 1487 tipc_node_unlock(node); 1488 for (; buf_chain; buf_chain = buf) { 1489 buf = buf_chain->next; 1490 buf_discard(buf_chain); 1491 } 1492 goto again; 1493 } 1494 } else { 1495 reject: 1496 for (; buf_chain; buf_chain = buf) { 1497 buf = buf_chain->next; 1498 buf_discard(buf_chain); 1499 } 1500 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1501 TIPC_ERR_NO_NODE); 1502 } 1503 1504 /* Append whole chain to send queue: */ 1505 1506 buf = buf_chain; 1507 l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1); 1508 if (!l_ptr->next_out) 1509 l_ptr->next_out = buf_chain; 1510 l_ptr->stats.sent_fragmented++; 1511 while (buf) { 1512 struct sk_buff *next = buf->next; 1513 struct tipc_msg *msg = buf_msg(buf); 1514 1515 l_ptr->stats.sent_fragments++; 1516 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no); 1517 link_add_to_outqueue(l_ptr, buf, msg); 1518 msg_dbg(msg, ">ADD>"); 1519 buf = next; 1520 } 1521 1522 /* Send it, if possible: */ 1523 1524 tipc_link_push_queue(l_ptr); 1525 tipc_node_unlock(node); 1526 return dsz; 1527 } 1528 1529 /* 1530 * tipc_link_push_packet: Push one unsent packet to the media 1531 */ 1532 u32 tipc_link_push_packet(struct link *l_ptr) 1533 { 1534 struct sk_buff *buf = l_ptr->first_out; 1535 u32 r_q_size = l_ptr->retransm_queue_size; 1536 u32 r_q_head = l_ptr->retransm_queue_head; 1537 1538 /* Step to position where retransmission failed, if any, */ 1539 /* consider that buffers may have been released in meantime */ 1540 1541 if (r_q_size && buf) { 1542 u32 last = lesser(mod(r_q_head + r_q_size), 1543 link_last_sent(l_ptr)); 1544 u32 first = msg_seqno(buf_msg(buf)); 1545 1546 while (buf && less(first, r_q_head)) { 1547 first = mod(first + 1); 1548 buf = buf->next; 1549 } 1550 l_ptr->retransm_queue_head = r_q_head = first; 1551 l_ptr->retransm_queue_size = r_q_size = mod(last - first); 1552 } 1553 1554 /* Continue retransmission now, if there is anything: */ 1555 1556 if (r_q_size && buf && !skb_cloned(buf)) { 1557 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); 1558 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); 1559 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1560 msg_dbg(buf_msg(buf), ">DEF-RETR>"); 1561 l_ptr->retransm_queue_head = mod(++r_q_head); 1562 l_ptr->retransm_queue_size = --r_q_size; 1563 l_ptr->stats.retransmitted++; 1564 return 0; 1565 } else { 1566 l_ptr->stats.bearer_congs++; 1567 msg_dbg(buf_msg(buf), "|>DEF-RETR>"); 1568 return PUSH_FAILED; 1569 } 1570 } 1571 1572 /* Send deferred protocol message, if any: */ 1573 1574 buf = l_ptr->proto_msg_queue; 1575 if (buf) { 1576 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); 1577 msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in); 1578 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1579 msg_dbg(buf_msg(buf), ">DEF-PROT>"); 1580 l_ptr->unacked_window = 0; 1581 buf_discard(buf); 1582 l_ptr->proto_msg_queue = NULL; 1583 return 0; 1584 } else { 1585 msg_dbg(buf_msg(buf), "|>DEF-PROT>"); 1586 l_ptr->stats.bearer_congs++; 1587 return PUSH_FAILED; 1588 } 1589 } 1590 1591 /* Send one deferred data message, if send window not full: */ 1592 1593 buf = l_ptr->next_out; 1594 if (buf) { 1595 struct tipc_msg *msg = buf_msg(buf); 1596 u32 next = msg_seqno(msg); 1597 u32 first = msg_seqno(buf_msg(l_ptr->first_out)); 1598 1599 if (mod(next - first) < l_ptr->queue_limit[0]) { 1600 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 1601 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1602 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1603 if (msg_user(msg) == MSG_BUNDLER) 1604 msg_set_type(msg, CLOSED_MSG); 1605 msg_dbg(msg, ">PUSH-DATA>"); 1606 l_ptr->next_out = buf->next; 1607 return 0; 1608 } else { 1609 msg_dbg(msg, "|PUSH-DATA|"); 1610 l_ptr->stats.bearer_congs++; 1611 return PUSH_FAILED; 1612 } 1613 } 1614 } 1615 return PUSH_FINISHED; 1616 } 1617 1618 /* 1619 * push_queue(): push out the unsent messages of a link where 1620 * congestion has abated. Node is locked 1621 */ 1622 void tipc_link_push_queue(struct link *l_ptr) 1623 { 1624 u32 res; 1625 1626 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) 1627 return; 1628 1629 do { 1630 res = tipc_link_push_packet(l_ptr); 1631 } while (!res); 1632 1633 if (res == PUSH_FAILED) 1634 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1635 } 1636 1637 static void link_reset_all(unsigned long addr) 1638 { 1639 struct tipc_node *n_ptr; 1640 char addr_string[16]; 1641 u32 i; 1642 1643 read_lock_bh(&tipc_net_lock); 1644 n_ptr = tipc_node_find((u32)addr); 1645 if (!n_ptr) { 1646 read_unlock_bh(&tipc_net_lock); 1647 return; /* node no longer exists */ 1648 } 1649 1650 tipc_node_lock(n_ptr); 1651 1652 warn("Resetting all links to %s\n", 1653 addr_string_fill(addr_string, n_ptr->addr)); 1654 1655 for (i = 0; i < MAX_BEARERS; i++) { 1656 if (n_ptr->links[i]) { 1657 link_print(n_ptr->links[i], TIPC_OUTPUT, 1658 "Resetting link\n"); 1659 tipc_link_reset(n_ptr->links[i]); 1660 } 1661 } 1662 1663 tipc_node_unlock(n_ptr); 1664 read_unlock_bh(&tipc_net_lock); 1665 } 1666 1667 static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf) 1668 { 1669 struct tipc_msg *msg = buf_msg(buf); 1670 1671 warn("Retransmission failure on link <%s>\n", l_ptr->name); 1672 tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>"); 1673 1674 if (l_ptr->addr) { 1675 1676 /* Handle failure on standard link */ 1677 1678 link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n"); 1679 tipc_link_reset(l_ptr); 1680 1681 } else { 1682 1683 /* Handle failure on broadcast link */ 1684 1685 struct tipc_node *n_ptr; 1686 char addr_string[16]; 1687 1688 tipc_printf(TIPC_OUTPUT, "Msg seq number: %u, ", msg_seqno(msg)); 1689 tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n", 1690 (unsigned long) TIPC_SKB_CB(buf)->handle); 1691 1692 n_ptr = l_ptr->owner->next; 1693 tipc_node_lock(n_ptr); 1694 1695 addr_string_fill(addr_string, n_ptr->addr); 1696 tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string); 1697 tipc_printf(TIPC_OUTPUT, "Supported: %d, ", n_ptr->bclink.supported); 1698 tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked); 1699 tipc_printf(TIPC_OUTPUT, "Last in: %u, ", n_ptr->bclink.last_in); 1700 tipc_printf(TIPC_OUTPUT, "Gap after: %u, ", n_ptr->bclink.gap_after); 1701 tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to); 1702 tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync); 1703 1704 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); 1705 1706 tipc_node_unlock(n_ptr); 1707 1708 l_ptr->stale_count = 0; 1709 } 1710 } 1711 1712 void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf, 1713 u32 retransmits) 1714 { 1715 struct tipc_msg *msg; 1716 1717 if (!buf) 1718 return; 1719 1720 msg = buf_msg(buf); 1721 1722 dbg("Retransmitting %u in link %x\n", retransmits, l_ptr); 1723 1724 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { 1725 if (!skb_cloned(buf)) { 1726 msg_dbg(msg, ">NO_RETR->BCONG>"); 1727 dbg_print_link(l_ptr, " "); 1728 l_ptr->retransm_queue_head = msg_seqno(msg); 1729 l_ptr->retransm_queue_size = retransmits; 1730 return; 1731 } else { 1732 /* Don't retransmit if driver already has the buffer */ 1733 } 1734 } else { 1735 /* Detect repeated retransmit failures on uncongested bearer */ 1736 1737 if (l_ptr->last_retransmitted == msg_seqno(msg)) { 1738 if (++l_ptr->stale_count > 100) { 1739 link_retransmit_failure(l_ptr, buf); 1740 return; 1741 } 1742 } else { 1743 l_ptr->last_retransmitted = msg_seqno(msg); 1744 l_ptr->stale_count = 1; 1745 } 1746 } 1747 1748 while (retransmits && (buf != l_ptr->next_out) && buf && !skb_cloned(buf)) { 1749 msg = buf_msg(buf); 1750 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 1751 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1752 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1753 msg_dbg(buf_msg(buf), ">RETR>"); 1754 buf = buf->next; 1755 retransmits--; 1756 l_ptr->stats.retransmitted++; 1757 } else { 1758 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1759 l_ptr->stats.bearer_congs++; 1760 l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf)); 1761 l_ptr->retransm_queue_size = retransmits; 1762 return; 1763 } 1764 } 1765 1766 l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; 1767 } 1768 1769 /** 1770 * link_insert_deferred_queue - insert deferred messages back into receive chain 1771 */ 1772 1773 static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr, 1774 struct sk_buff *buf) 1775 { 1776 u32 seq_no; 1777 1778 if (l_ptr->oldest_deferred_in == NULL) 1779 return buf; 1780 1781 seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 1782 if (seq_no == mod(l_ptr->next_in_no)) { 1783 l_ptr->newest_deferred_in->next = buf; 1784 buf = l_ptr->oldest_deferred_in; 1785 l_ptr->oldest_deferred_in = NULL; 1786 l_ptr->deferred_inqueue_sz = 0; 1787 } 1788 return buf; 1789 } 1790 1791 /** 1792 * link_recv_buf_validate - validate basic format of received message 1793 * 1794 * This routine ensures a TIPC message has an acceptable header, and at least 1795 * as much data as the header indicates it should. The routine also ensures 1796 * that the entire message header is stored in the main fragment of the message 1797 * buffer, to simplify future access to message header fields. 1798 * 1799 * Note: Having extra info present in the message header or data areas is OK. 1800 * TIPC will ignore the excess, under the assumption that it is optional info 1801 * introduced by a later release of the protocol. 1802 */ 1803 1804 static int link_recv_buf_validate(struct sk_buff *buf) 1805 { 1806 static u32 min_data_hdr_size[8] = { 1807 SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE, 1808 MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE 1809 }; 1810 1811 struct tipc_msg *msg; 1812 u32 tipc_hdr[2]; 1813 u32 size; 1814 u32 hdr_size; 1815 u32 min_hdr_size; 1816 1817 if (unlikely(buf->len < MIN_H_SIZE)) 1818 return 0; 1819 1820 msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr); 1821 if (msg == NULL) 1822 return 0; 1823 1824 if (unlikely(msg_version(msg) != TIPC_VERSION)) 1825 return 0; 1826 1827 size = msg_size(msg); 1828 hdr_size = msg_hdr_sz(msg); 1829 min_hdr_size = msg_isdata(msg) ? 1830 min_data_hdr_size[msg_type(msg)] : INT_H_SIZE; 1831 1832 if (unlikely((hdr_size < min_hdr_size) || 1833 (size < hdr_size) || 1834 (buf->len < size) || 1835 (size - hdr_size > TIPC_MAX_USER_MSG_SIZE))) 1836 return 0; 1837 1838 return pskb_may_pull(buf, hdr_size); 1839 } 1840 1841 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr) 1842 { 1843 read_lock_bh(&tipc_net_lock); 1844 while (head) { 1845 struct bearer *b_ptr = (struct bearer *)tb_ptr; 1846 struct tipc_node *n_ptr; 1847 struct link *l_ptr; 1848 struct sk_buff *crs; 1849 struct sk_buff *buf = head; 1850 struct tipc_msg *msg; 1851 u32 seq_no; 1852 u32 ackd; 1853 u32 released = 0; 1854 int type; 1855 1856 head = head->next; 1857 1858 /* Ensure message is well-formed */ 1859 1860 if (unlikely(!link_recv_buf_validate(buf))) 1861 goto cont; 1862 1863 /* Ensure message data is a single contiguous unit */ 1864 1865 if (unlikely(buf_linearize(buf))) { 1866 goto cont; 1867 } 1868 1869 /* Handle arrival of a non-unicast link message */ 1870 1871 msg = buf_msg(buf); 1872 1873 if (unlikely(msg_non_seq(msg))) { 1874 if (msg_user(msg) == LINK_CONFIG) 1875 tipc_disc_recv_msg(buf, b_ptr); 1876 else 1877 tipc_bclink_recv_pkt(buf); 1878 continue; 1879 } 1880 1881 if (unlikely(!msg_short(msg) && 1882 (msg_destnode(msg) != tipc_own_addr))) 1883 goto cont; 1884 1885 /* Locate unicast link endpoint that should handle message */ 1886 1887 n_ptr = tipc_node_find(msg_prevnode(msg)); 1888 if (unlikely(!n_ptr)) 1889 goto cont; 1890 tipc_node_lock(n_ptr); 1891 1892 l_ptr = n_ptr->links[b_ptr->identity]; 1893 if (unlikely(!l_ptr)) { 1894 tipc_node_unlock(n_ptr); 1895 goto cont; 1896 } 1897 1898 /* Validate message sequence number info */ 1899 1900 seq_no = msg_seqno(msg); 1901 ackd = msg_ack(msg); 1902 1903 /* Release acked messages */ 1904 1905 if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) { 1906 if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported) 1907 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); 1908 } 1909 1910 crs = l_ptr->first_out; 1911 while ((crs != l_ptr->next_out) && 1912 less_eq(msg_seqno(buf_msg(crs)), ackd)) { 1913 struct sk_buff *next = crs->next; 1914 1915 buf_discard(crs); 1916 crs = next; 1917 released++; 1918 } 1919 if (released) { 1920 l_ptr->first_out = crs; 1921 l_ptr->out_queue_size -= released; 1922 } 1923 1924 /* Try sending any messages link endpoint has pending */ 1925 1926 if (unlikely(l_ptr->next_out)) 1927 tipc_link_push_queue(l_ptr); 1928 if (unlikely(!list_empty(&l_ptr->waiting_ports))) 1929 tipc_link_wakeup_ports(l_ptr, 0); 1930 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { 1931 l_ptr->stats.sent_acks++; 1932 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1933 } 1934 1935 /* Now (finally!) process the incoming message */ 1936 1937 protocol_check: 1938 if (likely(link_working_working(l_ptr))) { 1939 if (likely(seq_no == mod(l_ptr->next_in_no))) { 1940 l_ptr->next_in_no++; 1941 if (unlikely(l_ptr->oldest_deferred_in)) 1942 head = link_insert_deferred_queue(l_ptr, 1943 head); 1944 if (likely(msg_is_dest(msg, tipc_own_addr))) { 1945 deliver: 1946 if (likely(msg_isdata(msg))) { 1947 tipc_node_unlock(n_ptr); 1948 tipc_port_recv_msg(buf); 1949 continue; 1950 } 1951 switch (msg_user(msg)) { 1952 case MSG_BUNDLER: 1953 l_ptr->stats.recv_bundles++; 1954 l_ptr->stats.recv_bundled += 1955 msg_msgcnt(msg); 1956 tipc_node_unlock(n_ptr); 1957 tipc_link_recv_bundle(buf); 1958 continue; 1959 case ROUTE_DISTRIBUTOR: 1960 tipc_node_unlock(n_ptr); 1961 tipc_cltr_recv_routing_table(buf); 1962 continue; 1963 case NAME_DISTRIBUTOR: 1964 tipc_node_unlock(n_ptr); 1965 tipc_named_recv(buf); 1966 continue; 1967 case CONN_MANAGER: 1968 tipc_node_unlock(n_ptr); 1969 tipc_port_recv_proto_msg(buf); 1970 continue; 1971 case MSG_FRAGMENTER: 1972 l_ptr->stats.recv_fragments++; 1973 if (tipc_link_recv_fragment(&l_ptr->defragm_buf, 1974 &buf, &msg)) { 1975 l_ptr->stats.recv_fragmented++; 1976 goto deliver; 1977 } 1978 break; 1979 case CHANGEOVER_PROTOCOL: 1980 type = msg_type(msg); 1981 if (link_recv_changeover_msg(&l_ptr, &buf)) { 1982 msg = buf_msg(buf); 1983 seq_no = msg_seqno(msg); 1984 if (type == ORIGINAL_MSG) 1985 goto deliver; 1986 goto protocol_check; 1987 } 1988 break; 1989 } 1990 } 1991 tipc_node_unlock(n_ptr); 1992 tipc_net_route_msg(buf); 1993 continue; 1994 } 1995 link_handle_out_of_seq_msg(l_ptr, buf); 1996 head = link_insert_deferred_queue(l_ptr, head); 1997 tipc_node_unlock(n_ptr); 1998 continue; 1999 } 2000 2001 if (msg_user(msg) == LINK_PROTOCOL) { 2002 link_recv_proto_msg(l_ptr, buf); 2003 head = link_insert_deferred_queue(l_ptr, head); 2004 tipc_node_unlock(n_ptr); 2005 continue; 2006 } 2007 msg_dbg(msg,"NSEQ<REC<"); 2008 link_state_event(l_ptr, TRAFFIC_MSG_EVT); 2009 2010 if (link_working_working(l_ptr)) { 2011 /* Re-insert in front of queue */ 2012 msg_dbg(msg,"RECV-REINS:"); 2013 buf->next = head; 2014 head = buf; 2015 tipc_node_unlock(n_ptr); 2016 continue; 2017 } 2018 tipc_node_unlock(n_ptr); 2019 cont: 2020 buf_discard(buf); 2021 } 2022 read_unlock_bh(&tipc_net_lock); 2023 } 2024 2025 /* 2026 * link_defer_buf(): Sort a received out-of-sequence packet 2027 * into the deferred reception queue. 2028 * Returns the increase of the queue length,i.e. 0 or 1 2029 */ 2030 2031 u32 tipc_link_defer_pkt(struct sk_buff **head, 2032 struct sk_buff **tail, 2033 struct sk_buff *buf) 2034 { 2035 struct sk_buff *prev = NULL; 2036 struct sk_buff *crs = *head; 2037 u32 seq_no = msg_seqno(buf_msg(buf)); 2038 2039 buf->next = NULL; 2040 2041 /* Empty queue ? */ 2042 if (*head == NULL) { 2043 *head = *tail = buf; 2044 return 1; 2045 } 2046 2047 /* Last ? */ 2048 if (less(msg_seqno(buf_msg(*tail)), seq_no)) { 2049 (*tail)->next = buf; 2050 *tail = buf; 2051 return 1; 2052 } 2053 2054 /* Scan through queue and sort it in */ 2055 do { 2056 struct tipc_msg *msg = buf_msg(crs); 2057 2058 if (less(seq_no, msg_seqno(msg))) { 2059 buf->next = crs; 2060 if (prev) 2061 prev->next = buf; 2062 else 2063 *head = buf; 2064 return 1; 2065 } 2066 if (seq_no == msg_seqno(msg)) { 2067 break; 2068 } 2069 prev = crs; 2070 crs = crs->next; 2071 } 2072 while (crs); 2073 2074 /* Message is a duplicate of an existing message */ 2075 2076 buf_discard(buf); 2077 return 0; 2078 } 2079 2080 /** 2081 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet 2082 */ 2083 2084 static void link_handle_out_of_seq_msg(struct link *l_ptr, 2085 struct sk_buff *buf) 2086 { 2087 u32 seq_no = msg_seqno(buf_msg(buf)); 2088 2089 if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { 2090 link_recv_proto_msg(l_ptr, buf); 2091 return; 2092 } 2093 2094 dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n", 2095 seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no); 2096 2097 /* Record OOS packet arrival (force mismatch on next timeout) */ 2098 2099 l_ptr->checkpoint--; 2100 2101 /* 2102 * Discard packet if a duplicate; otherwise add it to deferred queue 2103 * and notify peer of gap as per protocol specification 2104 */ 2105 2106 if (less(seq_no, mod(l_ptr->next_in_no))) { 2107 l_ptr->stats.duplicates++; 2108 buf_discard(buf); 2109 return; 2110 } 2111 2112 if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in, 2113 &l_ptr->newest_deferred_in, buf)) { 2114 l_ptr->deferred_inqueue_sz++; 2115 l_ptr->stats.deferred_recv++; 2116 if ((l_ptr->deferred_inqueue_sz % 16) == 1) 2117 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 2118 } else 2119 l_ptr->stats.duplicates++; 2120 } 2121 2122 /* 2123 * Send protocol message to the other endpoint. 2124 */ 2125 void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg, 2126 u32 gap, u32 tolerance, u32 priority, u32 ack_mtu) 2127 { 2128 struct sk_buff *buf = NULL; 2129 struct tipc_msg *msg = l_ptr->pmsg; 2130 u32 msg_size = sizeof(l_ptr->proto_msg); 2131 2132 if (link_blocked(l_ptr)) 2133 return; 2134 msg_set_type(msg, msg_typ); 2135 msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); 2136 msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); 2137 msg_set_last_bcast(msg, tipc_bclink_get_last_sent()); 2138 2139 if (msg_typ == STATE_MSG) { 2140 u32 next_sent = mod(l_ptr->next_out_no); 2141 2142 if (!tipc_link_is_up(l_ptr)) 2143 return; 2144 if (l_ptr->next_out) 2145 next_sent = msg_seqno(buf_msg(l_ptr->next_out)); 2146 msg_set_next_sent(msg, next_sent); 2147 if (l_ptr->oldest_deferred_in) { 2148 u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 2149 gap = mod(rec - mod(l_ptr->next_in_no)); 2150 } 2151 msg_set_seq_gap(msg, gap); 2152 if (gap) 2153 l_ptr->stats.sent_nacks++; 2154 msg_set_link_tolerance(msg, tolerance); 2155 msg_set_linkprio(msg, priority); 2156 msg_set_max_pkt(msg, ack_mtu); 2157 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 2158 msg_set_probe(msg, probe_msg != 0); 2159 if (probe_msg) { 2160 u32 mtu = l_ptr->max_pkt; 2161 2162 if ((mtu < l_ptr->max_pkt_target) && 2163 link_working_working(l_ptr) && 2164 l_ptr->fsm_msg_cnt) { 2165 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; 2166 if (l_ptr->max_pkt_probes == 10) { 2167 l_ptr->max_pkt_target = (msg_size - 4); 2168 l_ptr->max_pkt_probes = 0; 2169 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; 2170 } 2171 l_ptr->max_pkt_probes++; 2172 } 2173 2174 l_ptr->stats.sent_probes++; 2175 } 2176 l_ptr->stats.sent_states++; 2177 } else { /* RESET_MSG or ACTIVATE_MSG */ 2178 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1)); 2179 msg_set_seq_gap(msg, 0); 2180 msg_set_next_sent(msg, 1); 2181 msg_set_link_tolerance(msg, l_ptr->tolerance); 2182 msg_set_linkprio(msg, l_ptr->priority); 2183 msg_set_max_pkt(msg, l_ptr->max_pkt_target); 2184 } 2185 2186 if (tipc_node_has_redundant_links(l_ptr->owner)) { 2187 msg_set_redundant_link(msg); 2188 } else { 2189 msg_clear_redundant_link(msg); 2190 } 2191 msg_set_linkprio(msg, l_ptr->priority); 2192 2193 /* Ensure sequence number will not fit : */ 2194 2195 msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2))); 2196 2197 /* Congestion? */ 2198 2199 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { 2200 if (!l_ptr->proto_msg_queue) { 2201 l_ptr->proto_msg_queue = 2202 buf_acquire(sizeof(l_ptr->proto_msg)); 2203 } 2204 buf = l_ptr->proto_msg_queue; 2205 if (!buf) 2206 return; 2207 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 2208 return; 2209 } 2210 msg_set_timestamp(msg, jiffies_to_msecs(jiffies)); 2211 2212 /* Message can be sent */ 2213 2214 msg_dbg(msg, ">>"); 2215 2216 buf = buf_acquire(msg_size); 2217 if (!buf) 2218 return; 2219 2220 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 2221 msg_set_size(buf_msg(buf), msg_size); 2222 2223 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 2224 l_ptr->unacked_window = 0; 2225 buf_discard(buf); 2226 return; 2227 } 2228 2229 /* New congestion */ 2230 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 2231 l_ptr->proto_msg_queue = buf; 2232 l_ptr->stats.bearer_congs++; 2233 } 2234 2235 /* 2236 * Receive protocol message : 2237 * Note that network plane id propagates through the network, and may 2238 * change at any time. The node with lowest address rules 2239 */ 2240 2241 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf) 2242 { 2243 u32 rec_gap = 0; 2244 u32 max_pkt_info; 2245 u32 max_pkt_ack; 2246 u32 msg_tol; 2247 struct tipc_msg *msg = buf_msg(buf); 2248 2249 dbg("AT(%u):", jiffies_to_msecs(jiffies)); 2250 msg_dbg(msg, "<<"); 2251 if (link_blocked(l_ptr)) 2252 goto exit; 2253 2254 /* record unnumbered packet arrival (force mismatch on next timeout) */ 2255 2256 l_ptr->checkpoint--; 2257 2258 if (l_ptr->b_ptr->net_plane != msg_net_plane(msg)) 2259 if (tipc_own_addr > msg_prevnode(msg)) 2260 l_ptr->b_ptr->net_plane = msg_net_plane(msg); 2261 2262 l_ptr->owner->permit_changeover = msg_redundant_link(msg); 2263 2264 switch (msg_type(msg)) { 2265 2266 case RESET_MSG: 2267 if (!link_working_unknown(l_ptr) && 2268 (l_ptr->peer_session != INVALID_SESSION)) { 2269 if (msg_session(msg) == l_ptr->peer_session) { 2270 dbg("Duplicate RESET: %u<->%u\n", 2271 msg_session(msg), l_ptr->peer_session); 2272 break; /* duplicate: ignore */ 2273 } 2274 } 2275 /* fall thru' */ 2276 case ACTIVATE_MSG: 2277 /* Update link settings according other endpoint's values */ 2278 2279 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg)); 2280 2281 if ((msg_tol = msg_link_tolerance(msg)) && 2282 (msg_tol > l_ptr->tolerance)) 2283 link_set_supervision_props(l_ptr, msg_tol); 2284 2285 if (msg_linkprio(msg) > l_ptr->priority) 2286 l_ptr->priority = msg_linkprio(msg); 2287 2288 max_pkt_info = msg_max_pkt(msg); 2289 if (max_pkt_info) { 2290 if (max_pkt_info < l_ptr->max_pkt_target) 2291 l_ptr->max_pkt_target = max_pkt_info; 2292 if (l_ptr->max_pkt > l_ptr->max_pkt_target) 2293 l_ptr->max_pkt = l_ptr->max_pkt_target; 2294 } else { 2295 l_ptr->max_pkt = l_ptr->max_pkt_target; 2296 } 2297 l_ptr->owner->bclink.supported = (max_pkt_info != 0); 2298 2299 link_state_event(l_ptr, msg_type(msg)); 2300 2301 l_ptr->peer_session = msg_session(msg); 2302 l_ptr->peer_bearer_id = msg_bearer_id(msg); 2303 2304 /* Synchronize broadcast sequence numbers */ 2305 if (!tipc_node_has_redundant_links(l_ptr->owner)) { 2306 l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg)); 2307 } 2308 break; 2309 case STATE_MSG: 2310 2311 if ((msg_tol = msg_link_tolerance(msg))) 2312 link_set_supervision_props(l_ptr, msg_tol); 2313 2314 if (msg_linkprio(msg) && 2315 (msg_linkprio(msg) != l_ptr->priority)) { 2316 warn("Resetting link <%s>, priority change %u->%u\n", 2317 l_ptr->name, l_ptr->priority, msg_linkprio(msg)); 2318 l_ptr->priority = msg_linkprio(msg); 2319 tipc_link_reset(l_ptr); /* Enforce change to take effect */ 2320 break; 2321 } 2322 link_state_event(l_ptr, TRAFFIC_MSG_EVT); 2323 l_ptr->stats.recv_states++; 2324 if (link_reset_unknown(l_ptr)) 2325 break; 2326 2327 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) { 2328 rec_gap = mod(msg_next_sent(msg) - 2329 mod(l_ptr->next_in_no)); 2330 } 2331 2332 max_pkt_ack = msg_max_pkt(msg); 2333 if (max_pkt_ack > l_ptr->max_pkt) { 2334 dbg("Link <%s> updated MTU %u -> %u\n", 2335 l_ptr->name, l_ptr->max_pkt, max_pkt_ack); 2336 l_ptr->max_pkt = max_pkt_ack; 2337 l_ptr->max_pkt_probes = 0; 2338 } 2339 2340 max_pkt_ack = 0; 2341 if (msg_probe(msg)) { 2342 l_ptr->stats.recv_probes++; 2343 if (msg_size(msg) > sizeof(l_ptr->proto_msg)) { 2344 max_pkt_ack = msg_size(msg); 2345 } 2346 } 2347 2348 /* Protocol message before retransmits, reduce loss risk */ 2349 2350 tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg)); 2351 2352 if (rec_gap || (msg_probe(msg))) { 2353 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 2354 0, rec_gap, 0, 0, max_pkt_ack); 2355 } 2356 if (msg_seq_gap(msg)) { 2357 msg_dbg(msg, "With Gap:"); 2358 l_ptr->stats.recv_nacks++; 2359 tipc_link_retransmit(l_ptr, l_ptr->first_out, 2360 msg_seq_gap(msg)); 2361 } 2362 break; 2363 default: 2364 msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<"); 2365 } 2366 exit: 2367 buf_discard(buf); 2368 } 2369 2370 2371 /* 2372 * tipc_link_tunnel(): Send one message via a link belonging to 2373 * another bearer. Owner node is locked. 2374 */ 2375 void tipc_link_tunnel(struct link *l_ptr, 2376 struct tipc_msg *tunnel_hdr, 2377 struct tipc_msg *msg, 2378 u32 selector) 2379 { 2380 struct link *tunnel; 2381 struct sk_buff *buf; 2382 u32 length = msg_size(msg); 2383 2384 tunnel = l_ptr->owner->active_links[selector & 1]; 2385 if (!tipc_link_is_up(tunnel)) { 2386 warn("Link changeover error, " 2387 "tunnel link no longer available\n"); 2388 return; 2389 } 2390 msg_set_size(tunnel_hdr, length + INT_H_SIZE); 2391 buf = buf_acquire(length + INT_H_SIZE); 2392 if (!buf) { 2393 warn("Link changeover error, " 2394 "unable to send tunnel msg\n"); 2395 return; 2396 } 2397 skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); 2398 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); 2399 dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane); 2400 msg_dbg(buf_msg(buf), ">SEND>"); 2401 tipc_link_send_buf(tunnel, buf); 2402 } 2403 2404 2405 2406 /* 2407 * changeover(): Send whole message queue via the remaining link 2408 * Owner node is locked. 2409 */ 2410 2411 void tipc_link_changeover(struct link *l_ptr) 2412 { 2413 u32 msgcount = l_ptr->out_queue_size; 2414 struct sk_buff *crs = l_ptr->first_out; 2415 struct link *tunnel = l_ptr->owner->active_links[0]; 2416 struct tipc_msg tunnel_hdr; 2417 int split_bundles; 2418 2419 if (!tunnel) 2420 return; 2421 2422 if (!l_ptr->owner->permit_changeover) { 2423 warn("Link changeover error, " 2424 "peer did not permit changeover\n"); 2425 return; 2426 } 2427 2428 msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 2429 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); 2430 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 2431 msg_set_msgcnt(&tunnel_hdr, msgcount); 2432 dbg("Link changeover requires %u tunnel messages\n", msgcount); 2433 2434 if (!l_ptr->first_out) { 2435 struct sk_buff *buf; 2436 2437 buf = buf_acquire(INT_H_SIZE); 2438 if (buf) { 2439 skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); 2440 msg_set_size(&tunnel_hdr, INT_H_SIZE); 2441 dbg("%c->%c:", l_ptr->b_ptr->net_plane, 2442 tunnel->b_ptr->net_plane); 2443 msg_dbg(&tunnel_hdr, "EMPTY>SEND>"); 2444 tipc_link_send_buf(tunnel, buf); 2445 } else { 2446 warn("Link changeover error, " 2447 "unable to send changeover msg\n"); 2448 } 2449 return; 2450 } 2451 2452 split_bundles = (l_ptr->owner->active_links[0] != 2453 l_ptr->owner->active_links[1]); 2454 2455 while (crs) { 2456 struct tipc_msg *msg = buf_msg(crs); 2457 2458 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { 2459 struct tipc_msg *m = msg_get_wrapped(msg); 2460 unchar* pos = (unchar*)m; 2461 2462 msgcount = msg_msgcnt(msg); 2463 while (msgcount--) { 2464 msg_set_seqno(m,msg_seqno(msg)); 2465 tipc_link_tunnel(l_ptr, &tunnel_hdr, m, 2466 msg_link_selector(m)); 2467 pos += align(msg_size(m)); 2468 m = (struct tipc_msg *)pos; 2469 } 2470 } else { 2471 tipc_link_tunnel(l_ptr, &tunnel_hdr, msg, 2472 msg_link_selector(msg)); 2473 } 2474 crs = crs->next; 2475 } 2476 } 2477 2478 void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel) 2479 { 2480 struct sk_buff *iter; 2481 struct tipc_msg tunnel_hdr; 2482 2483 msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 2484 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr); 2485 msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size); 2486 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 2487 iter = l_ptr->first_out; 2488 while (iter) { 2489 struct sk_buff *outbuf; 2490 struct tipc_msg *msg = buf_msg(iter); 2491 u32 length = msg_size(msg); 2492 2493 if (msg_user(msg) == MSG_BUNDLER) 2494 msg_set_type(msg, CLOSED_MSG); 2495 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ 2496 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 2497 msg_set_size(&tunnel_hdr, length + INT_H_SIZE); 2498 outbuf = buf_acquire(length + INT_H_SIZE); 2499 if (outbuf == NULL) { 2500 warn("Link changeover error, " 2501 "unable to send duplicate msg\n"); 2502 return; 2503 } 2504 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); 2505 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, 2506 length); 2507 dbg("%c->%c:", l_ptr->b_ptr->net_plane, 2508 tunnel->b_ptr->net_plane); 2509 msg_dbg(buf_msg(outbuf), ">SEND>"); 2510 tipc_link_send_buf(tunnel, outbuf); 2511 if (!tipc_link_is_up(l_ptr)) 2512 return; 2513 iter = iter->next; 2514 } 2515 } 2516 2517 2518 2519 /** 2520 * buf_extract - extracts embedded TIPC message from another message 2521 * @skb: encapsulating message buffer 2522 * @from_pos: offset to extract from 2523 * 2524 * Returns a new message buffer containing an embedded message. The 2525 * encapsulating message itself is left unchanged. 2526 */ 2527 2528 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) 2529 { 2530 struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos); 2531 u32 size = msg_size(msg); 2532 struct sk_buff *eb; 2533 2534 eb = buf_acquire(size); 2535 if (eb) 2536 skb_copy_to_linear_data(eb, msg, size); 2537 return eb; 2538 } 2539 2540 /* 2541 * link_recv_changeover_msg(): Receive tunneled packet sent 2542 * via other link. Node is locked. Return extracted buffer. 2543 */ 2544 2545 static int link_recv_changeover_msg(struct link **l_ptr, 2546 struct sk_buff **buf) 2547 { 2548 struct sk_buff *tunnel_buf = *buf; 2549 struct link *dest_link; 2550 struct tipc_msg *msg; 2551 struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf); 2552 u32 msg_typ = msg_type(tunnel_msg); 2553 u32 msg_count = msg_msgcnt(tunnel_msg); 2554 2555 dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)]; 2556 if (!dest_link) { 2557 msg_dbg(tunnel_msg, "NOLINK/<REC<"); 2558 goto exit; 2559 } 2560 if (dest_link == *l_ptr) { 2561 err("Unexpected changeover message on link <%s>\n", 2562 (*l_ptr)->name); 2563 goto exit; 2564 } 2565 dbg("%c<-%c:", dest_link->b_ptr->net_plane, 2566 (*l_ptr)->b_ptr->net_plane); 2567 *l_ptr = dest_link; 2568 msg = msg_get_wrapped(tunnel_msg); 2569 2570 if (msg_typ == DUPLICATE_MSG) { 2571 if (less(msg_seqno(msg), mod(dest_link->next_in_no))) { 2572 msg_dbg(tunnel_msg, "DROP/<REC<"); 2573 goto exit; 2574 } 2575 *buf = buf_extract(tunnel_buf,INT_H_SIZE); 2576 if (*buf == NULL) { 2577 warn("Link changeover error, duplicate msg dropped\n"); 2578 goto exit; 2579 } 2580 msg_dbg(tunnel_msg, "TNL<REC<"); 2581 buf_discard(tunnel_buf); 2582 return 1; 2583 } 2584 2585 /* First original message ?: */ 2586 2587 if (tipc_link_is_up(dest_link)) { 2588 msg_dbg(tunnel_msg, "UP/FIRST/<REC<"); 2589 info("Resetting link <%s>, changeover initiated by peer\n", 2590 dest_link->name); 2591 tipc_link_reset(dest_link); 2592 dest_link->exp_msg_count = msg_count; 2593 dbg("Expecting %u tunnelled messages\n", msg_count); 2594 if (!msg_count) 2595 goto exit; 2596 } else if (dest_link->exp_msg_count == START_CHANGEOVER) { 2597 msg_dbg(tunnel_msg, "BLK/FIRST/<REC<"); 2598 dest_link->exp_msg_count = msg_count; 2599 dbg("Expecting %u tunnelled messages\n", msg_count); 2600 if (!msg_count) 2601 goto exit; 2602 } 2603 2604 /* Receive original message */ 2605 2606 if (dest_link->exp_msg_count == 0) { 2607 warn("Link switchover error, " 2608 "got too many tunnelled messages\n"); 2609 msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<"); 2610 dbg_print_link(dest_link, "LINK:"); 2611 goto exit; 2612 } 2613 dest_link->exp_msg_count--; 2614 if (less(msg_seqno(msg), dest_link->reset_checkpoint)) { 2615 msg_dbg(tunnel_msg, "DROP/DUPL/<REC<"); 2616 goto exit; 2617 } else { 2618 *buf = buf_extract(tunnel_buf, INT_H_SIZE); 2619 if (*buf != NULL) { 2620 msg_dbg(tunnel_msg, "TNL<REC<"); 2621 buf_discard(tunnel_buf); 2622 return 1; 2623 } else { 2624 warn("Link changeover error, original msg dropped\n"); 2625 } 2626 } 2627 exit: 2628 *buf = NULL; 2629 buf_discard(tunnel_buf); 2630 return 0; 2631 } 2632 2633 /* 2634 * Bundler functionality: 2635 */ 2636 void tipc_link_recv_bundle(struct sk_buff *buf) 2637 { 2638 u32 msgcount = msg_msgcnt(buf_msg(buf)); 2639 u32 pos = INT_H_SIZE; 2640 struct sk_buff *obuf; 2641 2642 msg_dbg(buf_msg(buf), "<BNDL<: "); 2643 while (msgcount--) { 2644 obuf = buf_extract(buf, pos); 2645 if (obuf == NULL) { 2646 warn("Link unable to unbundle message(s)\n"); 2647 break; 2648 } 2649 pos += align(msg_size(buf_msg(obuf))); 2650 msg_dbg(buf_msg(obuf), " /"); 2651 tipc_net_route_msg(obuf); 2652 } 2653 buf_discard(buf); 2654 } 2655 2656 /* 2657 * Fragmentation/defragmentation: 2658 */ 2659 2660 2661 /* 2662 * tipc_link_send_long_buf: Entry for buffers needing fragmentation. 2663 * The buffer is complete, inclusive total message length. 2664 * Returns user data length. 2665 */ 2666 int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) 2667 { 2668 struct tipc_msg *inmsg = buf_msg(buf); 2669 struct tipc_msg fragm_hdr; 2670 u32 insize = msg_size(inmsg); 2671 u32 dsz = msg_data_sz(inmsg); 2672 unchar *crs = buf->data; 2673 u32 rest = insize; 2674 u32 pack_sz = link_max_pkt(l_ptr); 2675 u32 fragm_sz = pack_sz - INT_H_SIZE; 2676 u32 fragm_no = 1; 2677 u32 destaddr; 2678 2679 if (msg_short(inmsg)) 2680 destaddr = l_ptr->addr; 2681 else 2682 destaddr = msg_destnode(inmsg); 2683 2684 if (msg_routed(inmsg)) 2685 msg_set_prevnode(inmsg, tipc_own_addr); 2686 2687 /* Prepare reusable fragment header: */ 2688 2689 msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 2690 INT_H_SIZE, destaddr); 2691 msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg)); 2692 msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++)); 2693 msg_set_fragm_no(&fragm_hdr, fragm_no); 2694 l_ptr->stats.sent_fragmented++; 2695 2696 /* Chop up message: */ 2697 2698 while (rest > 0) { 2699 struct sk_buff *fragm; 2700 2701 if (rest <= fragm_sz) { 2702 fragm_sz = rest; 2703 msg_set_type(&fragm_hdr, LAST_FRAGMENT); 2704 } 2705 fragm = buf_acquire(fragm_sz + INT_H_SIZE); 2706 if (fragm == NULL) { 2707 warn("Link unable to fragment message\n"); 2708 dsz = -ENOMEM; 2709 goto exit; 2710 } 2711 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 2712 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); 2713 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, 2714 fragm_sz); 2715 /* Send queued messages first, if any: */ 2716 2717 l_ptr->stats.sent_fragments++; 2718 tipc_link_send_buf(l_ptr, fragm); 2719 if (!tipc_link_is_up(l_ptr)) 2720 return dsz; 2721 msg_set_fragm_no(&fragm_hdr, ++fragm_no); 2722 rest -= fragm_sz; 2723 crs += fragm_sz; 2724 msg_set_type(&fragm_hdr, FRAGMENT); 2725 } 2726 exit: 2727 buf_discard(buf); 2728 return dsz; 2729 } 2730 2731 /* 2732 * A pending message being re-assembled must store certain values 2733 * to handle subsequent fragments correctly. The following functions 2734 * help storing these values in unused, available fields in the 2735 * pending message. This makes dynamic memory allocation unecessary. 2736 */ 2737 2738 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno) 2739 { 2740 msg_set_seqno(buf_msg(buf), seqno); 2741 } 2742 2743 static u32 get_fragm_size(struct sk_buff *buf) 2744 { 2745 return msg_ack(buf_msg(buf)); 2746 } 2747 2748 static void set_fragm_size(struct sk_buff *buf, u32 sz) 2749 { 2750 msg_set_ack(buf_msg(buf), sz); 2751 } 2752 2753 static u32 get_expected_frags(struct sk_buff *buf) 2754 { 2755 return msg_bcast_ack(buf_msg(buf)); 2756 } 2757 2758 static void set_expected_frags(struct sk_buff *buf, u32 exp) 2759 { 2760 msg_set_bcast_ack(buf_msg(buf), exp); 2761 } 2762 2763 static u32 get_timer_cnt(struct sk_buff *buf) 2764 { 2765 return msg_reroute_cnt(buf_msg(buf)); 2766 } 2767 2768 static void incr_timer_cnt(struct sk_buff *buf) 2769 { 2770 msg_incr_reroute_cnt(buf_msg(buf)); 2771 } 2772 2773 /* 2774 * tipc_link_recv_fragment(): Called with node lock on. Returns 2775 * the reassembled buffer if message is complete. 2776 */ 2777 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, 2778 struct tipc_msg **m) 2779 { 2780 struct sk_buff *prev = NULL; 2781 struct sk_buff *fbuf = *fb; 2782 struct tipc_msg *fragm = buf_msg(fbuf); 2783 struct sk_buff *pbuf = *pending; 2784 u32 long_msg_seq_no = msg_long_msgno(fragm); 2785 2786 *fb = NULL; 2787 msg_dbg(fragm,"FRG<REC<"); 2788 2789 /* Is there an incomplete message waiting for this fragment? */ 2790 2791 while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) 2792 || (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) { 2793 prev = pbuf; 2794 pbuf = pbuf->next; 2795 } 2796 2797 if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) { 2798 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm); 2799 u32 msg_sz = msg_size(imsg); 2800 u32 fragm_sz = msg_data_sz(fragm); 2801 u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz); 2802 u32 max = TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE; 2803 if (msg_type(imsg) == TIPC_MCAST_MSG) 2804 max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE; 2805 if (msg_size(imsg) > max) { 2806 msg_dbg(fragm,"<REC<Oversized: "); 2807 buf_discard(fbuf); 2808 return 0; 2809 } 2810 pbuf = buf_acquire(msg_size(imsg)); 2811 if (pbuf != NULL) { 2812 pbuf->next = *pending; 2813 *pending = pbuf; 2814 skb_copy_to_linear_data(pbuf, imsg, 2815 msg_data_sz(fragm)); 2816 /* Prepare buffer for subsequent fragments. */ 2817 2818 set_long_msg_seqno(pbuf, long_msg_seq_no); 2819 set_fragm_size(pbuf,fragm_sz); 2820 set_expected_frags(pbuf,exp_fragm_cnt - 1); 2821 } else { 2822 warn("Link unable to reassemble fragmented message\n"); 2823 } 2824 buf_discard(fbuf); 2825 return 0; 2826 } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) { 2827 u32 dsz = msg_data_sz(fragm); 2828 u32 fsz = get_fragm_size(pbuf); 2829 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz); 2830 u32 exp_frags = get_expected_frags(pbuf) - 1; 2831 skb_copy_to_linear_data_offset(pbuf, crs, 2832 msg_data(fragm), dsz); 2833 buf_discard(fbuf); 2834 2835 /* Is message complete? */ 2836 2837 if (exp_frags == 0) { 2838 if (prev) 2839 prev->next = pbuf->next; 2840 else 2841 *pending = pbuf->next; 2842 msg_reset_reroute_cnt(buf_msg(pbuf)); 2843 *fb = pbuf; 2844 *m = buf_msg(pbuf); 2845 return 1; 2846 } 2847 set_expected_frags(pbuf,exp_frags); 2848 return 0; 2849 } 2850 dbg(" Discarding orphan fragment %x\n",fbuf); 2851 msg_dbg(fragm,"ORPHAN:"); 2852 dbg("Pending long buffers:\n"); 2853 dbg_print_buf_chain(*pending); 2854 buf_discard(fbuf); 2855 return 0; 2856 } 2857 2858 /** 2859 * link_check_defragm_bufs - flush stale incoming message fragments 2860 * @l_ptr: pointer to link 2861 */ 2862 2863 static void link_check_defragm_bufs(struct link *l_ptr) 2864 { 2865 struct sk_buff *prev = NULL; 2866 struct sk_buff *next = NULL; 2867 struct sk_buff *buf = l_ptr->defragm_buf; 2868 2869 if (!buf) 2870 return; 2871 if (!link_working_working(l_ptr)) 2872 return; 2873 while (buf) { 2874 u32 cnt = get_timer_cnt(buf); 2875 2876 next = buf->next; 2877 if (cnt < 4) { 2878 incr_timer_cnt(buf); 2879 prev = buf; 2880 } else { 2881 dbg(" Discarding incomplete long buffer\n"); 2882 msg_dbg(buf_msg(buf), "LONG:"); 2883 dbg_print_link(l_ptr, "curr:"); 2884 dbg("Pending long buffers:\n"); 2885 dbg_print_buf_chain(l_ptr->defragm_buf); 2886 if (prev) 2887 prev->next = buf->next; 2888 else 2889 l_ptr->defragm_buf = buf->next; 2890 buf_discard(buf); 2891 } 2892 buf = next; 2893 } 2894 } 2895 2896 2897 2898 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance) 2899 { 2900 l_ptr->tolerance = tolerance; 2901 l_ptr->continuity_interval = 2902 ((tolerance / 4) > 500) ? 500 : tolerance / 4; 2903 l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4); 2904 } 2905 2906 2907 void tipc_link_set_queue_limits(struct link *l_ptr, u32 window) 2908 { 2909 /* Data messages from this node, inclusive FIRST_FRAGM */ 2910 l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window; 2911 l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4; 2912 l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5; 2913 l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6; 2914 /* Transiting data messages,inclusive FIRST_FRAGM */ 2915 l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300; 2916 l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600; 2917 l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900; 2918 l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200; 2919 l_ptr->queue_limit[CONN_MANAGER] = 1200; 2920 l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200; 2921 l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500; 2922 l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000; 2923 /* FRAGMENT and LAST_FRAGMENT packets */ 2924 l_ptr->queue_limit[MSG_FRAGMENTER] = 4000; 2925 } 2926 2927 /** 2928 * link_find_link - locate link by name 2929 * @name - ptr to link name string 2930 * @node - ptr to area to be filled with ptr to associated node 2931 * 2932 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted; 2933 * this also prevents link deletion. 2934 * 2935 * Returns pointer to link (or 0 if invalid link name). 2936 */ 2937 2938 static struct link *link_find_link(const char *name, struct tipc_node **node) 2939 { 2940 struct link_name link_name_parts; 2941 struct bearer *b_ptr; 2942 struct link *l_ptr; 2943 2944 if (!link_name_validate(name, &link_name_parts)) 2945 return NULL; 2946 2947 b_ptr = tipc_bearer_find_interface(link_name_parts.if_local); 2948 if (!b_ptr) 2949 return NULL; 2950 2951 *node = tipc_node_find(link_name_parts.addr_peer); 2952 if (!*node) 2953 return NULL; 2954 2955 l_ptr = (*node)->links[b_ptr->identity]; 2956 if (!l_ptr || strcmp(l_ptr->name, name)) 2957 return NULL; 2958 2959 return l_ptr; 2960 } 2961 2962 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space, 2963 u16 cmd) 2964 { 2965 struct tipc_link_config *args; 2966 u32 new_value; 2967 struct link *l_ptr; 2968 struct tipc_node *node; 2969 int res; 2970 2971 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG)) 2972 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 2973 2974 args = (struct tipc_link_config *)TLV_DATA(req_tlv_area); 2975 new_value = ntohl(args->value); 2976 2977 if (!strcmp(args->name, tipc_bclink_name)) { 2978 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) && 2979 (tipc_bclink_set_queue_limits(new_value) == 0)) 2980 return tipc_cfg_reply_none(); 2981 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED 2982 " (cannot change setting on broadcast link)"); 2983 } 2984 2985 read_lock_bh(&tipc_net_lock); 2986 l_ptr = link_find_link(args->name, &node); 2987 if (!l_ptr) { 2988 read_unlock_bh(&tipc_net_lock); 2989 return tipc_cfg_reply_error_string("link not found"); 2990 } 2991 2992 tipc_node_lock(node); 2993 res = -EINVAL; 2994 switch (cmd) { 2995 case TIPC_CMD_SET_LINK_TOL: 2996 if ((new_value >= TIPC_MIN_LINK_TOL) && 2997 (new_value <= TIPC_MAX_LINK_TOL)) { 2998 link_set_supervision_props(l_ptr, new_value); 2999 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 3000 0, 0, new_value, 0, 0); 3001 res = 0; 3002 } 3003 break; 3004 case TIPC_CMD_SET_LINK_PRI: 3005 if ((new_value >= TIPC_MIN_LINK_PRI) && 3006 (new_value <= TIPC_MAX_LINK_PRI)) { 3007 l_ptr->priority = new_value; 3008 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 3009 0, 0, 0, new_value, 0); 3010 res = 0; 3011 } 3012 break; 3013 case TIPC_CMD_SET_LINK_WINDOW: 3014 if ((new_value >= TIPC_MIN_LINK_WIN) && 3015 (new_value <= TIPC_MAX_LINK_WIN)) { 3016 tipc_link_set_queue_limits(l_ptr, new_value); 3017 res = 0; 3018 } 3019 break; 3020 } 3021 tipc_node_unlock(node); 3022 3023 read_unlock_bh(&tipc_net_lock); 3024 if (res) 3025 return tipc_cfg_reply_error_string("cannot change link setting"); 3026 3027 return tipc_cfg_reply_none(); 3028 } 3029 3030 /** 3031 * link_reset_statistics - reset link statistics 3032 * @l_ptr: pointer to link 3033 */ 3034 3035 static void link_reset_statistics(struct link *l_ptr) 3036 { 3037 memset(&l_ptr->stats, 0, sizeof(l_ptr->stats)); 3038 l_ptr->stats.sent_info = l_ptr->next_out_no; 3039 l_ptr->stats.recv_info = l_ptr->next_in_no; 3040 } 3041 3042 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space) 3043 { 3044 char *link_name; 3045 struct link *l_ptr; 3046 struct tipc_node *node; 3047 3048 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) 3049 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 3050 3051 link_name = (char *)TLV_DATA(req_tlv_area); 3052 if (!strcmp(link_name, tipc_bclink_name)) { 3053 if (tipc_bclink_reset_stats()) 3054 return tipc_cfg_reply_error_string("link not found"); 3055 return tipc_cfg_reply_none(); 3056 } 3057 3058 read_lock_bh(&tipc_net_lock); 3059 l_ptr = link_find_link(link_name, &node); 3060 if (!l_ptr) { 3061 read_unlock_bh(&tipc_net_lock); 3062 return tipc_cfg_reply_error_string("link not found"); 3063 } 3064 3065 tipc_node_lock(node); 3066 link_reset_statistics(l_ptr); 3067 tipc_node_unlock(node); 3068 read_unlock_bh(&tipc_net_lock); 3069 return tipc_cfg_reply_none(); 3070 } 3071 3072 /** 3073 * percent - convert count to a percentage of total (rounding up or down) 3074 */ 3075 3076 static u32 percent(u32 count, u32 total) 3077 { 3078 return (count * 100 + (total / 2)) / total; 3079 } 3080 3081 /** 3082 * tipc_link_stats - print link statistics 3083 * @name: link name 3084 * @buf: print buffer area 3085 * @buf_size: size of print buffer area 3086 * 3087 * Returns length of print buffer data string (or 0 if error) 3088 */ 3089 3090 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size) 3091 { 3092 struct print_buf pb; 3093 struct link *l_ptr; 3094 struct tipc_node *node; 3095 char *status; 3096 u32 profile_total = 0; 3097 3098 if (!strcmp(name, tipc_bclink_name)) 3099 return tipc_bclink_stats(buf, buf_size); 3100 3101 tipc_printbuf_init(&pb, buf, buf_size); 3102 3103 read_lock_bh(&tipc_net_lock); 3104 l_ptr = link_find_link(name, &node); 3105 if (!l_ptr) { 3106 read_unlock_bh(&tipc_net_lock); 3107 return 0; 3108 } 3109 tipc_node_lock(node); 3110 3111 if (tipc_link_is_active(l_ptr)) 3112 status = "ACTIVE"; 3113 else if (tipc_link_is_up(l_ptr)) 3114 status = "STANDBY"; 3115 else 3116 status = "DEFUNCT"; 3117 tipc_printf(&pb, "Link <%s>\n" 3118 " %s MTU:%u Priority:%u Tolerance:%u ms" 3119 " Window:%u packets\n", 3120 l_ptr->name, status, link_max_pkt(l_ptr), 3121 l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]); 3122 tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n", 3123 l_ptr->next_in_no - l_ptr->stats.recv_info, 3124 l_ptr->stats.recv_fragments, 3125 l_ptr->stats.recv_fragmented, 3126 l_ptr->stats.recv_bundles, 3127 l_ptr->stats.recv_bundled); 3128 tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n", 3129 l_ptr->next_out_no - l_ptr->stats.sent_info, 3130 l_ptr->stats.sent_fragments, 3131 l_ptr->stats.sent_fragmented, 3132 l_ptr->stats.sent_bundles, 3133 l_ptr->stats.sent_bundled); 3134 profile_total = l_ptr->stats.msg_length_counts; 3135 if (!profile_total) 3136 profile_total = 1; 3137 tipc_printf(&pb, " TX profile sample:%u packets average:%u octets\n" 3138 " 0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% " 3139 "-16354:%u%% -32768:%u%% -66000:%u%%\n", 3140 l_ptr->stats.msg_length_counts, 3141 l_ptr->stats.msg_lengths_total / profile_total, 3142 percent(l_ptr->stats.msg_length_profile[0], profile_total), 3143 percent(l_ptr->stats.msg_length_profile[1], profile_total), 3144 percent(l_ptr->stats.msg_length_profile[2], profile_total), 3145 percent(l_ptr->stats.msg_length_profile[3], profile_total), 3146 percent(l_ptr->stats.msg_length_profile[4], profile_total), 3147 percent(l_ptr->stats.msg_length_profile[5], profile_total), 3148 percent(l_ptr->stats.msg_length_profile[6], profile_total)); 3149 tipc_printf(&pb, " RX states:%u probes:%u naks:%u defs:%u dups:%u\n", 3150 l_ptr->stats.recv_states, 3151 l_ptr->stats.recv_probes, 3152 l_ptr->stats.recv_nacks, 3153 l_ptr->stats.deferred_recv, 3154 l_ptr->stats.duplicates); 3155 tipc_printf(&pb, " TX states:%u probes:%u naks:%u acks:%u dups:%u\n", 3156 l_ptr->stats.sent_states, 3157 l_ptr->stats.sent_probes, 3158 l_ptr->stats.sent_nacks, 3159 l_ptr->stats.sent_acks, 3160 l_ptr->stats.retransmitted); 3161 tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", 3162 l_ptr->stats.bearer_congs, 3163 l_ptr->stats.link_congs, 3164 l_ptr->stats.max_queue_sz, 3165 l_ptr->stats.queue_sz_counts 3166 ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts) 3167 : 0); 3168 3169 tipc_node_unlock(node); 3170 read_unlock_bh(&tipc_net_lock); 3171 return tipc_printbuf_validate(&pb); 3172 } 3173 3174 #define MAX_LINK_STATS_INFO 2000 3175 3176 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space) 3177 { 3178 struct sk_buff *buf; 3179 struct tlv_desc *rep_tlv; 3180 int str_len; 3181 3182 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) 3183 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 3184 3185 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO)); 3186 if (!buf) 3187 return NULL; 3188 3189 rep_tlv = (struct tlv_desc *)buf->data; 3190 3191 str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area), 3192 (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO); 3193 if (!str_len) { 3194 buf_discard(buf); 3195 return tipc_cfg_reply_error_string("link not found"); 3196 } 3197 3198 skb_put(buf, TLV_SPACE(str_len)); 3199 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 3200 3201 return buf; 3202 } 3203 3204 #if 0 3205 int link_control(const char *name, u32 op, u32 val) 3206 { 3207 int res = -EINVAL; 3208 struct link *l_ptr; 3209 u32 bearer_id; 3210 struct tipc_node * node; 3211 u32 a; 3212 3213 a = link_name2addr(name, &bearer_id); 3214 read_lock_bh(&tipc_net_lock); 3215 node = tipc_node_find(a); 3216 if (node) { 3217 tipc_node_lock(node); 3218 l_ptr = node->links[bearer_id]; 3219 if (l_ptr) { 3220 if (op == TIPC_REMOVE_LINK) { 3221 struct bearer *b_ptr = l_ptr->b_ptr; 3222 spin_lock_bh(&b_ptr->publ.lock); 3223 tipc_link_delete(l_ptr); 3224 spin_unlock_bh(&b_ptr->publ.lock); 3225 } 3226 if (op == TIPC_CMD_BLOCK_LINK) { 3227 tipc_link_reset(l_ptr); 3228 l_ptr->blocked = 1; 3229 } 3230 if (op == TIPC_CMD_UNBLOCK_LINK) { 3231 l_ptr->blocked = 0; 3232 } 3233 res = 0; 3234 } 3235 tipc_node_unlock(node); 3236 } 3237 read_unlock_bh(&tipc_net_lock); 3238 return res; 3239 } 3240 #endif 3241 3242 /** 3243 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination 3244 * @dest: network address of destination node 3245 * @selector: used to select from set of active links 3246 * 3247 * If no active link can be found, uses default maximum packet size. 3248 */ 3249 3250 u32 tipc_link_get_max_pkt(u32 dest, u32 selector) 3251 { 3252 struct tipc_node *n_ptr; 3253 struct link *l_ptr; 3254 u32 res = MAX_PKT_DEFAULT; 3255 3256 if (dest == tipc_own_addr) 3257 return MAX_MSG_SIZE; 3258 3259 read_lock_bh(&tipc_net_lock); 3260 n_ptr = tipc_node_select(dest, selector); 3261 if (n_ptr) { 3262 tipc_node_lock(n_ptr); 3263 l_ptr = n_ptr->active_links[selector & 1]; 3264 if (l_ptr) 3265 res = link_max_pkt(l_ptr); 3266 tipc_node_unlock(n_ptr); 3267 } 3268 read_unlock_bh(&tipc_net_lock); 3269 return res; 3270 } 3271 3272 #if 0 3273 static void link_dump_rec_queue(struct link *l_ptr) 3274 { 3275 struct sk_buff *crs; 3276 3277 if (!l_ptr->oldest_deferred_in) { 3278 info("Reception queue empty\n"); 3279 return; 3280 } 3281 info("Contents of Reception queue:\n"); 3282 crs = l_ptr->oldest_deferred_in; 3283 while (crs) { 3284 if (crs->data == (void *)0x0000a3a3) { 3285 info("buffer %x invalid\n", crs); 3286 return; 3287 } 3288 msg_dbg(buf_msg(crs), "In rec queue: \n"); 3289 crs = crs->next; 3290 } 3291 } 3292 #endif 3293 3294 static void link_dump_send_queue(struct link *l_ptr) 3295 { 3296 if (l_ptr->next_out) { 3297 info("\nContents of unsent queue:\n"); 3298 dbg_print_buf_chain(l_ptr->next_out); 3299 } 3300 info("\nContents of send queue:\n"); 3301 if (l_ptr->first_out) { 3302 dbg_print_buf_chain(l_ptr->first_out); 3303 } 3304 info("Empty send queue\n"); 3305 } 3306 3307 static void link_print(struct link *l_ptr, struct print_buf *buf, 3308 const char *str) 3309 { 3310 tipc_printf(buf, str); 3311 if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr)) 3312 return; 3313 tipc_printf(buf, "Link %x<%s>:", 3314 l_ptr->addr, l_ptr->b_ptr->publ.name); 3315 tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no)); 3316 tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no)); 3317 tipc_printf(buf, "SQUE"); 3318 if (l_ptr->first_out) { 3319 tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out))); 3320 if (l_ptr->next_out) 3321 tipc_printf(buf, "%u..", 3322 msg_seqno(buf_msg(l_ptr->next_out))); 3323 tipc_printf(buf, "%u]", 3324 msg_seqno(buf_msg 3325 (l_ptr->last_out)), l_ptr->out_queue_size); 3326 if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) - 3327 msg_seqno(buf_msg(l_ptr->first_out))) 3328 != (l_ptr->out_queue_size - 1)) 3329 || (l_ptr->last_out->next != NULL)) { 3330 tipc_printf(buf, "\nSend queue inconsistency\n"); 3331 tipc_printf(buf, "first_out= %x ", l_ptr->first_out); 3332 tipc_printf(buf, "next_out= %x ", l_ptr->next_out); 3333 tipc_printf(buf, "last_out= %x ", l_ptr->last_out); 3334 link_dump_send_queue(l_ptr); 3335 } 3336 } else 3337 tipc_printf(buf, "[]"); 3338 tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size); 3339 if (l_ptr->oldest_deferred_in) { 3340 u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 3341 u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in)); 3342 tipc_printf(buf, ":RQUE[%u..%u]", o, n); 3343 if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) { 3344 tipc_printf(buf, ":RQSIZ(%u)", 3345 l_ptr->deferred_inqueue_sz); 3346 } 3347 } 3348 if (link_working_unknown(l_ptr)) 3349 tipc_printf(buf, ":WU"); 3350 if (link_reset_reset(l_ptr)) 3351 tipc_printf(buf, ":RR"); 3352 if (link_reset_unknown(l_ptr)) 3353 tipc_printf(buf, ":RU"); 3354 if (link_working_working(l_ptr)) 3355 tipc_printf(buf, ":WW"); 3356 tipc_printf(buf, "\n"); 3357 } 3358 3359