1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC packet transmission 3 * 4 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/net.h> 11 #include <linux/gfp.h> 12 #include <linux/skbuff.h> 13 #include <linux/export.h> 14 #include <net/sock.h> 15 #include <net/af_rxrpc.h> 16 #include <net/udp.h> 17 #include "ar-internal.h" 18 19 ssize_t do_udp_sendmsg(struct socket *socket, struct msghdr *msg, size_t len) 20 { 21 struct sockaddr *sa = msg->msg_name; 22 struct sock *sk = socket->sk; 23 24 if (IS_ENABLED(CONFIG_AF_RXRPC_IPV6)) { 25 if (sa->sa_family == AF_INET6) { 26 if (sk->sk_family != AF_INET6) { 27 pr_warn("AF_INET6 address on AF_INET socket\n"); 28 return -ENOPROTOOPT; 29 } 30 return udpv6_sendmsg(sk, msg, len); 31 } 32 } 33 return udp_sendmsg(sk, msg, len); 34 } 35 36 struct rxrpc_abort_buffer { 37 struct rxrpc_wire_header whdr; 38 __be32 abort_code; 39 }; 40 41 static const char rxrpc_keepalive_string[] = ""; 42 43 /* 44 * Increase Tx backoff on transmission failure and clear it on success. 45 */ 46 static void rxrpc_tx_backoff(struct rxrpc_call *call, int ret) 47 { 48 if (ret < 0) { 49 if (call->tx_backoff < 1000) 50 call->tx_backoff += 100; 51 } else { 52 call->tx_backoff = 0; 53 } 54 } 55 56 /* 57 * Arrange for a keepalive ping a certain time after we last transmitted. This 58 * lets the far side know we're still interested in this call and helps keep 59 * the route through any intervening firewall open. 60 * 61 * Receiving a response to the ping will prevent the ->expect_rx_by timer from 62 * expiring. 
 */
static void rxrpc_set_keepalive(struct rxrpc_call *call, ktime_t now)
{
	/* Ping at one sixth of the expected-RX timeout. */
	ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo) / 6);

	/* NOTE(review): @now is currently unused; the deadline is based on a
	 * fresh ktime_get_real() sample instead — confirm this is intended.
	 */
	call->keepalive_at = ktime_add(ktime_get_real(), delay);
	trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_keepalive);
}

/*
 * Allocate transmission buffers for an ACK and attach them to local->kv[].
 *
 * The first fragment holds the wire header and the ACK body; the second holds
 * the soft-ACK table (only allocated if @sack_size is non-zero); the third
 * span covers the 3 padding bytes and the ACK trailer, which live in the
 * first fragment after the ACK body.
 *
 * Returns the number of kvecs set up (3) or -ENOMEM.
 */
static int rxrpc_alloc_ack(struct rxrpc_call *call, size_t sack_size)
{
	struct rxrpc_wire_header *whdr;
	struct rxrpc_acktrailer *trailer;
	struct rxrpc_ackpacket *ack;
	struct kvec *kv = call->local->kvec;
	/* May be called under RCU, in which case we mustn't sleep. */
	gfp_t gfp = rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS;
	void *buf, *buf2 = NULL;
	u8 *filler;

	buf = page_frag_alloc(&call->local->tx_alloc,
			      sizeof(*whdr) + sizeof(*ack) + 1 + 3 + sizeof(*trailer), gfp);
	if (!buf)
		return -ENOMEM;

	if (sack_size) {
		buf2 = page_frag_alloc(&call->local->tx_alloc, sack_size, gfp);
		if (!buf2) {
			page_frag_free(buf);
			return -ENOMEM;
		}
	}

	/* Carve up the first fragment: header, ACK body, then (skipping one
	 * byte) the filler/padding and the trailer.
	 */
	whdr = buf;
	ack = buf + sizeof(*whdr);
	filler = buf + sizeof(*whdr) + sizeof(*ack) + 1;
	trailer = buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;

	kv[0].iov_base = whdr;
	kv[0].iov_len = sizeof(*whdr) + sizeof(*ack);
	kv[1].iov_base = buf2;
	kv[1].iov_len = sack_size;
	kv[2].iov_base = filler;
	kv[2].iov_len = 3 + sizeof(*trailer);
	return 3; /* Number of kvec[] used. */
}

/* Release the page fragments pinned by rxrpc_alloc_ack(). */
static void rxrpc_free_ack(struct rxrpc_call *call)
{
	page_frag_free(call->local->kvec[0].iov_base);
	if (call->local->kvec[1].iov_base)
		page_frag_free(call->local->kvec[1].iov_base);
}

/*
 * Record the beginning of an RTT probe.
 */
