// SPDX-License-Identifier: GPL-2.0-or-later
/* Processing of received RxRPC packets
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

static void rxrpc_proto_abort(struct rxrpc_call *call, rxrpc_seq_t seq,
			      enum rxrpc_abort_reason why)
{
	rxrpc_abort_call(call, seq, RX_PROTOCOL_ERROR, -EBADMSG, why);
}

/*
 * Do TCP-style congestion management [RFC 5681].
 */
static void rxrpc_congestion_management(struct rxrpc_call *call,
					struct sk_buff *skb,
					struct rxrpc_ack_summary *summary,
					rxrpc_serial_t acked_serial)
{
	enum rxrpc_congest_change change = rxrpc_cong_no_change;
	unsigned int cumulative_acks = call->cong_cumul_acks;
	unsigned int cwnd = call->cong_cwnd;
	bool resend = false;

	summary->flight_size =
		(call->tx_top - call->acks_hard_ack) - summary->nr_acks;

	if (test_and_clear_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags)) {
		summary->retrans_timeo = true;
		call->cong_ssthresh = max_t(unsigned int,
					    summary->flight_size / 2, 2);
		cwnd = 1;
		if (cwnd >= call->cong_ssthresh &&
		    call->cong_mode == RXRPC_CALL_SLOW_START) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
			cumulative_acks = 0;
		}
	}

	cumulative_acks += summary->nr_new_acks;
	cumulative_acks += summary->nr_rot_new_acks;
	if (cumulative_acks > 255)
		cumulative_acks = 255;

	summary->mode = call->cong_mode;
	summary->cwnd = call->cong_cwnd;
	summary->ssthresh = call->cong_ssthresh;
	summary->cumulative_acks = cumulative_acks;
	summary->dup_acks = call->cong_dup_acks;

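	/* Step the RFC 5681-style state machine.  In slow start the window
	 * grows with each newly acked packet until it reaches ssthresh, at
	 * which point we switch to congestion avoidance and grow it by at
	 * most one packet per RTT.  Seeing NACKs drops us into the packet
	 * loss state; three duplicate loss reports there trigger fast
	 * retransmission with ssthresh reset to half the flight size.
	 */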
	switch (call->cong_mode) {
	case RXRPC_CALL_SLOW_START:
		if (summary->saw_nacks)
			goto packet_loss_detected;
		if (summary->cumulative_acks > 0)
			cwnd += 1;
		if (cwnd >= call->cong_ssthresh) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
		}
		goto out;

	case RXRPC_CALL_CONGEST_AVOIDANCE:
		if (summary->saw_nacks)
			goto packet_loss_detected;

		/* We analyse the number of packets that get ACK'd per RTT
		 * period and increase the window if we managed to fill it.
		 */
		if (call->peer->rtt_count == 0)
			goto out;
		if (ktime_before(skb->tstamp,
				 ktime_add_us(call->cong_tstamp,
					      call->peer->srtt_us >> 3)))
			goto out_no_clear_ca;
		change = rxrpc_cong_rtt_window_end;
		call->cong_tstamp = skb->tstamp;
		if (cumulative_acks >= cwnd)
			cwnd++;
		goto out;

	case RXRPC_CALL_PACKET_LOSS:
		if (!summary->saw_nacks)
			goto resume_normality;

		if (summary->new_low_nack) {
			change = rxrpc_cong_new_low_nack;
			call->cong_dup_acks = 1;
			if (call->cong_extra > 1)
				call->cong_extra = 1;
			goto send_extra_data;
		}

		call->cong_dup_acks++;
		if (call->cong_dup_acks < 3)
			goto send_extra_data;

		change = rxrpc_cong_begin_retransmission;
		call->cong_mode = RXRPC_CALL_FAST_RETRANSMIT;
		call->cong_ssthresh = max_t(unsigned int,
					    summary->flight_size / 2, 2);
		cwnd = call->cong_ssthresh + 3;
		call->cong_extra = 0;
		call->cong_dup_acks = 0;
		resend = true;
		goto out;

	case RXRPC_CALL_FAST_RETRANSMIT:
		if (!summary->new_low_nack) {
			if (summary->nr_new_acks == 0)
				cwnd += 1;
			call->cong_dup_acks++;
			if (call->cong_dup_acks == 2) {
				change = rxrpc_cong_retransmit_again;
				call->cong_dup_acks = 0;
				resend = true;
			}
		} else {
			change = rxrpc_cong_progress;
			cwnd = call->cong_ssthresh;
			if (!summary->saw_nacks)
				goto resume_normality;
		}
		goto out;

	default:
		BUG();
		goto out;
	}

resume_normality:
	change = rxrpc_cong_cleared_nacks;
	call->cong_dup_acks = 0;
	call->cong_extra = 0;
	call->cong_tstamp = skb->tstamp;
	if (cwnd < call->cong_ssthresh)
		call->cong_mode = RXRPC_CALL_SLOW_START;
	else
		call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
out:
	cumulative_acks = 0;
out_no_clear_ca:
	if (cwnd >= RXRPC_TX_MAX_WINDOW)
		cwnd = RXRPC_TX_MAX_WINDOW;
	call->cong_cwnd = cwnd;
	call->cong_cumul_acks = cumulative_acks;
	trace_rxrpc_congest(call, summary, acked_serial, change);
	if (resend)
		rxrpc_resend(call, skb);
	return;

packet_loss_detected:
	change = rxrpc_cong_saw_nack;
	call->cong_mode = RXRPC_CALL_PACKET_LOSS;
	call->cong_dup_acks = 0;
	goto send_extra_data;

send_extra_data:
	/* Send some previously unsent DATA if we have some to advance the ACK
	 * state.
	 */
	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ||
	    summary->nr_acks != call->tx_top - call->acks_hard_ack) {
		call->cong_extra++;
		wake_up(&call->waitq);
	}
	goto out_no_clear_ca;
}

/*
 * Degrade the congestion window if we haven't transmitted a packet for >1RTT.
 */
void rxrpc_congestion_degrade(struct rxrpc_call *call)
{
	ktime_t rtt, now;

	if (call->cong_mode != RXRPC_CALL_SLOW_START &&
	    call->cong_mode != RXRPC_CALL_CONGEST_AVOIDANCE)
		return;
	if (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_REPLY)
		return;

	rtt = ns_to_ktime(call->peer->srtt_us * (1000 / 8));
	now = ktime_get_real();
	if (!ktime_before(ktime_add(call->tx_last_sent, rtt), now))
		return;

	trace_rxrpc_reset_cwnd(call, now);
	rxrpc_inc_stat(call->rxnet, stat_tx_data_cwnd_reset);
	call->tx_last_sent = now;
	call->cong_mode = RXRPC_CALL_SLOW_START;
	call->cong_ssthresh = max_t(unsigned int, call->cong_ssthresh,
				    call->cong_cwnd * 3 / 4);
	call->cong_cwnd = max_t(unsigned int, call->cong_cwnd / 2, RXRPC_MIN_CWND);
}

/*
 * Apply a hard ACK by advancing the Tx window.
 */
static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
				   struct rxrpc_ack_summary *summary)
{
	struct rxrpc_txbuf *txb;
	bool rot_last = false;

	list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
		if (before_eq(txb->seq, call->acks_hard_ack))
			continue;
		summary->nr_rot_new_acks++;
		if (test_bit(RXRPC_TXBUF_LAST, &txb->flags)) {
			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
			rot_last = true;
		}
		if (txb->seq == to)
			break;
	}

	if (rot_last)
		set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);

	_enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);

	if (call->acks_lowest_nak == call->acks_hard_ack) {
		call->acks_lowest_nak = to;
	} else if (after(to, call->acks_lowest_nak)) {
		summary->new_low_nack = true;
		call->acks_lowest_nak = to;
	}

	smp_store_release(&call->acks_hard_ack, to);

	trace_rxrpc_txqueue(call, (rot_last ?
				   rxrpc_txqueue_rotate_last :
				   rxrpc_txqueue_rotate));
	wake_up(&call->waitq);
	return rot_last;
}

/*
 * End the transmission phase of a call.
 *
 * This occurs when we get an ACKALL packet, the first DATA packet of a reply,
 * or a final ACK packet.
 */
static void rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
			       enum rxrpc_abort_reason abort_why)
{
	ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		if (reply_begun) {
			rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_RECV_REPLY);
			trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
			break;
		}

		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_await_reply);
		break;

	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
		break;

	default:
		kdebug("end_tx %s", rxrpc_call_states[__rxrpc_call_state(call)]);
		rxrpc_proto_abort(call, call->tx_top, abort_why);
		break;
	}
}

/*
 * Begin the reply reception phase of a call.
 */
static bool rxrpc_receiving_reply(struct rxrpc_call *call)
{
	struct rxrpc_ack_summary summary = { 0 };
	unsigned long now, timo;
	rxrpc_seq_t top = READ_ONCE(call->tx_top);

	if (call->ackr_reason) {
		now = jiffies;
		timo = now + MAX_JIFFY_OFFSET;

		WRITE_ONCE(call->delay_ack_at, timo);
		trace_rxrpc_timer(call, rxrpc_timer_init_for_reply, now);
	}

	if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
		if (!rxrpc_rotate_tx_window(call, top, &summary)) {
			rxrpc_proto_abort(call, top, rxrpc_eproto_early_reply);
			return false;
		}
	}

	rxrpc_end_tx_phase(call, true, rxrpc_eproto_unexpected_reply);
	return true;
}

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
		rxrpc_call_completed(call);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_ACK_REQUEST);
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_processing_op);
		break;

	default:
		break;
	}
}

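/*
 * The receive window state is packed into a single 64-bit atomic value: the
 * bottom 32 bits hold the next in-sequence number expected ("window") and the
 * top 32 bits hold one more than the highest sequence number received so far
 * ("wtop").
 */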
static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
					  rxrpc_seq_t window, rxrpc_seq_t wtop)
{
	atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window);
}

/*
 * Push a DATA packet onto the Rx queue.
 */
static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
				   rxrpc_seq_t window, rxrpc_seq_t wtop,
				   enum rxrpc_receive_trace why)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;

	__skb_queue_tail(&call->recvmsg_queue, skb);
	rxrpc_input_update_ack_window(call, window, wtop);
	trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
	if (last)
		rxrpc_end_rx_phase(call, sp->hdr.serial);
}

/*
 * Process a DATA packet.
 */
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
				 bool *_notify)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct sk_buff *oos;
	rxrpc_serial_t serial = sp->hdr.serial;
	u64 win = atomic64_read(&call->ackr_window);
	rxrpc_seq_t window = lower_32_bits(win);
	rxrpc_seq_t wtop = upper_32_bits(win);
	rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
	rxrpc_seq_t seq = sp->hdr.seq;
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
	int ack_reason = -1;

	rxrpc_inc_stat(call->rxnet, stat_rx_data);
	if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack);
	if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);

	if (last) {
		if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    seq + 1 != wtop)
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_different_last);
	} else {
		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    after_eq(seq, wtop)) {
			pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
				call->debug_id, seq, window, wtop, wlimit);
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_data_after_last);
		}
	}

	if (after(seq, call->rx_highest_seq))
		call->rx_highest_seq = seq;

	trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);

	if (before(seq, window)) {
		ack_reason = RXRPC_ACK_DUPLICATE;
		goto send_ack;
	}
	if (after(seq, wlimit)) {
		ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
		goto send_ack;
	}

	/* Queue the packet. */
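	/* An in-sequence packet (seq == window) goes straight onto the
	 * recvmsg queue, followed by any consecutive packets that were parked
	 * on the out-of-order queue behind it.  Anything else is held on the
	 * out-of-order queue and noted in the soft-ACK table, a small ring
	 * indexed by sequence number modulo RXRPC_SACK_SIZE.
	 */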
	if (seq == window) {
		rxrpc_seq_t reset_from;
		bool reset_sack = false;

		if (sp->hdr.flags & RXRPC_REQUEST_ACK)
			ack_reason = RXRPC_ACK_REQUESTED;
		/* Send an immediate ACK if we fill in a hole */
		else if (!skb_queue_empty(&call->rx_oos_queue))
			ack_reason = RXRPC_ACK_DELAY;
		else
			atomic_inc_return(&call->ackr_nr_unacked);

		window++;
		if (after(window, wtop))
			wtop = window;

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);

		spin_lock(&call->recvmsg_queue.lock);
		rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
		*_notify = true;

		while ((oos = skb_peek(&call->rx_oos_queue))) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, window))
				break;

			__skb_unlink(oos, &call->rx_oos_queue);
			last = osp->hdr.flags & RXRPC_LAST_PACKET;
			seq = osp->hdr.seq;
			if (!reset_sack) {
				reset_from = seq;
				reset_sack = true;
			}

			window++;
			rxrpc_input_queue_data(call, oos, window, wtop,
					       rxrpc_receive_queue_oos);
		}

		spin_unlock(&call->recvmsg_queue.lock);

		if (reset_sack) {
			do {
				call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
			} while (reset_from++, before(reset_from, window));
		}
	} else {
		bool keep = false;

		ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;

		if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) {
			call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;
			keep = true;
		}

		if (after(seq + 1, wtop)) {
			wtop = seq + 1;
			rxrpc_input_update_ack_window(call, window, wtop);
		}

		if (!keep) {
			ack_reason = RXRPC_ACK_DUPLICATE;
			goto send_ack;
		}

		skb_queue_walk(&call->rx_oos_queue, oos) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, seq)) {
				rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
				__skb_queue_before(&call->rx_oos_queue, oos, skb);
				goto oos_queued;
			}
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
		__skb_queue_tail(&call->rx_oos_queue, skb);
oos_queued:
		trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
				    sp->hdr.serial, sp->hdr.seq);
	}

send_ack:
	if (ack_reason >= 0)
		rxrpc_send_ACK(call, ack_reason, serial,
			       rxrpc_propose_ack_input_data);
	else
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_input_data);
}

/*
 * Split a jumbo packet and file the bits separately.
 */
static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_jumbo_header jhdr;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb), *jsp;
	struct sk_buff *jskb;
	unsigned int offset = sizeof(struct rxrpc_wire_header);
	unsigned int len = skb->len - offset;
	bool notify = false;

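	/* Each non-final subpacket carries RXRPC_JUMBO_DATALEN bytes of data
	 * followed by a secondary jumbo header supplying the flags and _rsvd
	 * field for the next subpacket; the sequence and serial numbers
	 * simply increment from one subpacket to the next.
	 */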
	while (sp->hdr.flags & RXRPC_JUMBO_PACKET) {
		if (len < RXRPC_JUMBO_SUBPKTLEN)
			goto protocol_error;
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			goto protocol_error;
		if (skb_copy_bits(skb, offset + RXRPC_JUMBO_DATALEN,
				  &jhdr, sizeof(jhdr)) < 0)
			goto protocol_error;

		jskb = skb_clone(skb, GFP_NOFS);
		if (!jskb) {
			kdebug("couldn't clone");
			return false;
		}
		rxrpc_new_skb(jskb, rxrpc_skb_new_jumbo_subpacket);
		jsp = rxrpc_skb(jskb);
		jsp->offset = offset;
		jsp->len = RXRPC_JUMBO_DATALEN;
		rxrpc_input_data_one(call, jskb, &notify);
		rxrpc_free_skb(jskb, rxrpc_skb_put_jumbo_subpacket);

		sp->hdr.flags = jhdr.flags;
		sp->hdr._rsvd = ntohs(jhdr._rsvd);
		sp->hdr.seq++;
		sp->hdr.serial++;
		offset += RXRPC_JUMBO_SUBPKTLEN;
		len -= RXRPC_JUMBO_SUBPKTLEN;
	}

	sp->offset = offset;
	sp->len = len;
	rxrpc_input_data_one(call, skb, &notify);
	if (notify) {
		trace_rxrpc_notify_socket(call->debug_id, sp->hdr.serial);
		rxrpc_notify_socket(call);
	}
	return true;

protocol_error:
	return false;
}

/*
 * Process a DATA packet, adding the packet to the Rx ring.  The caller's
 * packet ref must be passed on or discarded.
 */
static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	rxrpc_serial_t serial = sp->hdr.serial;
	rxrpc_seq_t seq0 = sp->hdr.seq;

	_enter("{%llx,%x},{%u,%x}",
	       atomic64_read(&call->ackr_window), call->rx_highest_seq,
	       skb->len, seq0);

	if (__rxrpc_call_is_complete(call))
		return;

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		/* Received data implicitly ACKs all of the request
		 * packets we sent when we're acting as a client.
		 */
		if (!rxrpc_receiving_reply(call))
			goto out_notify;
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST: {
		unsigned long timo = READ_ONCE(call->next_req_timo);
		unsigned long now, expect_req_by;

		if (timo) {
			now = jiffies;
			expect_req_by = now + timo;
			WRITE_ONCE(call->expect_req_by, expect_req_by);
			rxrpc_reduce_call_timer(call, expect_req_by, now,
						rxrpc_timer_set_for_idle);
		}
		break;
	}

	default:
		break;
	}

	if (!rxrpc_input_split_jumbo(call, skb)) {
		rxrpc_proto_abort(call, sp->hdr.seq, rxrpc_badmsg_bad_jumbo);
		goto out_notify;
	}
	skb = NULL;

out_notify:
	trace_rxrpc_notify_socket(call->debug_id, serial);
	rxrpc_notify_socket(call);
	_leave(" [queued]");
}

/*
 * See if there's a cached RTT probe to complete.
 */
static void rxrpc_complete_rtt_probe(struct rxrpc_call *call,
				     ktime_t resp_time,
				     rxrpc_serial_t acked_serial,
				     rxrpc_serial_t ack_serial,
				     enum rxrpc_rtt_rx_trace type)
{
	rxrpc_serial_t orig_serial;
	unsigned long avail;
	ktime_t sent_at;
	bool matched = false;
	int i;

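	/* Each RTT probe slot has two bits in call->rtt_avail: a low "free"
	 * bit and a "pending" bit at RXRPC_CALL_RTT_PEND_SHIFT.  A slot whose
	 * pending bit is set holds the serial number and transmission time of
	 * an outstanding probe; completing or obsoleting the probe clears the
	 * pending bit and then sets the free bit so the slot can be reused.
	 */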
	avail = READ_ONCE(call->rtt_avail);
	smp_rmb(); /* Read avail bits before accessing data. */

	for (i = 0; i < ARRAY_SIZE(call->rtt_serial); i++) {
		if (!test_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &avail))
			continue;

		sent_at = call->rtt_sent_at[i];
		orig_serial = call->rtt_serial[i];

		if (orig_serial == acked_serial) {
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_mb(); /* Read data before setting avail bit */
			set_bit(i, &call->rtt_avail);
			if (type != rxrpc_rtt_rx_cancel)
				rxrpc_peer_add_rtt(call, type, i, acked_serial, ack_serial,
						   sent_at, resp_time);
			else
				trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_cancel, i,
						   orig_serial, acked_serial, 0, 0);
			matched = true;
		}

		/* If a later serial is being acked, then mark this slot as
		 * being available.
		 */
		if (after(acked_serial, orig_serial)) {
			trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_obsolete, i,
					   orig_serial, acked_serial, 0, 0);
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_wmb();
			set_bit(i, &call->rtt_avail);
		}
	}

	if (!matched)
		trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_lost, 9, 0, acked_serial, 0, 0);
}

/*
 * Process the extra information that may be appended to an ACK packet
 */
static void rxrpc_input_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
				struct rxrpc_ackinfo *ackinfo)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_peer *peer;
	unsigned int mtu;
	bool wake = false;
	u32 rwind = ntohl(ackinfo->rwind);

	if (rwind > RXRPC_TX_MAX_WINDOW)
		rwind = RXRPC_TX_MAX_WINDOW;
	if (call->tx_winsize != rwind) {
		if (rwind > call->tx_winsize)
			wake = true;
		trace_rxrpc_rx_rwind_change(call, sp->hdr.serial, rwind, wake);
		call->tx_winsize = rwind;
	}

	if (call->cong_ssthresh > rwind)
		call->cong_ssthresh = rwind;

	mtu = min(ntohl(ackinfo->rxMTU), ntohl(ackinfo->maxMTU));

	peer = call->peer;
	if (mtu < peer->maxdata) {
		spin_lock(&peer->lock);
		peer->maxdata = mtu;
		peer->mtu = mtu + peer->hdrsize;
		spin_unlock(&peer->lock);
	}

	if (wake)
		wake_up(&call->waitq);
}

/*
 * Process individual soft ACKs.
 *
 * Each ACK in the array corresponds to one packet and can be either an ACK or
 * a NAK.  If we find an explicitly NAK'd packet we resend immediately;
 * packets that lie beyond the end of the ACK list are scheduled for resend by
 * the timer on the basis that the peer might just not have processed them at
 * the time the ACK was sent.
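 *
 * For example, if the first soft-ACK'd packet is #5 and there are three
 * entries, then acks[0], acks[1] and acks[2] describe packets 5, 6 and 7
 * respectively.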
 */
static void rxrpc_input_soft_acks(struct rxrpc_call *call, u8 *acks,
				  rxrpc_seq_t seq, int nr_acks,
				  struct rxrpc_ack_summary *summary)
{
	unsigned int i;

	for (i = 0; i < nr_acks; i++) {
		if (acks[i] == RXRPC_ACK_TYPE_ACK) {
			summary->nr_acks++;
			summary->nr_new_acks++;
		} else {
			if (!summary->saw_nacks &&
			    call->acks_lowest_nak != seq + i) {
				call->acks_lowest_nak = seq + i;
				summary->new_low_nack = true;
			}
			summary->saw_nacks = true;
		}
	}
}

/*
 * Return true if the ACK is valid - ie. it doesn't appear to have regressed
 * with respect to the ack state conveyed by preceding ACKs.
 */
static bool rxrpc_is_ack_valid(struct rxrpc_call *call,
			       rxrpc_seq_t first_pkt, rxrpc_seq_t prev_pkt)
{
	rxrpc_seq_t base = READ_ONCE(call->acks_first_seq);

	if (after(first_pkt, base))
		return true; /* The window advanced */

	if (before(first_pkt, base))
		return false; /* firstPacket regressed */

	if (after_eq(prev_pkt, call->acks_prev_seq))
		return true; /* previousPacket hasn't regressed. */

	/* Some rx implementations put a serial number in previousPacket. */
	if (after_eq(prev_pkt, base + call->tx_winsize))
		return false;
	return true;
}

/*
 * Process an ACK packet.
 *
 * ack.firstPacket is the sequence number of the first soft-ACK'd/NAK'd packet
 * in the ACK array.  Anything before that is hard-ACK'd and may be discarded.
 *
 * A hard-ACK means that a packet has been processed and may be discarded; a
 * soft-ACK means that the packet may be discarded and retransmission
 * requested.  A phase is complete when all packets are hard-ACK'd.
 */
static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };
	struct rxrpc_ackpacket ack;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_ackinfo info;
	rxrpc_serial_t ack_serial, acked_serial;
	rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt;
	int nr_acks, offset, ioffset;

	_enter("");

	offset = sizeof(struct rxrpc_wire_header);
	if (skb_copy_bits(skb, offset, &ack, sizeof(ack)) < 0)
		return rxrpc_proto_abort(call, 0, rxrpc_badmsg_short_ack);
	offset += sizeof(ack);

	ack_serial = sp->hdr.serial;
	acked_serial = ntohl(ack.serial);
	first_soft_ack = ntohl(ack.firstPacket);
	prev_pkt = ntohl(ack.previousPacket);
	hard_ack = first_soft_ack - 1;
	nr_acks = ack.nAcks;
	summary.ack_reason = (ack.reason < RXRPC_ACK__INVALID ?
			      ack.reason : RXRPC_ACK__INVALID);

	trace_rxrpc_rx_ack(call, ack_serial, acked_serial,
			   first_soft_ack, prev_pkt,
			   summary.ack_reason, nr_acks);
	rxrpc_inc_stat(call->rxnet, stat_rx_acks[ack.reason]);

	switch (ack.reason) {
	case RXRPC_ACK_PING_RESPONSE:
		rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
					 rxrpc_rtt_rx_ping_response);
		break;
	case RXRPC_ACK_REQUESTED:
		rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
					 rxrpc_rtt_rx_requested_ack);
		break;
	default:
		if (acked_serial != 0)
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_cancel);
		break;
	}

	if (ack.reason == RXRPC_ACK_PING) {
		rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial,
			       rxrpc_propose_ack_respond_to_ping);
	} else if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
		rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial,
			       rxrpc_propose_ack_respond_to_ack);
	}

	/* If we get an EXCEEDS_WINDOW ACK from the server, it probably
	 * indicates that the client address changed due to NAT.  The server
	 * lost the call because it switched to a different peer.
	 */
	if (unlikely(ack.reason == RXRPC_ACK_EXCEEDS_WINDOW) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		return;
	}

	/* If we get an OUT_OF_SEQUENCE ACK from the server, that can also
	 * indicate a change of address.  However, we can retransmit the call
	 * if we still have it buffered to the beginning.
	 */
	if (unlikely(ack.reason == RXRPC_ACK_OUT_OF_SEQUENCE) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    call->acks_hard_ack == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		return;
	}

	/* Discard any out-of-order or duplicate ACKs (outside lock). */
	if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
		trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
					   first_soft_ack, call->acks_first_seq,
					   prev_pkt, call->acks_prev_seq);
		return;
	}

	info.rxMTU = 0;
	ioffset = offset + nr_acks + 3;
	if (skb->len >= ioffset + sizeof(info) &&
	    skb_copy_bits(skb, ioffset, &info, sizeof(info)) < 0)
		return rxrpc_proto_abort(call, 0, rxrpc_badmsg_short_ack_info);

	if (nr_acks > 0)
		skb_condense(skb);

	call->acks_latest_ts = skb->tstamp;
	call->acks_first_seq = first_soft_ack;
	call->acks_prev_seq = prev_pkt;

	switch (ack.reason) {
	case RXRPC_ACK_PING:
		break;
	default:
		if (after(acked_serial, call->acks_highest_serial))
			call->acks_highest_serial = acked_serial;
		break;
	}

	/* Parse rwind and mtu sizes if provided. */
	if (info.rxMTU)
		rxrpc_input_ackinfo(call, skb, &info);

	if (first_soft_ack == 0)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_zero);

	/* Ignore ACKs unless we are or have just been transmitting. */
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
	case RXRPC_CALL_SERVER_SEND_REPLY:
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		break;
	default:
		return;
	}

	if (before(hard_ack, call->acks_hard_ack) ||
	    after(hard_ack, call->tx_top))
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_outside_window);
	if (nr_acks > call->tx_top - hard_ack)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_sack_overflow);

	if (after(hard_ack, call->acks_hard_ack)) {
		if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
			rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ack);
			return;
		}
	}

	if (nr_acks > 0) {
		if (offset > (int)skb->len - nr_acks)
			return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_short_sack);
		rxrpc_input_soft_acks(call, skb->data + offset, first_soft_ack,
				      nr_acks, &summary);
	}

	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) &&
	    summary.nr_acks == call->tx_top - hard_ack &&
	    rxrpc_is_client_call(call))
		rxrpc_propose_ping(call, ack_serial,
				   rxrpc_propose_ack_ping_for_lost_reply);

	rxrpc_congestion_management(call, skb, &summary, acked_serial);
}

/*
 * Process an ACKALL packet.
 */
static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };

	if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
		rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ackall);
}

/*
 * Process an ABORT packet directed at a call.
 */
static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	trace_rxrpc_rx_abort(call, sp->hdr.serial, skb->priority);

	rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
				  skb->priority, -ECONNABORTED);
}

/*
 * Process an incoming call packet.
 */
void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned long timo;

	_enter("%p,%p", call, skb);

	if (sp->hdr.serviceId != call->dest_srx.srx_service)
		call->dest_srx.srx_service = sp->hdr.serviceId;
	if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
		call->rx_serial = sp->hdr.serial;
	if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
		set_bit(RXRPC_CALL_RX_HEARD, &call->flags);

	timo = READ_ONCE(call->next_rx_timo);
	if (timo) {
		unsigned long now = jiffies, expect_rx_by;

		expect_rx_by = now + timo;
		WRITE_ONCE(call->expect_rx_by, expect_rx_by);
		rxrpc_reduce_call_timer(call, expect_rx_by, now,
					rxrpc_timer_set_for_normal);
	}

	switch (sp->hdr.type) {
	case RXRPC_PACKET_TYPE_DATA:
		return rxrpc_input_data(call, skb);

	case RXRPC_PACKET_TYPE_ACK:
		return rxrpc_input_ack(call, skb);

	case RXRPC_PACKET_TYPE_BUSY:
		/* Just ignore BUSY packets from the server; the retry and
		 * lifespan timers will take care of business.  BUSY packets
		 * from the client don't make sense.
		 */
		return;

	case RXRPC_PACKET_TYPE_ABORT:
		return rxrpc_input_abort(call, skb);

	case RXRPC_PACKET_TYPE_ACKALL:
		return rxrpc_input_ackall(call, skb);

	default:
		break;
	}
}

/*
 * Handle a new service call on a channel implicitly completing the preceding
 * call on that channel.  This does not apply to client conns.
 *
 * TODO: If callNumber > call_id + 1, renegotiate security.
 */
void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		fallthrough;
	case RXRPC_CALL_COMPLETE:
		break;
	default:
		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ESHUTDOWN,
				 rxrpc_eproto_improper_term);
		trace_rxrpc_improper_term(call);
		break;
	}

	rxrpc_input_call_event(call, skb);
}