// SPDX-License-Identifier: GPL-2.0-or-later
/* Client connection-specific management code.
 *
 * Copyright (C) 2016, 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * Client connections need to be cached for a little while after they've made a
 * call so as to handle retransmitted DATA packets in case the server didn't
 * receive the final ACK or terminating ABORT we sent it.
 *
 * There are flags of relevance to the cache:
 *
 *  (1) DONT_REUSE - The connection should be discarded as soon as possible and
 *      should not be reused.  This is set when an exclusive connection is used
 *      or a call ID counter overflows.
 *
 * The caching state may only be changed if the cache lock is held.
 *
 * There are two idle client connection expiry durations.  If the total number
 * of connections is below the reap threshold, we use the normal duration; if
 * it's above, we use the fast duration.
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/slab.h>
#include <linux/idr.h>
#include <linux/timer.h>
#include <linux/sched/signal.h>

#include "ar-internal.h"

__read_mostly unsigned int rxrpc_reap_client_connections = 900;
__read_mostly unsigned long rxrpc_conn_idle_client_expiry = 2 * 60 * HZ;
__read_mostly unsigned long rxrpc_conn_idle_client_fast_expiry = 2 * HZ;

/*
 * We use machine-unique IDs for our client connections.
 */
DEFINE_IDR(rxrpc_client_conn_ids);
static DEFINE_SPINLOCK(rxrpc_conn_id_lock);

/*
 * Get a connection ID and epoch for a client connection from the global pool.
 * The connection struct pointer is then recorded in the idr radix tree.  The
 * epoch doesn't change until the client is rebooted (or, at least, unless the
 * module is unloaded).
 */
static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
					  gfp_t gfp)
{
	struct rxrpc_net *rxnet = conn->params.local->rxnet;
	int id;

	_enter("");

	idr_preload(gfp);
	spin_lock(&rxrpc_conn_id_lock);

	id = idr_alloc_cyclic(&rxrpc_client_conn_ids, conn,
			      1, 0x40000000, GFP_NOWAIT);
	if (id < 0)
		goto error;

	spin_unlock(&rxrpc_conn_id_lock);
	idr_preload_end();

	conn->proto.epoch = rxnet->epoch;
	conn->proto.cid = id << RXRPC_CIDSHIFT;
	set_bit(RXRPC_CONN_HAS_IDR, &conn->flags);
	_leave(" [CID %x]", conn->proto.cid);
	return 0;

error:
	spin_unlock(&rxrpc_conn_id_lock);
	idr_preload_end();
	_leave(" = %d", id);
	return id;
}

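/*
 * Note on the ID layout: the IDR index allocated above forms the upper bits
 * of the connection ID (cid = id << RXRPC_CIDSHIFT), and the low
 * RXRPC_CIDSHIFT bits select one of the connection's RXRPC_MAXCALLS channels
 * (rxrpc_activate_one_channel() below simply ORs the channel number into the
 * CID).  Since the epoch is set only once per module lifetime, the
 * {epoch, cid} pair a peer sees stays stable until the module is reloaded.
 */
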
/*
 * Release a connection ID for a client connection from the global pool.
 */
static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
{
	if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
		spin_lock(&rxrpc_conn_id_lock);
		idr_remove(&rxrpc_client_conn_ids,
			   conn->proto.cid >> RXRPC_CIDSHIFT);
		spin_unlock(&rxrpc_conn_id_lock);
	}
}

/*
 * Destroy the client connection ID tree.
 */
void rxrpc_destroy_client_conn_ids(void)
{
	struct rxrpc_connection *conn;
	int id;

	if (!idr_is_empty(&rxrpc_client_conn_ids)) {
		idr_for_each_entry(&rxrpc_client_conn_ids, conn, id) {
			pr_err("AF_RXRPC: Leaked client conn %p {%d}\n",
			       conn, atomic_read(&conn->usage));
		}
		BUG();
	}

	idr_destroy(&rxrpc_client_conn_ids);
}

/*
 * Allocate a connection bundle.
 */
static struct rxrpc_bundle *rxrpc_alloc_bundle(struct rxrpc_conn_parameters *cp,
					       gfp_t gfp)
{
	struct rxrpc_bundle *bundle;

	bundle = kzalloc(sizeof(*bundle), gfp);
	if (bundle) {
		bundle->params = *cp;
		rxrpc_get_peer(bundle->params.peer);
		atomic_set(&bundle->usage, 1);
		spin_lock_init(&bundle->channel_lock);
		INIT_LIST_HEAD(&bundle->waiting_calls);
	}
	return bundle;
}

struct rxrpc_bundle *rxrpc_get_bundle(struct rxrpc_bundle *bundle)
{
	atomic_inc(&bundle->usage);
	return bundle;
}

void rxrpc_put_bundle(struct rxrpc_bundle *bundle)
{
	unsigned int d = bundle->debug_id;
	unsigned int u = atomic_dec_return(&bundle->usage);

	_debug("PUT B=%x %u", d, u);
	if (u == 0) {
		rxrpc_put_peer(bundle->params.peer);
		kfree(bundle);
	}
}

/*
 * Allocate a client connection.
 */
static struct rxrpc_connection *
rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle, gfp_t gfp)
{
	struct rxrpc_connection *conn;
	struct rxrpc_net *rxnet = bundle->params.local->rxnet;
	int ret;

	_enter("");

	conn = rxrpc_alloc_connection(gfp);
	if (!conn) {
		_leave(" = -ENOMEM");
		return ERR_PTR(-ENOMEM);
	}

	atomic_set(&conn->usage, 1);
	conn->bundle = bundle;
	conn->params = bundle->params;
	conn->out_clientflag = RXRPC_CLIENT_INITIATED;
	conn->state = RXRPC_CONN_CLIENT;
	conn->service_id = conn->params.service_id;

	ret = rxrpc_get_client_connection_id(conn, gfp);
	if (ret < 0)
		goto error_0;

	ret = rxrpc_init_client_conn_security(conn);
	if (ret < 0)
		goto error_1;

	atomic_inc(&rxnet->nr_conns);
	write_lock(&rxnet->conn_lock);
	list_add_tail(&conn->proc_link, &rxnet->conn_proc_list);
	write_unlock(&rxnet->conn_lock);

	rxrpc_get_bundle(bundle);
	rxrpc_get_peer(conn->params.peer);
	rxrpc_get_local(conn->params.local);
	key_get(conn->params.key);

	trace_rxrpc_conn(conn->debug_id, rxrpc_conn_new_client,
			 atomic_read(&conn->usage),
			 __builtin_return_address(0));

	atomic_inc(&rxnet->nr_client_conns);
	trace_rxrpc_client(conn, -1, rxrpc_client_alloc);
	_leave(" = %p", conn);
	return conn;

error_1:
	rxrpc_put_client_connection_id(conn);
error_0:
	kfree(conn);
	_leave(" = %d", ret);
	return ERR_PTR(ret);
}

/*
 * Determine if a connection may be reused.
 */
static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
{
	struct rxrpc_net *rxnet;
	int id_cursor, id, distance, limit;

	if (!conn)
		goto dont_reuse;

	rxnet = conn->params.local->rxnet;
	if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
		goto dont_reuse;

	if (conn->state != RXRPC_CONN_CLIENT ||
	    conn->proto.epoch != rxnet->epoch)
		goto mark_dont_reuse;

	/* The IDR tree gets very expensive on memory if the connection IDs are
	 * widely scattered throughout the number space, so we shall want to
	 * kill off connections that, say, have an ID more than about four
	 * times the maximum number of client conns away from the current
	 * allocation point to try and keep the IDs concentrated.
	 */
	id_cursor = idr_get_cursor(&rxrpc_client_conn_ids);
	id = conn->proto.cid >> RXRPC_CIDSHIFT;
	distance = id - id_cursor;
	if (distance < 0)
		distance = -distance;
	limit = max_t(unsigned long, atomic_read(&rxnet->nr_conns) * 4, 1024);
	if (distance > limit)
		goto mark_dont_reuse;

	return true;

mark_dont_reuse:
	set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
dont_reuse:
	return false;
}

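/*
 * As a worked example of the reuse window above: with 100 connections in
 * total, the limit is max(100 * 4, 1024) = 1024 IDs either side of the
 * allocation cursor; with 5000 connections it widens to 20000 IDs.
 */
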
/*
 * Look up the conn bundle that matches the connection parameters, adding it if
 * it doesn't yet exist.
 */
static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_conn_parameters *cp,
						 gfp_t gfp)
{
	static atomic_t rxrpc_bundle_id;
	struct rxrpc_bundle *bundle, *candidate;
	struct rxrpc_local *local = cp->local;
	struct rb_node *p, **pp, *parent;
	long diff;

	_enter("{%px,%x,%u,%u}",
	       cp->peer, key_serial(cp->key), cp->security_level, cp->upgrade);

	if (cp->exclusive)
		return rxrpc_alloc_bundle(cp, gfp);

	/* First, see if the bundle is already there. */
	_debug("search 1");
	spin_lock(&local->client_bundles_lock);
	p = local->client_bundles.rb_node;
	while (p) {
		bundle = rb_entry(p, struct rxrpc_bundle, local_node);

#define cmp(X) ((long)bundle->params.X - (long)cp->X)
		diff = (cmp(peer) ?:
			cmp(key) ?:
			cmp(security_level) ?:
			cmp(upgrade));
#undef cmp
		if (diff < 0)
			p = p->rb_left;
		else if (diff > 0)
			p = p->rb_right;
		else
			goto found_bundle;
	}
	spin_unlock(&local->client_bundles_lock);
	_debug("not found");

	/* It wasn't.  We need to add one. */
	candidate = rxrpc_alloc_bundle(cp, gfp);
	if (!candidate)
		return NULL;

	_debug("search 2");
	spin_lock(&local->client_bundles_lock);
	pp = &local->client_bundles.rb_node;
	parent = NULL;
	while (*pp) {
		parent = *pp;
		bundle = rb_entry(parent, struct rxrpc_bundle, local_node);

#define cmp(X) ((long)bundle->params.X - (long)cp->X)
		diff = (cmp(peer) ?:
			cmp(key) ?:
			cmp(security_level) ?:
			cmp(upgrade));
#undef cmp
		if (diff < 0)
			pp = &(*pp)->rb_left;
		else if (diff > 0)
			pp = &(*pp)->rb_right;
		else
			goto found_bundle_free;
	}

	_debug("new bundle");
	candidate->debug_id = atomic_inc_return(&rxrpc_bundle_id);
	rb_link_node(&candidate->local_node, parent, pp);
	rb_insert_color(&candidate->local_node, &local->client_bundles);
	rxrpc_get_bundle(candidate);
	spin_unlock(&local->client_bundles_lock);
	_leave(" = %u [new]", candidate->debug_id);
	return candidate;

found_bundle_free:
	kfree(candidate);
found_bundle:
	rxrpc_get_bundle(bundle);
	spin_unlock(&local->client_bundles_lock);
	_leave(" = %u [found]", bundle->debug_id);
	return bundle;
}

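/*
 * In short, non-exclusive bundles are shared per local endpoint, keyed on
 * {peer, key, security_level, upgrade}, so calls made with identical
 * parameters funnel into the same bundle and its small set of connections.
 * Exclusive callers always get a private bundle that is never inserted into
 * the tree (and so is never found by the search above).
 */
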
/*
 * Create or find a client bundle to use for a call.
 *
 * If we return with a connection, the call will be on its waiting list.  It's
 * left to the caller to assign a channel and wake up the call.
 */
static struct rxrpc_bundle *rxrpc_prep_call(struct rxrpc_sock *rx,
					    struct rxrpc_call *call,
					    struct rxrpc_conn_parameters *cp,
					    struct sockaddr_rxrpc *srx,
					    gfp_t gfp)
{
	struct rxrpc_bundle *bundle;

	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);

	cp->peer = rxrpc_lookup_peer(rx, cp->local, srx, gfp);
	if (!cp->peer)
		goto error;

	call->cong_cwnd = cp->peer->cong_cwnd;
	if (call->cong_cwnd >= call->cong_ssthresh)
		call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
	else
		call->cong_mode = RXRPC_CALL_SLOW_START;
	if (cp->upgrade)
		__set_bit(RXRPC_CALL_UPGRADE, &call->flags);

	/* Find the client connection bundle. */
	bundle = rxrpc_look_up_bundle(cp, gfp);
	if (!bundle)
		goto error;

	/* Get this call queued.  Someone else may activate it whilst we're
	 * lining up a new connection, but that's fine.
	 */
	spin_lock(&bundle->channel_lock);
	list_add_tail(&call->chan_wait_link, &bundle->waiting_calls);
	spin_unlock(&bundle->channel_lock);

	_leave(" = [B=%x]", bundle->debug_id);
	return bundle;

error:
	_leave(" = -ENOMEM");
	return ERR_PTR(-ENOMEM);
}

/*
 * Allocate a new connection and add it into a bundle.
 */
static void rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle, gfp_t gfp)
	__releases(bundle->channel_lock)
{
	struct rxrpc_connection *candidate = NULL, *old = NULL;
	bool conflict;
	int i;

	_enter("");

	conflict = bundle->alloc_conn;
	if (!conflict)
		bundle->alloc_conn = true;
	spin_unlock(&bundle->channel_lock);
	if (conflict) {
		_leave(" [conf]");
		return;
	}

	candidate = rxrpc_alloc_client_connection(bundle, gfp);

	spin_lock(&bundle->channel_lock);
	bundle->alloc_conn = false;

	if (IS_ERR(candidate)) {
		bundle->alloc_error = PTR_ERR(candidate);
		spin_unlock(&bundle->channel_lock);
		_leave(" [err %ld]", PTR_ERR(candidate));
		return;
	}

	bundle->alloc_error = 0;

	for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) {
		unsigned int shift = i * RXRPC_MAXCALLS;
		int j;

		old = bundle->conns[i];
		if (!rxrpc_may_reuse_conn(old)) {
			if (old)
				trace_rxrpc_client(old, -1, rxrpc_client_replace);
			candidate->bundle_shift = shift;
			bundle->conns[i] = candidate;
			for (j = 0; j < RXRPC_MAXCALLS; j++)
				set_bit(shift + j, &bundle->avail_chans);
			candidate = NULL;
			break;
		}

		old = NULL;
	}

	spin_unlock(&bundle->channel_lock);

	if (candidate) {
		_debug("discard C=%x", candidate->debug_id);
		trace_rxrpc_client(candidate, -1, rxrpc_client_duplicate);
		rxrpc_put_connection(candidate);
	}

	rxrpc_put_connection(old);
	_leave("");
}

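/*
 * A note on the channel bitmap used above: slot i of bundle->conns[] owns
 * bits [i * RXRPC_MAXCALLS, (i + 1) * RXRPC_MAXCALLS) of ->avail_chans, and
 * conn->bundle_shift records that base bit.  A set bit means the channel is
 * free for a waiting call; rxrpc_activate_channels_locked() converts a bit
 * number back into a (slot, channel) pair by dividing by and masking with
 * RXRPC_MAXCALLS.
 */
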
/*
 * Add a connection to a bundle if there are no usable connections or there
 * are calls waiting for extra capacity.
 */
static void rxrpc_maybe_add_conn(struct rxrpc_bundle *bundle, gfp_t gfp)
{
	struct rxrpc_call *call;
	int i, usable;

	_enter("");

	spin_lock(&bundle->channel_lock);

	/* See if there are any usable connections. */
	usable = 0;
	for (i = 0; i < ARRAY_SIZE(bundle->conns); i++)
		if (rxrpc_may_reuse_conn(bundle->conns[i]))
			usable++;

	if (!usable && !list_empty(&bundle->waiting_calls)) {
		call = list_first_entry(&bundle->waiting_calls,
					struct rxrpc_call, chan_wait_link);
		if (test_bit(RXRPC_CALL_UPGRADE, &call->flags))
			bundle->try_upgrade = true;
	}

	if (!usable)
		goto alloc_conn;

	if (!bundle->avail_chans &&
	    !bundle->try_upgrade &&
	    !list_empty(&bundle->waiting_calls) &&
	    usable < ARRAY_SIZE(bundle->conns))
		goto alloc_conn;

	spin_unlock(&bundle->channel_lock);
	_leave("");
	return;

alloc_conn:
	return rxrpc_add_conn_to_bundle(bundle, gfp);
}

/*
 * Assign a channel to the call at the front of the queue and wake the call up.
 * We don't increment the callNumber counter until this number has been exposed
 * to the world.
 */
static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
				       unsigned int channel)
{
	struct rxrpc_channel *chan = &conn->channels[channel];
	struct rxrpc_bundle *bundle = conn->bundle;
	struct rxrpc_call *call = list_entry(bundle->waiting_calls.next,
					     struct rxrpc_call, chan_wait_link);
	u32 call_id = chan->call_counter + 1;

	_enter("C=%x,%u", conn->debug_id, channel);

	trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);

	/* Cancel the final ACK on the previous call if it hasn't been sent yet
	 * as the DATA packet will implicitly ACK it.
	 */
	clear_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
	clear_bit(conn->bundle_shift + channel, &bundle->avail_chans);

	rxrpc_see_call(call);
	list_del_init(&call->chan_wait_link);
	call->peer = rxrpc_get_peer(conn->params.peer);
	call->conn = rxrpc_get_connection(conn);
	call->cid = conn->proto.cid | channel;
	call->call_id = call_id;
	call->security = conn->security;
	call->security_ix = conn->security_ix;
	call->service_id = conn->service_id;

	trace_rxrpc_connect_call(call);
	_net("CONNECT call %08x:%08x as call %d on conn %d",
	     call->cid, call->call_id, call->debug_id, conn->debug_id);

	write_lock_bh(&call->state_lock);
	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
	write_unlock_bh(&call->state_lock);

	/* Paired with the read barrier in rxrpc_connect_call().  This orders
	 * cid and epoch in the connection wrt to call_id without the need to
	 * take the channel_lock.
	 *
	 * We provisionally assign a callNumber at this point, but we don't
	 * confirm it until the call is about to be exposed.
	 *
	 * TODO: Pair with a barrier in the data_ready handler when that looks
	 * at the call ID through a connection channel.
	 */
	smp_wmb();

	chan->call_id = call_id;
	chan->call_debug_id = call->debug_id;
	rcu_assign_pointer(chan->call, call);
	wake_up(&call->waitq);
}

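/*
 * Once a channel has been activated, the call has left the
 * RXRPC_CALL_CLIENT_AWAIT_CONN state, so the waiter in
 * rxrpc_wait_for_channel() (woken via call->waitq above) will observe the
 * state change and return to the caller.
 */
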
/*
 * Remove a connection from the idle list if it's on it.
 */
static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn)
{
	struct rxrpc_net *rxnet = bundle->params.local->rxnet;
	bool drop_ref;

	if (!list_empty(&conn->cache_link)) {
		drop_ref = false;
		spin_lock(&rxnet->client_conn_cache_lock);
		if (!list_empty(&conn->cache_link)) {
			list_del_init(&conn->cache_link);
			drop_ref = true;
		}
		spin_unlock(&rxnet->client_conn_cache_lock);
		if (drop_ref)
			rxrpc_put_connection(conn);
	}
}

/*
 * Assign channels and callNumbers to waiting calls with channel_lock
 * held by caller.
 */
static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
{
	struct rxrpc_connection *conn;
	unsigned long avail, mask;
	unsigned int channel, slot;

	if (bundle->try_upgrade)
		mask = 1;
	else
		mask = ULONG_MAX;

	while (!list_empty(&bundle->waiting_calls)) {
		avail = bundle->avail_chans & mask;
		if (!avail)
			break;
		channel = __ffs(avail);
		clear_bit(channel, &bundle->avail_chans);

		slot = channel / RXRPC_MAXCALLS;
		conn = bundle->conns[slot];
		if (!conn)
			break;

		if (bundle->try_upgrade)
			set_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags);
		rxrpc_unidle_conn(bundle, conn);

		channel &= (RXRPC_MAXCALLS - 1);
		conn->act_chans |= 1 << channel;
		rxrpc_activate_one_channel(conn, channel);
	}
}

/*
 * Assign channels and callNumbers to waiting calls.
 */
static void rxrpc_activate_channels(struct rxrpc_bundle *bundle)
{
	_enter("B=%x", bundle->debug_id);

	trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans);

	if (!bundle->avail_chans)
		return;

	spin_lock(&bundle->channel_lock);
	rxrpc_activate_channels_locked(bundle);
	spin_unlock(&bundle->channel_lock);
	_leave("");
}

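/*
 * While bundle->try_upgrade is set, the mask above limits activation to bit 0
 * (channel 0 of the first connection), so only a single probe call goes out
 * while the service-upgrade negotiation is unresolved.  The flag is cleared
 * in rxrpc_disconnect_client_call() when the probing call is torn down, at
 * which point any remaining channels can be activated.
 */
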
/*
 * Wait for a callNumber and a channel to be granted to a call.
 */
static int rxrpc_wait_for_channel(struct rxrpc_bundle *bundle,
				  struct rxrpc_call *call, gfp_t gfp)
{
	DECLARE_WAITQUEUE(myself, current);
	int ret = 0;

	_enter("%d", call->debug_id);

	if (!gfpflags_allow_blocking(gfp)) {
		rxrpc_maybe_add_conn(bundle, gfp);
		rxrpc_activate_channels(bundle);
		ret = bundle->alloc_error ?: -EAGAIN;
		goto out;
	}

	add_wait_queue_exclusive(&call->waitq, &myself);
	for (;;) {
		rxrpc_maybe_add_conn(bundle, gfp);
		rxrpc_activate_channels(bundle);
		ret = bundle->alloc_error;
		if (ret < 0)
			break;

		switch (call->interruptibility) {
		case RXRPC_INTERRUPTIBLE:
		case RXRPC_PREINTERRUPTIBLE:
			set_current_state(TASK_INTERRUPTIBLE);
			break;
		case RXRPC_UNINTERRUPTIBLE:
		default:
			set_current_state(TASK_UNINTERRUPTIBLE);
			break;
		}
		if (READ_ONCE(call->state) != RXRPC_CALL_CLIENT_AWAIT_CONN)
			break;
		if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
		     call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
		    signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
		schedule();
	}
	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

out:
	_leave(" = %d", ret);
	return ret;
}

/*
 * find a connection for a call
 * - called in process context with IRQs enabled
 */
int rxrpc_connect_call(struct rxrpc_sock *rx,
		       struct rxrpc_call *call,
		       struct rxrpc_conn_parameters *cp,
		       struct sockaddr_rxrpc *srx,
		       gfp_t gfp)
{
	struct rxrpc_bundle *bundle;
	struct rxrpc_net *rxnet = cp->local->rxnet;
	int ret = 0;

	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);

	rxrpc_discard_expired_client_conns(&rxnet->client_conn_reaper);

	bundle = rxrpc_prep_call(rx, call, cp, srx, gfp);
	if (IS_ERR(bundle)) {
		ret = PTR_ERR(bundle);
		goto out;
	}

	if (call->state == RXRPC_CALL_CLIENT_AWAIT_CONN) {
		ret = rxrpc_wait_for_channel(bundle, call, gfp);
		if (ret < 0)
			goto wait_failed;
	}

granted_channel:
	/* Paired with the write barrier in rxrpc_activate_one_channel(). */
	smp_rmb();

out_put_bundle:
	rxrpc_put_bundle(bundle);
out:
	_leave(" = %d", ret);
	return ret;

wait_failed:
	spin_lock(&bundle->channel_lock);
	list_del_init(&call->chan_wait_link);
	spin_unlock(&bundle->channel_lock);

	if (call->state != RXRPC_CALL_CLIENT_AWAIT_CONN) {
		ret = 0;
		goto granted_channel;
	}

	trace_rxrpc_client(call->conn, ret, rxrpc_client_chan_wait_failed);
	rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret);
	rxrpc_disconnect_client_call(bundle, call);
	goto out_put_bundle;
}

/*
 * Note that a call, and thus a connection, is about to be exposed to the
 * world.
 */
void rxrpc_expose_client_call(struct rxrpc_call *call)
{
	unsigned int channel = call->cid & RXRPC_CHANNELMASK;
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_channel *chan = &conn->channels[channel];

	if (!test_and_set_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		/* Mark the call ID as being used.  If the callNumber counter
		 * exceeds ~2 billion, we kill the connection after its
		 * outstanding calls have finished so that the counter doesn't
		 * wrap.
		 */
		chan->call_counter++;
		if (chan->call_counter >= INT_MAX)
			set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
		trace_rxrpc_client(conn, channel, rxrpc_client_exposed);
	}
}

/*
 * Set the reap timer.
 */
static void rxrpc_set_client_reap_timer(struct rxrpc_net *rxnet)
{
	if (!rxnet->kill_all_client_conns) {
		unsigned long now = jiffies;
		unsigned long reap_at = now + rxrpc_conn_idle_client_expiry;

		if (rxnet->live)
			timer_reduce(&rxnet->client_conn_reap_timer, reap_at);
	}
}

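/*
 * timer_reduce() only ever brings the timer's expiry forward, so repeated
 * calls here and from the reap worker converge on the earliest reap time
 * required; the worker re-arms the timer for the next connection's expiry
 * when it runs.
 */
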
/*
 * Disconnect a client call.
 */
void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call *call)
{
	struct rxrpc_connection *conn;
	struct rxrpc_channel *chan = NULL;
	struct rxrpc_net *rxnet = bundle->params.local->rxnet;
	unsigned int channel;
	bool may_reuse;
	u32 cid;

	_enter("c=%x", call->debug_id);

	spin_lock(&bundle->channel_lock);
	set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);

	/* Calls that have never actually been assigned a channel can simply be
	 * discarded.
	 */
	conn = call->conn;
	if (!conn) {
		_debug("call is waiting");
		ASSERTCMP(call->call_id, ==, 0);
		ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
		list_del_init(&call->chan_wait_link);
		goto out;
	}

	cid = call->cid;
	channel = cid & RXRPC_CHANNELMASK;
	chan = &conn->channels[channel];
	trace_rxrpc_client(conn, channel, rxrpc_client_chan_disconnect);

	if (rcu_access_pointer(chan->call) != call) {
		spin_unlock(&bundle->channel_lock);
		BUG();
	}

	may_reuse = rxrpc_may_reuse_conn(conn);

	/* If a client call was exposed to the world, we save the result for
	 * retransmission.
	 *
	 * We use a barrier here so that the call number and abort code can be
	 * read without needing to take a lock.
	 *
	 * TODO: Make the incoming packet handler check this and handle
	 * terminal retransmission without requiring access to the call.
	 */
	if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		_debug("exposed %u,%u", call->call_id, call->abort_code);
		__rxrpc_disconnect_call(conn, call);

		if (test_and_clear_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags)) {
			trace_rxrpc_client(conn, channel, rxrpc_client_to_active);
			bundle->try_upgrade = false;
			if (may_reuse)
				rxrpc_activate_channels_locked(bundle);
		}
	}

	/* See if we can pass the channel directly to another call. */
	if (may_reuse && !list_empty(&bundle->waiting_calls)) {
		trace_rxrpc_client(conn, channel, rxrpc_client_chan_pass);
		rxrpc_activate_one_channel(conn, channel);
		goto out;
	}

	/* Schedule the final ACK to be transmitted in a short while so that it
	 * can be skipped if we find a follow-on call.  The first DATA packet
	 * of the follow on call will implicitly ACK this call.
	 */
	if (call->completion == RXRPC_CALL_SUCCEEDED &&
	    test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		unsigned long final_ack_at = jiffies + 2;

		WRITE_ONCE(chan->final_ack_at, final_ack_at);
		smp_wmb(); /* vs rxrpc_process_delayed_final_acks() */
		set_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
		rxrpc_reduce_conn_timer(conn, final_ack_at);
	}

	/* Deactivate the channel. */
	rcu_assign_pointer(chan->call, NULL);
	set_bit(conn->bundle_shift + channel, &conn->bundle->avail_chans);
	conn->act_chans &= ~(1 << channel);

	/* If no channels remain active, then put the connection on the idle
	 * list for a short while.  Give it a ref to stop it going away if it
	 * becomes unbundled.
	 */
	if (!conn->act_chans) {
		trace_rxrpc_client(conn, channel, rxrpc_client_to_idle);
		conn->idle_timestamp = jiffies;

		rxrpc_get_connection(conn);
		spin_lock(&rxnet->client_conn_cache_lock);
		list_move_tail(&conn->cache_link, &rxnet->idle_client_conns);
		spin_unlock(&rxnet->client_conn_cache_lock);

		rxrpc_set_client_reap_timer(rxnet);
	}

out:
	spin_unlock(&bundle->channel_lock);
	_leave("");
	return;
}

/*
 * Remove a connection from a bundle.
 */
static void rxrpc_unbundle_conn(struct rxrpc_connection *conn)
{
	struct rxrpc_bundle *bundle = conn->bundle;
	struct rxrpc_local *local = bundle->params.local;
	unsigned int bindex;
	bool need_drop = false, need_put = false;
	int i;

	_enter("C=%x", conn->debug_id);

	if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
		rxrpc_process_delayed_final_acks(conn, true);

	spin_lock(&bundle->channel_lock);
	bindex = conn->bundle_shift / RXRPC_MAXCALLS;
	if (bundle->conns[bindex] == conn) {
		_debug("clear slot %u", bindex);
		bundle->conns[bindex] = NULL;
		for (i = 0; i < RXRPC_MAXCALLS; i++)
			clear_bit(conn->bundle_shift + i, &bundle->avail_chans);
		need_drop = true;
	}
	spin_unlock(&bundle->channel_lock);

	/* If there are no more connections, remove the bundle */
	if (!bundle->avail_chans) {
		_debug("maybe unbundle");
		spin_lock(&local->client_bundles_lock);

		for (i = 0; i < ARRAY_SIZE(bundle->conns); i++)
			if (bundle->conns[i])
				break;
		if (i == ARRAY_SIZE(bundle->conns) && !bundle->params.exclusive) {
			_debug("erase bundle");
			rb_erase(&bundle->local_node, &local->client_bundles);
			need_put = true;
		}

		spin_unlock(&local->client_bundles_lock);
		if (need_put)
			rxrpc_put_bundle(bundle);
	}

	if (need_drop)
		rxrpc_put_connection(conn);
	_leave("");
}

/*
 * Clean up a dead client connection.
 */
static void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
{
	struct rxrpc_local *local = conn->params.local;
	struct rxrpc_net *rxnet = local->rxnet;

	_enter("C=%x", conn->debug_id);

	trace_rxrpc_client(conn, -1, rxrpc_client_cleanup);
	atomic_dec(&rxnet->nr_client_conns);

	rxrpc_put_client_connection_id(conn);
	rxrpc_kill_connection(conn);
}

/*
 * Drop a reference on a client connection, cleaning it up when the last
 * reference is gone.
 */
void rxrpc_put_client_conn(struct rxrpc_connection *conn)
{
	const void *here = __builtin_return_address(0);
	unsigned int debug_id = conn->debug_id;
	int n;

	n = atomic_dec_return(&conn->usage);
	trace_rxrpc_conn(debug_id, rxrpc_conn_put_client, n, here);
	if (n <= 0) {
		ASSERTCMP(n, >=, 0);
		rxrpc_kill_client_conn(conn);
	}
}

/*
 * Discard expired client connections from the idle list.  Each conn in the
 * idle list has been exposed and holds an extra ref because of that.
 *
 * This may be called from conn setup or from a work item, so it may run
 * concurrently with itself; the discard lock below prevents overlapping runs.
 */
void rxrpc_discard_expired_client_conns(struct work_struct *work)
{
	struct rxrpc_connection *conn;
	struct rxrpc_net *rxnet =
		container_of(work, struct rxrpc_net, client_conn_reaper);
	unsigned long expiry, conn_expires_at, now;
	unsigned int nr_conns;

	_enter("");

	if (list_empty(&rxnet->idle_client_conns)) {
		_leave(" [empty]");
		return;
	}

	/* Don't double up on the discarding */
	if (!spin_trylock(&rxnet->client_conn_discard_lock)) {
		_leave(" [already]");
		return;
	}

	/* We keep an estimate of what the number of conns ought to be after
	 * we've discarded some so that we don't overdo the discarding.
	 */
	nr_conns = atomic_read(&rxnet->nr_client_conns);

next:
	spin_lock(&rxnet->client_conn_cache_lock);

	if (list_empty(&rxnet->idle_client_conns))
		goto out;

	conn = list_entry(rxnet->idle_client_conns.next,
			  struct rxrpc_connection, cache_link);

	if (!rxnet->kill_all_client_conns) {
		/* If the number of connections is over the reap limit, we
		 * expedite discard by reducing the expiry timeout.  We must,
		 * however, have at least a short grace period to be able to do
		 * final-ACK or ABORT retransmission.
		 */
		expiry = rxrpc_conn_idle_client_expiry;
		if (nr_conns > rxrpc_reap_client_connections)
			expiry = rxrpc_conn_idle_client_fast_expiry;
		if (conn->params.local->service_closed)
			expiry = rxrpc_closed_conn_expiry * HZ;

		conn_expires_at = conn->idle_timestamp + expiry;

		now = READ_ONCE(jiffies);
		if (time_after(conn_expires_at, now))
			goto not_yet_expired;
	}

	trace_rxrpc_client(conn, -1, rxrpc_client_discard);
	list_del_init(&conn->cache_link);

	spin_unlock(&rxnet->client_conn_cache_lock);

	rxrpc_unbundle_conn(conn);
	rxrpc_put_connection(conn); /* Drop the ->cache_link ref */

	nr_conns--;
	goto next;

not_yet_expired:
	/* The connection at the front of the queue hasn't yet expired, so
	 * schedule the work item for that point if we discarded something.
	 *
	 * We don't worry if the work item is already scheduled - it can look
	 * after rescheduling itself at a later time.  We could cancel it, but
	 * then things get messier.
	 */
	_debug("not yet");
	if (!rxnet->kill_all_client_conns)
		timer_reduce(&rxnet->client_conn_reap_timer, conn_expires_at);

out:
	spin_unlock(&rxnet->client_conn_cache_lock);
	spin_unlock(&rxnet->client_conn_discard_lock);
	_leave("");
}

/*
 * Preemptively destroy all the client connection records rather than waiting
 * for them to time out.
 */
void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet)
{
	_enter("");

	spin_lock(&rxnet->client_conn_cache_lock);
	rxnet->kill_all_client_conns = true;
	spin_unlock(&rxnet->client_conn_cache_lock);

	del_timer_sync(&rxnet->client_conn_reap_timer);

	if (!rxrpc_queue_work(&rxnet->client_conn_reaper))
		_debug("destroy: queue failed");

	_leave("");
}

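/*
 * Note that once ->kill_all_client_conns is set, the reaper above skips the
 * expiry-time check and discards every idle connection it finds, and the
 * reap timer is not re-armed.
 */
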
/*
 * Clean up the client connections on a local endpoint.
 */
void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
{
	struct rxrpc_connection *conn, *tmp;
	struct rxrpc_net *rxnet = local->rxnet;
	LIST_HEAD(graveyard);

	_enter("");

	spin_lock(&rxnet->client_conn_cache_lock);

	list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns,
				 cache_link) {
		if (conn->params.local == local) {
			trace_rxrpc_client(conn, -1, rxrpc_client_discard);
			list_move(&conn->cache_link, &graveyard);
		}
	}

	spin_unlock(&rxnet->client_conn_cache_lock);

	while (!list_empty(&graveyard)) {
		conn = list_entry(graveyard.next,
				  struct rxrpc_connection, cache_link);
		list_del_init(&conn->cache_link);
		rxrpc_unbundle_conn(conn);
		rxrpc_put_connection(conn);
	}

	_leave(" [culled]");
}