static void rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
				  ktime_t now, enum rxrpc_rtt_tx_trace why)
{
	unsigned long avail = call->rtt_avail;
	int rtt_slot = 9;	/* Out-of-range value for the no-slot trace. */

	if (!(avail & RXRPC_CALL_RTT_AVAIL_MASK))
		goto no_slot;

	/* Claim the lowest available slot; another CPU may race us for it. */
	rtt_slot = __ffs(avail & RXRPC_CALL_RTT_AVAIL_MASK);
	if (!test_and_clear_bit(rtt_slot, &call->rtt_avail))
		goto no_slot;

	call->rtt_serial[rtt_slot] = serial;
	call->rtt_sent_at[rtt_slot] = now;
	smp_wmb(); /* Write data before avail bit */
	set_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);

	trace_rxrpc_rtt_tx(call, why, rtt_slot, serial);
	return;

no_slot:
	trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_no_slot, rtt_slot, serial);
}

/*
 * Fill out an ACK packet in the kvecs set up by rxrpc_alloc_ack().  Allocates
 * the serial number, copies in the soft-ACK table (handling wrap of the
 * circular buffer) and fills in the trailer with our receive window and MTU
 * advertisements.  Returns the number of kvecs in use (passed through from
 * @nr_kv) and the new serial via *@_ack_serial.
 */
static int rxrpc_fill_out_ack(struct rxrpc_call *call, int nr_kv, u8 ack_reason,
			      rxrpc_serial_t serial_to_ack, rxrpc_serial_t *_ack_serial)
{
	struct kvec *kv = call->local->kvec;
	struct rxrpc_wire_header *whdr = kv[0].iov_base;
	struct rxrpc_acktrailer *trailer = kv[2].iov_base + 3;
	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
	unsigned int qsize, sack, wrap, to, max_mtu, if_mtu;
	rxrpc_seq_t window, wtop;
	ktime_t now = ktime_get_real();
	int rsize;
	u8 *filler = kv[2].iov_base;
	u8 *sackp = kv[1].iov_base;

	rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);

	window = call->ackr_window;
	wtop = call->ackr_wtop;
	sack = call->ackr_sack_base % RXRPC_SACK_SIZE;

	*_ack_serial = rxrpc_get_next_serial(call->conn);

	whdr->epoch = htonl(call->conn->proto.epoch);
	whdr->cid = htonl(call->cid);
	whdr->callNumber = htonl(call->call_id);
	whdr->serial = htonl(*_ack_serial);
	whdr->seq = 0;
	whdr->type = RXRPC_PACKET_TYPE_ACK;
	whdr->flags = call->conn->out_clientflag | RXRPC_SLOW_START_OK;
	whdr->userStatus = 0;
	whdr->securityIndex = call->security_ix;
	whdr->_rsvd = 0;
	whdr->serviceId = htons(call->dest_srx.srx_service);

	ack->bufferSpace = 0;
	ack->maxSkew = 0;
	ack->firstPacket = htonl(window);
	ack->previousPacket = htonl(call->rx_highest_seq);
	ack->serial = htonl(serial_to_ack);
	ack->reason = ack_reason;
	ack->nAcks = wtop - window;
	filler[0] = 0;
	filler[1] = 0;
	filler[2] = 0;

	/* A ping doubles as an RTT probe, so ask for a response. */
	if (ack_reason == RXRPC_ACK_PING)
		whdr->flags |= RXRPC_REQUEST_ACK;

	if (after(wtop, window)) {
		kv[1].iov_len = ack->nAcks;

		/* The soft-ACK table is a circular buffer; copy it out in one
		 * or two pieces depending on whether it wraps.
		 */
		wrap = RXRPC_SACK_SIZE - sack;
		to = umin(ack->nAcks, RXRPC_SACK_SIZE);

		if (sack + ack->nAcks <= RXRPC_SACK_SIZE) {
			memcpy(sackp, call->ackr_sack_table + sack, ack->nAcks);
		} else {
			memcpy(sackp, call->ackr_sack_table + sack, wrap);
			memcpy(sackp + wrap, call->ackr_sack_table, to - wrap);
		}
	} else if (before(wtop, window)) {
		pr_warn("ack window backward %x %x", window, wtop);
	} else if (ack->reason == RXRPC_ACK_DELAY) {
		/* Nothing outstanding: downgrade DELAY to IDLE. */
		ack->reason = RXRPC_ACK_IDLE;
	}

	/* Advertise how much receive window we have left. */
	qsize = (window - 1) - call->rx_consumed;
	rsize = max_t(int, call->rx_winsize - qsize, 0);

	if_mtu = call->peer->if_mtu - call->peer->hdrsize;
	if (call->peer->ackr_adv_pmtud) {
		max_mtu = umax(call->peer->max_data, rxrpc_rx_mtu);
	} else {
		/* Peer doesn't do PMTUD advertisement; clamp conservatively. */
		if_mtu = umin(if_mtu, 1444);
		max_mtu = if_mtu;
	}

	trailer->maxMTU = htonl(max_mtu);
	trailer->ifMTU = htonl(if_mtu);
	trailer->rwind = htonl(rsize);
	trailer->jumbo_max = 0; /* Advertise pmtu discovery */

	if (ack_reason == RXRPC_ACK_PING)
		rxrpc_begin_rtt_probe(call, *_ack_serial, now, rxrpc_rtt_tx_ping);
	if (whdr->flags & RXRPC_REQUEST_ACK)
		call->rtt_last_req = now;
	rxrpc_set_keepalive(call, now);
	return nr_kv;
}

/*
 * Transmit an ACK packet.
 */
static void rxrpc_send_ack_packet(struct rxrpc_call *call, int nr_kv, size_t len,
				  rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why)
{
	struct kvec *kv = call->local->kvec;
	struct rxrpc_wire_header *whdr = kv[0].iov_base;
	struct rxrpc_acktrailer *trailer = kv[2].iov_base + 3;
	struct rxrpc_connection *conn;
	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
	struct msghdr msg;
	int ret;

	/* Nothing to send an ACK on once we've been disconnected. */
	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return;

	conn = call->conn;

	msg.msg_name = &call->peer->srx.transport;
	msg.msg_namelen = call->peer->srx.transport_len;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = MSG_SPLICE_PAGES;

	trace_rxrpc_tx_ack(call->debug_id, serial,
			   ntohl(ack->firstPacket),
			   ntohl(ack->serial), ack->reason, ack->nAcks,
			   ntohl(trailer->rwind), why);

	rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);

	iov_iter_kvec(&msg.msg_iter, WRITE, kv, nr_kv, len);
	/* An MTU probe must not be fragmented, otherwise it proves nothing. */
	rxrpc_local_dont_fragment(conn->local, why == rxrpc_propose_ack_ping_for_mtu_probe);

	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	rxrpc_peer_mark_tx(call->peer);
	if (ret < 0) {
		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
				    rxrpc_tx_point_call_ack);
		/* -EMSGSIZE on a probe means this size doesn't fit: feed that
		 * result straight into PMTU discovery.
		 */
		if (why == rxrpc_propose_ack_ping_for_mtu_probe &&
		    ret == -EMSGSIZE)
			rxrpc_input_probe_for_pmtud(conn, serial, true);
	} else {
		trace_rxrpc_tx_packet(call->debug_id, whdr,
				      rxrpc_tx_point_call_ack);
		if (why == rxrpc_propose_ack_ping_for_mtu_probe) {
			/* Record the in-flight probe so the response (or its
			 * absence) can be matched up later.
			 */
			call->peer->pmtud_pending = false;
			call->peer->pmtud_probing = true;
			call->conn->pmtud_probe = serial;
			call->conn->pmtud_call = call->debug_id;
			trace_rxrpc_pmtud_tx(call);
		}
	}
	rxrpc_tx_backoff(call, ret);
}

/*
 * Queue an ACK for immediate transmission.
 */
void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
		    rxrpc_serial_t serial_to_ack, enum rxrpc_propose_ack_trace why)
{
	struct kvec *kv = call->local->kvec;
	rxrpc_serial_t ack_serial;
	size_t len;
	int nr_kv;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return;

	rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);

	/* The soft-ACK table size is the span of the ACK window. */
	nr_kv = rxrpc_alloc_ack(call, call->ackr_wtop - call->ackr_window);
	if (nr_kv < 0) {
		kleave(" = -ENOMEM");
		return;
	}

	nr_kv = rxrpc_fill_out_ack(call, nr_kv, ack_reason, serial_to_ack, &ack_serial);
	len = kv[0].iov_len;
	len += kv[1].iov_len;
	len += kv[2].iov_len;

	/* Extend a path MTU probe ACK. */
	if (why == rxrpc_propose_ack_ping_for_mtu_probe) {
		size_t probe_mtu = call->peer->pmtud_trial + sizeof(struct rxrpc_wire_header);

		/* Already larger than the size under test - nothing to probe. */
		if (len > probe_mtu)
			goto skip;
		/* Pad out to the probe size with zero pages. */
		while (len < probe_mtu) {
			size_t part = umin(probe_mtu - len, PAGE_SIZE);

			kv[nr_kv].iov_base = page_address(ZERO_PAGE(0));
			kv[nr_kv].iov_len = part;
			len += part;
			nr_kv++;
		}
	}

	call->ackr_nr_unacked = 0;
	atomic_set(&call->ackr_nr_consumed, 0);
	clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);

	trace_rxrpc_send_ack(call, why, ack_reason, ack_serial);
	rxrpc_send_ack_packet(call, nr_kv, len, ack_serial, why);
skip:
	rxrpc_free_ack(call);
}

/*
 * Send an ACK probe for path MTU discovery.
 */
void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call)
{
	rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
		       rxrpc_propose_ack_ping_for_mtu_probe);
}

/*
 * Send an ABORT call packet.
 */
int rxrpc_send_abort_packet(struct rxrpc_call *call)
{
	struct rxrpc_connection *conn;
	struct rxrpc_abort_buffer pkt;
	struct msghdr msg;
	struct kvec iov[1];
	rxrpc_serial_t serial;
	int ret;

	/* Don't bother sending aborts for a client call once the server has
	 * hard-ACK'd all of its request data.  After that point, we're not
	 * going to stop the operation proceeding, and whilst we might limit
	 * the reply, it's not worth it if we can send a new call on the same
	 * channel instead, thereby closing off this call.
	 */
	if (rxrpc_is_client_call(call) &&
	    test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
		return 0;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return -ECONNRESET;

	conn = call->conn;

	msg.msg_name = &call->peer->srx.transport;
	msg.msg_namelen = call->peer->srx.transport_len;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	pkt.whdr.epoch = htonl(conn->proto.epoch);
	pkt.whdr.cid = htonl(call->cid);
	pkt.whdr.callNumber = htonl(call->call_id);
	pkt.whdr.seq = 0;
	pkt.whdr.type = RXRPC_PACKET_TYPE_ABORT;
	pkt.whdr.flags = conn->out_clientflag;
	pkt.whdr.userStatus = 0;
	pkt.whdr.securityIndex = call->security_ix;
	pkt.whdr._rsvd = 0;
	pkt.whdr.serviceId = htons(call->dest_srx.srx_service);
	pkt.abort_code = htonl(call->abort_code);

	iov[0].iov_base = &pkt;
	iov[0].iov_len = sizeof(pkt);

	/* Allocate the serial number as late as possible. */
	serial = rxrpc_get_next_serial(conn);
	pkt.whdr.serial = htonl(serial);

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, sizeof(pkt));
	ret = do_udp_sendmsg(conn->local->socket, &msg, sizeof(pkt));
	rxrpc_peer_mark_tx(conn->peer);
	if (ret < 0)
		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
				    rxrpc_tx_point_call_abort);
	else
		trace_rxrpc_tx_packet(call->debug_id, &pkt.whdr,
				      rxrpc_tx_point_call_abort);
	rxrpc_tx_backoff(call, ret);
	return ret;
}

/*
 * Prepare a (sub)packet for transmission.  Fills in the kvec for subpacket
 * @subpkt, decides whether to set RXRPC_REQUEST_ACK and sets the jumbo header
 * on all but the first subpacket.  Returns the length contributed to the
 * wire packet.
 */
static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
					   struct rxrpc_send_data_req *req,
					   struct rxrpc_txbuf *txb,
					   struct rxrpc_wire_header *whdr,
					   rxrpc_serial_t serial, int subpkt)
{
	struct rxrpc_jumbo_header *jumbo = txb->data - sizeof(*jumbo);
	enum rxrpc_req_ack_trace why;
	struct rxrpc_connection *conn = call->conn;
	struct kvec *kv = &call->local->kvec[1 + subpkt];
	size_t len = txb->pkt_len;
	bool last;
	u8 flags;

	_enter("%x,%zd", txb->seq, len);

	txb->serial = serial;

	/* Piggyback the service-upgrade probe on the first DATA packet. */
	if (test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) &&
	    txb->seq == 1)
		whdr->userStatus = RXRPC_USERSTATUS_SERVICE_UPGRADE;

	txb->flags &= ~RXRPC_REQUEST_ACK;
	flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;
	last = txb->flags & RXRPC_LAST_PACKET;

	/* Only the final subpacket of a jumbo may request an ACK. */
	if (subpkt < req->n - 1) {
		len = RXRPC_JUMBO_DATALEN;
		goto dont_set_request_ack;
	}

	/* If our RTT cache needs working on, request an ACK.  Also request
	 * ACKs if a DATA packet appears to have been lost.
	 *
	 * However, we mustn't request an ACK on the last reply packet of a
	 * service call, lest OpenAFS incorrectly send us an ACK with some
	 * soft-ACKs in it and then never follow up with a proper hard ACK.
	 */
	if (last && rxrpc_sending_to_client(txb))
		why = rxrpc_reqack_no_srv_last;
	else if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
		why = rxrpc_reqack_ack_lost;
	else if (txb->flags & RXRPC_TXBUF_RESENT)
		why = rxrpc_reqack_retrans;
	else if (call->cong_ca_state == RXRPC_CA_SLOW_START && call->cong_cwnd <= RXRPC_MIN_CWND)
		why = rxrpc_reqack_slow_start;
	else if (call->tx_winsize <= 2)
		why = rxrpc_reqack_small_txwin;
	else if (call->rtt_count < 3)
		why = rxrpc_reqack_more_rtt;
	else if (ktime_before(ktime_add_ms(call->rtt_last_req, 1000), ktime_get_real()))
		why = rxrpc_reqack_old_rtt;
	else if (!last && !after(READ_ONCE(call->send_top), txb->seq))
		why = rxrpc_reqack_app_stall;
	else
		goto dont_set_request_ack;

	rxrpc_inc_stat(call->rxnet, stat_why_req_ack[why]);
	trace_rxrpc_req_ack(call->debug_id, txb->seq, why);
	if (why != rxrpc_reqack_no_srv_last) {
		flags |= RXRPC_REQUEST_ACK;
		trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_data, -1, serial);
		call->rtt_last_req = req->now;
	}
dont_set_request_ack:

	/* There's a jumbo header prepended to the data if we need it. */
	if (subpkt < req->n - 1)
		flags |= RXRPC_JUMBO_PACKET;
	else
		flags &= ~RXRPC_JUMBO_PACKET;
	if (subpkt == 0) {
		whdr->flags = flags;
		whdr->cksum = txb->cksum;
		kv->iov_base = txb->data;
	} else {
		jumbo->flags = flags;
		jumbo->pad = 0;
		jumbo->cksum = txb->cksum;
		kv->iov_base = jumbo;
		len += sizeof(*jumbo);
	}

	trace_rxrpc_tx_data(call, txb->seq, txb->serial, flags, req->trace);
	kv->iov_len = len;
	return len;
}

/*
 * Prepare a transmission queue object for initial transmission.  Returns the
 * number of microseconds since the transmission queue base timestamp.
517 */ 518 static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq, 519 struct rxrpc_send_data_req *req) 520 { 521 if (!tq) 522 return 0; 523 if (tq->xmit_ts_base == KTIME_MIN) { 524 tq->xmit_ts_base = req->now; 525 return 0; 526 } 527 return ktime_to_us(ktime_sub(req->now, tq->xmit_ts_base)); 528 } 529 530 /* 531 * Prepare a (jumbo) packet for transmission. 532 */ 533 static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, 534 struct rxrpc_send_data_req *req, 535 struct rxrpc_wire_header *whdr) 536 { 537 struct rxrpc_txqueue *tq = req->tq; 538 rxrpc_serial_t serial; 539 unsigned int xmit_ts; 540 rxrpc_seq_t seq = req->seq; 541 size_t len = 0; 542 bool start_tlp = false; 543 544 trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit); 545 546 /* Each transmission of a Tx packet needs a new serial number */ 547 serial = rxrpc_get_next_serials(call->conn, req->n); 548 549 whdr->epoch = htonl(call->conn->proto.epoch); 550 whdr->cid = htonl(call->cid); 551 whdr->callNumber = htonl(call->call_id); 552 whdr->seq = htonl(seq); 553 whdr->serial = htonl(serial); 554 whdr->type = RXRPC_PACKET_TYPE_DATA; 555 whdr->flags = 0; 556 whdr->userStatus = 0; 557 whdr->securityIndex = call->security_ix; 558 whdr->_rsvd = 0; 559 whdr->serviceId = htons(call->conn->service_id); 560 561 call->tx_last_serial = serial + req->n - 1; 562 call->tx_last_sent = req->now; 563 xmit_ts = rxrpc_prepare_txqueue(tq, req); 564 prefetch(tq->next); 565 566 for (int i = 0;;) { 567 int ix = seq & RXRPC_TXQ_MASK; 568 struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK]; 569 570 _debug("prep[%u] tq=%x q=%x", i, tq->qbase, seq); 571 572 /* Record (re-)transmission for RACK [RFC8985 6.1]. 
*/ 573 if (__test_and_clear_bit(ix, &tq->segment_lost)) 574 call->tx_nr_lost--; 575 if (req->retrans) { 576 __set_bit(ix, &tq->ever_retransmitted); 577 __set_bit(ix, &tq->segment_retransmitted); 578 call->tx_nr_resent++; 579 } else { 580 call->tx_nr_sent++; 581 start_tlp = true; 582 } 583 tq->segment_xmit_ts[ix] = xmit_ts; 584 tq->segment_serial[ix] = serial; 585 if (i + 1 == req->n) 586 /* Only sample the last subpacket in a jumbo. */ 587 __set_bit(ix, &tq->rtt_samples); 588 len += rxrpc_prepare_data_subpacket(call, req, txb, whdr, serial, i); 589 serial++; 590 seq++; 591 i++; 592 if (i >= req->n) 593 break; 594 if (!(seq & RXRPC_TXQ_MASK)) { 595 tq = tq->next; 596 trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit_advance); 597 xmit_ts = rxrpc_prepare_txqueue(tq, req); 598 } 599 } 600 601 /* Set timeouts */ 602 if (req->tlp_probe) { 603 /* Sending TLP loss probe [RFC8985 7.3]. */ 604 call->tlp_serial = serial - 1; 605 call->tlp_seq = seq - 1; 606 } else if (start_tlp) { 607 /* Schedule TLP loss probe [RFC8985 7.2]. */ 608 ktime_t pto; 609 610 if (!test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) 611 /* The first packet may take longer to elicit a response. 
*/ 612 pto = NSEC_PER_SEC; 613 else 614 pto = rxrpc_tlp_calc_pto(call, req->now); 615 616 call->rack_timer_mode = RXRPC_CALL_RACKTIMER_TLP_PTO; 617 call->rack_timo_at = ktime_add(req->now, pto); 618 trace_rxrpc_rack_timer(call, pto, false); 619 trace_rxrpc_timer_set(call, pto, rxrpc_timer_trace_rack_tlp_pto); 620 } 621 622 if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) { 623 ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo)); 624 625 call->expect_rx_by = ktime_add(req->now, delay); 626 trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx); 627 } 628 629 rxrpc_set_keepalive(call, req->now); 630 page_frag_free(whdr); 631 return len; 632 } 633 634 /* 635 * Send one or more packets through the transport endpoint 636 */ 637 void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req) 638 { 639 struct rxrpc_wire_header *whdr; 640 struct rxrpc_connection *conn = call->conn; 641 enum rxrpc_tx_point frag; 642 struct rxrpc_txqueue *tq = req->tq; 643 struct rxrpc_txbuf *txb; 644 struct msghdr msg; 645 rxrpc_seq_t seq = req->seq; 646 size_t len = sizeof(*whdr); 647 bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags); 648 int ret, stat_ix; 649 650 _enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1); 651 652 whdr = page_frag_alloc(&call->local->tx_alloc, sizeof(*whdr), GFP_NOFS); 653 if (!whdr) 654 return; /* Drop the packet if no memory. 
*/ 655 656 call->local->kvec[0].iov_base = whdr; 657 call->local->kvec[0].iov_len = sizeof(*whdr); 658 659 stat_ix = umin(req->n, ARRAY_SIZE(call->rxnet->stat_tx_jumbo)) - 1; 660 atomic_inc(&call->rxnet->stat_tx_jumbo[stat_ix]); 661 662 len += rxrpc_prepare_data_packet(call, req, whdr); 663 txb = tq->bufs[seq & RXRPC_TXQ_MASK]; 664 665 iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, 1 + req->n, len); 666 667 msg.msg_name = &call->peer->srx.transport; 668 msg.msg_namelen = call->peer->srx.transport_len; 669 msg.msg_control = NULL; 670 msg.msg_controllen = 0; 671 msg.msg_flags = MSG_SPLICE_PAGES; 672 673 /* Send the packet with the don't fragment bit set unless we think it's 674 * too big or if this is a retransmission. 675 */ 676 if (seq == call->tx_transmitted + 1 && 677 len >= sizeof(struct rxrpc_wire_header) + call->peer->max_data) { 678 rxrpc_local_dont_fragment(conn->local, false); 679 frag = rxrpc_tx_point_call_data_frag; 680 } else { 681 rxrpc_local_dont_fragment(conn->local, true); 682 frag = rxrpc_tx_point_call_data_nofrag; 683 } 684 685 /* Track what we've attempted to transmit at least once so that the 686 * retransmission algorithm doesn't try to resend what we haven't sent 687 * yet. 
688 */ 689 if (seq == call->tx_transmitted + 1) 690 call->tx_transmitted = seq + req->n - 1; 691 692 if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) { 693 static int lose; 694 695 if ((lose++ & 7) == 7) { 696 ret = 0; 697 trace_rxrpc_tx_data(call, txb->seq, txb->serial, txb->flags, 698 rxrpc_txdata_inject_loss); 699 rxrpc_peer_mark_tx(conn->peer); 700 goto done; 701 } 702 } 703 704 /* send the packet by UDP 705 * - returns -EMSGSIZE if UDP would have to fragment the packet 706 * to go out of the interface 707 * - in which case, we'll have processed the ICMP error 708 * message and update the peer record 709 */ 710 rxrpc_inc_stat(call->rxnet, stat_tx_data_send); 711 ret = do_udp_sendmsg(conn->local->socket, &msg, len); 712 rxrpc_peer_mark_tx(conn->peer); 713 714 if (ret == -EMSGSIZE) { 715 rxrpc_inc_stat(call->rxnet, stat_tx_data_send_msgsize); 716 trace_rxrpc_tx_packet(call->debug_id, whdr, frag); 717 ret = 0; 718 } else if (ret < 0) { 719 rxrpc_inc_stat(call->rxnet, stat_tx_data_send_fail); 720 trace_rxrpc_tx_fail(call->debug_id, txb->serial, ret, frag); 721 } else { 722 trace_rxrpc_tx_packet(call->debug_id, whdr, frag); 723 } 724 725 rxrpc_tx_backoff(call, ret); 726 727 if (ret < 0) { 728 /* Cancel the call if the initial transmission fails or if we 729 * hit due to network routing issues that aren't going away 730 * anytime soon. The layer above can arrange the 731 * retransmission. 732 */ 733 if (new_call || 734 ret == -ENETUNREACH || 735 ret == -EHOSTUNREACH || 736 ret == -ECONNREFUSED) 737 rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 738 RX_USER_ABORT, ret); 739 } 740 741 done: 742 _leave(" = %d [%u]", ret, call->peer->max_data); 743 } 744 745 /* 746 * Transmit a connection-level abort. 
 */
void rxrpc_send_conn_abort(struct rxrpc_connection *conn)
{
	struct rxrpc_wire_header whdr;
	struct msghdr msg;
	struct kvec iov[2];
	__be32 word;
	size_t len;
	u32 serial;
	int ret;

	msg.msg_name = &conn->peer->srx.transport;
	msg.msg_namelen = conn->peer->srx.transport_len;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	/* Connection-level abort: no call number or sequence number. */
	whdr.epoch = htonl(conn->proto.epoch);
	whdr.cid = htonl(conn->proto.cid);
	whdr.callNumber = 0;
	whdr.seq = 0;
	whdr.type = RXRPC_PACKET_TYPE_ABORT;
	whdr.flags = conn->out_clientflag;
	whdr.userStatus = 0;
	whdr.securityIndex = conn->security_ix;
	whdr._rsvd = 0;
	whdr.serviceId = htons(conn->service_id);

	word = htonl(conn->abort_code);

	iov[0].iov_base = &whdr;
	iov[0].iov_len = sizeof(whdr);
	iov[1].iov_base = &word;
	iov[1].iov_len = sizeof(word);

	len = iov[0].iov_len + iov[1].iov_len;

	serial = rxrpc_get_next_serial(conn);
	whdr.serial = htonl(serial);

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	if (ret < 0) {
		trace_rxrpc_tx_fail(conn->debug_id, serial, ret,
				    rxrpc_tx_point_conn_abort);
		_debug("sendmsg failed: %d", ret);
		return;
	}

	trace_rxrpc_tx_packet(conn->debug_id, &whdr, rxrpc_tx_point_conn_abort);

	rxrpc_peer_mark_tx(conn->peer);
}

/*
 * Reject a packet through the local endpoint.
 */
void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
	struct rxrpc_wire_header whdr;
	struct sockaddr_rxrpc srx;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct msghdr msg;
	struct kvec iov[2];
	size_t size;
	__be32 code;
	int ret, ioc;

	if (sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
		return; /* Never abort an abort. */

	rxrpc_see_skb(skb, rxrpc_skb_see_reject);

	iov[0].iov_base = &whdr;
	iov[0].iov_len = sizeof(whdr);
	iov[1].iov_base = &code;
	iov[1].iov_len = sizeof(code);

	msg.msg_name = &srx.transport;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	/* Echo the offending packet's routing info back at the sender,
	 * flipping the client/server direction flag.
	 */
	whdr = (struct rxrpc_wire_header) {
		.epoch		= htonl(sp->hdr.epoch),
		.cid		= htonl(sp->hdr.cid),
		.callNumber	= htonl(sp->hdr.callNumber),
		.serviceId	= htons(sp->hdr.serviceId),
		.flags		= ~sp->hdr.flags & RXRPC_CLIENT_INITIATED,
	};

	switch (skb->mark) {
	case RXRPC_SKB_MARK_REJECT_BUSY:
		whdr.type = RXRPC_PACKET_TYPE_BUSY;
		size = sizeof(whdr);
		ioc = 1;
		break;
	case RXRPC_SKB_MARK_REJECT_CONN_ABORT:
		whdr.callNumber = 0;
		fallthrough;
	case RXRPC_SKB_MARK_REJECT_ABORT:
		whdr.type = RXRPC_PACKET_TYPE_ABORT;
		/* The abort code travels in skb->priority. */
		code = htonl(skb->priority);
		size = sizeof(whdr) + sizeof(code);
		ioc = 2;
		break;
	default:
		return;
	}

	if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
		msg.msg_namelen = srx.transport_len;

		iov_iter_kvec(&msg.msg_iter, WRITE, iov, ioc, size);
		ret = do_udp_sendmsg(local->socket, &msg, size);
		if (ret < 0)
			trace_rxrpc_tx_fail(local->debug_id, 0, ret,
					    rxrpc_tx_point_reject);
		else
			trace_rxrpc_tx_packet(local->debug_id, &whdr,
					      rxrpc_tx_point_reject);
	}
}

/*
 * Send a VERSION reply to a peer as a keepalive.
 */
void rxrpc_send_keepalive(struct rxrpc_peer *peer)
{
	struct rxrpc_wire_header whdr;
	struct msghdr msg;
	struct kvec iov[2];
	size_t len;
	int ret;

	_enter("");

	msg.msg_name = &peer->srx.transport;
	msg.msg_namelen = peer->srx.transport_len;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	/* A VERSION packet carries no call/connection routing info. */
	whdr.epoch = htonl(peer->local->rxnet->epoch);
	whdr.cid = 0;
	whdr.callNumber = 0;
	whdr.seq = 0;
	whdr.serial = 0;
	whdr.type = RXRPC_PACKET_TYPE_VERSION; /* Not client-initiated */
	whdr.flags = RXRPC_LAST_PACKET;
	whdr.userStatus = 0;
	whdr.securityIndex = 0;
	whdr._rsvd = 0;
	whdr.serviceId = 0;

	iov[0].iov_base = &whdr;
	iov[0].iov_len = sizeof(whdr);
	/* The payload is an empty string (a single NUL byte). */
	iov[1].iov_base = (char *)rxrpc_keepalive_string;
	iov[1].iov_len = sizeof(rxrpc_keepalive_string);

	len = iov[0].iov_len + iov[1].iov_len;

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
	ret = do_udp_sendmsg(peer->local->socket, &msg, len);
	if (ret < 0)
		trace_rxrpc_tx_fail(peer->debug_id, 0, ret,
				    rxrpc_tx_point_version_keepalive);
	else
		trace_rxrpc_tx_packet(peer->debug_id, &whdr,
				      rxrpc_tx_point_version_keepalive);

	rxrpc_peer_mark_tx(peer);
	_leave("");
}

/*
 * Send a RESPONSE message.
 */
void rxrpc_send_response(struct rxrpc_connection *conn, struct sk_buff *response)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(response);
	struct scatterlist sg[16];
	struct bio_vec *bvec = conn->local->bvec;
	struct msghdr msg;
	size_t len = sp->resp.len;
	__be32 wserial;
	u32 serial = 0;
	int ret, nr_sg;

	_enter("C=%x,%x", conn->debug_id, sp->resp.challenge_serial);

	/* Map the skb's data into a scatterlist so it can be spliced. */
	sg_init_table(sg, ARRAY_SIZE(sg));
	ret = skb_to_sgvec(response, sg, 0, len);
	if (ret < 0)
		goto fail;
	nr_sg = ret;
	ret = -EIO;
	if (WARN_ON_ONCE(nr_sg > ARRAY_SIZE(conn->local->bvec)))
		goto fail;

	for (int i = 0; i < nr_sg; i++)
		bvec_set_page(&bvec[i], sg_page(&sg[i]), sg[i].length, sg[i].offset);

	iov_iter_bvec(&msg.msg_iter, WRITE, bvec, nr_sg, len);

	msg.msg_name = &conn->peer->srx.transport;
	msg.msg_namelen = conn->peer->srx.transport_len;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = MSG_SPLICE_PAGES;

	serial = rxrpc_get_next_serials(conn, 1);
	wserial = htonl(serial);

	trace_rxrpc_tx_response(conn, serial, sp);

	/* Patch the freshly-allocated serial number into the skb's header. */
	ret = skb_store_bits(response, offsetof(struct rxrpc_wire_header, serial),
			     &wserial, sizeof(wserial));
	if (ret < 0)
		goto fail;

	rxrpc_local_dont_fragment(conn->local, false);

	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	if (ret < 0)
		goto fail;

	rxrpc_peer_mark_tx(conn->peer);
	return;

fail:
	trace_rxrpc_tx_fail(conn->debug_id, serial, ret,
			    rxrpc_tx_point_response);
	kleave(" = %d", ret);
}