xref: /linux/net/rxrpc/output.c (revision dfecb0c5af3b07ebfa84be63a7a21bfc9e29a872)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC packet transmission
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/gfp.h>
12 #include <linux/skbuff.h>
13 #include <linux/export.h>
14 #include <net/sock.h>
15 #include <net/af_rxrpc.h>
16 #include <net/udp.h>
17 #include "ar-internal.h"
18 
19 ssize_t do_udp_sendmsg(struct socket *socket, struct msghdr *msg, size_t len)
20 {
21 	struct sockaddr *sa = msg->msg_name;
22 	struct sock *sk = socket->sk;
23 
24 	if (IS_ENABLED(CONFIG_AF_RXRPC_IPV6)) {
25 		if (sa->sa_family == AF_INET6) {
26 			if (sk->sk_family != AF_INET6) {
27 				pr_warn("AF_INET6 address on AF_INET socket\n");
28 				return -ENOPROTOOPT;
29 			}
30 			return udpv6_sendmsg(sk, msg, len);
31 		}
32 	}
33 	return udp_sendmsg(sk, msg, len);
34 }
35 
/*
 * On-the-wire ABORT packet: the wire header immediately followed by the
 * network-order abort code.
 */
struct rxrpc_abort_buffer {
	struct rxrpc_wire_header whdr;
	__be32 abort_code;	/* Network-order RX abort code */
};
40 
41 static const char rxrpc_keepalive_string[] = "";
42 
43 /*
44  * Increase Tx backoff on transmission failure and clear it on success.
45  */
46 static void rxrpc_tx_backoff(struct rxrpc_call *call, int ret)
47 {
48 	if (ret < 0) {
49 		if (call->tx_backoff < 1000)
50 			call->tx_backoff += 100;
51 	} else {
52 		call->tx_backoff = 0;
53 	}
54 }
55 
/*
 * Arrange for a keepalive ping a certain time after we last transmitted.  This
 * lets the far side know we're still interested in this call and helps keep
 * the route through any intervening firewall open.
 *
 * Receiving a response to the ping will prevent the ->expect_rx_by timer from
 * expiring.
 */
static void rxrpc_set_keepalive(struct rxrpc_call *call, ktime_t now)
{
	/* Delay is one sixth of the expected-Rx timeout. */
	ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo) / 6);

	/* NOTE(review): the @now argument is unused here; the deadline is
	 * based on a fresh ktime_get_real() sample instead - confirm intended.
	 */
	call->keepalive_at = ktime_add(ktime_get_real(), delay);
	trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_keepalive);
}
71 
/*
 * Allocate transmission buffers for an ACK and attach them to local->kvec[].
 *
 * kvec[0] gets the wire header plus the ack body, kvec[1] the (optional,
 * sack_size bytes) soft-ACK table and kvec[2] the 3-byte filler plus the
 * trailer.  Returns the number of kvecs used (3) or -ENOMEM.
 */
static int rxrpc_alloc_ack(struct rxrpc_call *call, size_t sack_size)
{
	struct rxrpc_wire_header *whdr;
	struct rxrpc_acktrailer *trailer;
	struct rxrpc_ackpacket *ack;
	struct kvec *kv = call->local->kvec;
	/* May be called under RCU, in which case we mustn't sleep. */
	gfp_t gfp = rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS;
	void *buf, *buf2 = NULL;
	u8 *filler;

	/* Header, ack body, one byte of padding, 3-byte filler and trailer
	 * all come from one allocation; the soft-ACK table gets its own.
	 */
	buf = page_frag_alloc(&call->local->tx_alloc,
			      sizeof(*whdr) + sizeof(*ack) + 1 + 3 + sizeof(*trailer), gfp);
	if (!buf)
		return -ENOMEM;

	if (sack_size) {
		buf2 = page_frag_alloc(&call->local->tx_alloc, sack_size, gfp);
		if (!buf2) {
			page_frag_free(buf);
			return -ENOMEM;
		}
	}

	/* Carve up the first buffer; the byte at whdr+ack is not transmitted
	 * (kvec[0] stops at the end of the ack) - the soft-ACKs from buf2 are
	 * spliced in between the ack and the filler on the wire.
	 */
	whdr	= buf;
	ack	= buf + sizeof(*whdr);
	filler	= buf + sizeof(*whdr) + sizeof(*ack) + 1;
	trailer	= buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;

	kv[0].iov_base	= whdr;
	kv[0].iov_len	= sizeof(*whdr) + sizeof(*ack);
	kv[1].iov_base	= buf2;
	kv[1].iov_len	= sack_size;
	kv[2].iov_base	= filler;
	kv[2].iov_len	= 3 + sizeof(*trailer);
	return 3; /* Number of kvec[] used. */
}
111 
112 static void rxrpc_free_ack(struct rxrpc_call *call)
113 {
114 	page_frag_free(call->local->kvec[0].iov_base);
115 	if (call->local->kvec[1].iov_base)
116 		page_frag_free(call->local->kvec[1].iov_base);
117 }
118 
/*
 * Record the beginning of an RTT probe.
 *
 * Claim a free RTT-sample slot, stamp it with the outgoing packet's serial
 * number and transmission time, then mark it pending so the matching ACK can
 * be timed when it arrives.
 */
static void rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
				  ktime_t now, enum rxrpc_rtt_tx_trace why)
{
	unsigned long avail = call->rtt_avail;
	int rtt_slot = 9;	/* Out-of-range value traced when no slot is free */

	if (!(avail & RXRPC_CALL_RTT_AVAIL_MASK))
		goto no_slot;

	/* Pick the lowest available slot; test_and_clear guards against a
	 * concurrent claimant of the same bit.
	 */
	rtt_slot = __ffs(avail & RXRPC_CALL_RTT_AVAIL_MASK);
	if (!test_and_clear_bit(rtt_slot, &call->rtt_avail))
		goto no_slot;

	call->rtt_serial[rtt_slot] = serial;
	call->rtt_sent_at[rtt_slot] = now;
	smp_wmb(); /* Write data before avail bit */
	set_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);

	trace_rxrpc_rtt_tx(call, why, rtt_slot, serial);
	return;

no_slot:
	trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_no_slot, rtt_slot, serial);
}
146 
/*
 * Fill out an ACK packet.
 *
 * The component buffers must already have been attached to local->kvec[] by
 * rxrpc_alloc_ack(): kv[0] holds the wire header plus ack body, kv[1] the
 * soft-ACK table and kv[2] the 3-byte filler plus the trailer.  A fresh
 * serial number is assigned and passed back through *_ack_serial.  Returns
 * nr_kv, the number of kvecs to transmit.
 */
static int rxrpc_fill_out_ack(struct rxrpc_call *call, int nr_kv, u8 ack_reason,
			      rxrpc_serial_t serial_to_ack, rxrpc_serial_t *_ack_serial)
{
	struct kvec *kv = call->local->kvec;
	struct rxrpc_wire_header *whdr = kv[0].iov_base;
	struct rxrpc_acktrailer *trailer = kv[2].iov_base + 3;
	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
	unsigned int qsize, sack, wrap, to, max_mtu, if_mtu;
	rxrpc_seq_t window, wtop;
	ktime_t now = ktime_get_real();
	int rsize;
	u8 *filler = kv[2].iov_base;
	u8 *sackp = kv[1].iov_base;

	rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);

	/* Snapshot the ACK window; sack is the start index into the circular
	 * soft-ACK table.
	 */
	window = call->ackr_window;
	wtop   = call->ackr_wtop;
	sack   = call->ackr_sack_base % RXRPC_SACK_SIZE;

	*_ack_serial = rxrpc_get_next_serial(call->conn);

	whdr->epoch		= htonl(call->conn->proto.epoch);
	whdr->cid		= htonl(call->cid);
	whdr->callNumber	= htonl(call->call_id);
	whdr->serial		= htonl(*_ack_serial);
	whdr->seq		= 0;
	whdr->type		= RXRPC_PACKET_TYPE_ACK;
	whdr->flags		= call->conn->out_clientflag | RXRPC_SLOW_START_OK;
	whdr->userStatus	= 0;
	whdr->securityIndex	= call->security_ix;
	whdr->_rsvd		= 0;
	whdr->serviceId		= htons(call->dest_srx.srx_service);

	ack->bufferSpace	= 0;
	ack->maxSkew		= 0;
	ack->firstPacket	= htonl(window);
	ack->previousPacket	= htonl(call->rx_highest_seq);
	ack->serial		= htonl(serial_to_ack);
	ack->reason		= ack_reason;
	ack->nAcks		= wtop - window;
	filler[0]		= 0;
	filler[1]		= 0;
	filler[2]		= 0;

	/* A PING asks for a response (used below to start an RTT probe). */
	if (ack_reason == RXRPC_ACK_PING)
		whdr->flags |= RXRPC_REQUEST_ACK;

	if (after(wtop, window)) {
		/* Copy out the soft-ACK table, unwrapping it if it spans the
		 * end of the circular buffer.
		 */
		kv[1].iov_len = ack->nAcks;

		wrap = RXRPC_SACK_SIZE - sack;
		to = umin(ack->nAcks, RXRPC_SACK_SIZE);

		if (sack + ack->nAcks <= RXRPC_SACK_SIZE) {
			memcpy(sackp, call->ackr_sack_table + sack, ack->nAcks);
		} else {
			memcpy(sackp, call->ackr_sack_table + sack, wrap);
			memcpy(sackp + wrap, call->ackr_sack_table, to - wrap);
		}
	} else if (before(wtop, window)) {
		pr_warn("ack window backward %x %x", window, wtop);
	} else if (ack->reason == RXRPC_ACK_DELAY) {
		/* An empty window turns a DELAY ack into an IDLE ack. */
		ack->reason = RXRPC_ACK_IDLE;
	}

	/* Work out how much receive window remains to advertise. */
	qsize = (window - 1) - call->rx_consumed;
	rsize = max_t(int, call->rx_winsize - qsize, 0);

	/* Fill in the MTU advert; cap it at 1444 if the peer hasn't
	 * advertised PMTU-discovery support.
	 */
	if_mtu = call->peer->if_mtu - call->peer->hdrsize;
	if (call->peer->ackr_adv_pmtud) {
		max_mtu = umax(call->peer->max_data, rxrpc_rx_mtu);
	} else {
		if_mtu = umin(if_mtu, 1444);
		max_mtu = if_mtu;
	}

	trailer->maxMTU		= htonl(max_mtu);
	trailer->ifMTU		= htonl(if_mtu);
	trailer->rwind		= htonl(rsize);
	trailer->jumbo_max	= 0; /* Advertise pmtu discovery */

	/* PINGs are also RTT probes; any request-ACK refreshes the RTT
	 * request timestamp.
	 */
	if (ack_reason == RXRPC_ACK_PING)
		rxrpc_begin_rtt_probe(call, *_ack_serial, now, rxrpc_rtt_tx_ping);
	if (whdr->flags & RXRPC_REQUEST_ACK)
		call->rtt_last_req = now;
	rxrpc_set_keepalive(call, now);
	return nr_kv;
}
239 
/*
 * Transmit an ACK packet.
 *
 * Send the nr_kv kvecs previously built by rxrpc_fill_out_ack() (len bytes in
 * total) to the peer.  An MTU-probe ping that bounces with -EMSGSIZE is fed
 * back into the path-MTU discovery state machine.
 */
static void rxrpc_send_ack_packet(struct rxrpc_call *call, int nr_kv, size_t len,
				  rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why)
{
	struct kvec *kv = call->local->kvec;
	struct rxrpc_wire_header *whdr = kv[0].iov_base;
	struct rxrpc_acktrailer *trailer = kv[2].iov_base + 3;
	struct rxrpc_connection *conn;
	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
	struct msghdr msg;
	int ret;

	/* Don't transmit for a call that has come adrift from its connection. */
	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return;

	conn = call->conn;

	msg.msg_name	= &call->peer->srx.transport;
	msg.msg_namelen	= call->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= MSG_SPLICE_PAGES;

	trace_rxrpc_tx_ack(call->debug_id, serial,
			   ntohl(ack->firstPacket),
			   ntohl(ack->serial), ack->reason, ack->nAcks,
			   ntohl(trailer->rwind), why);

	rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);

	iov_iter_kvec(&msg.msg_iter, WRITE, kv, nr_kv, len);
	/* For an MTU probe we want an oversize packet to fail with -EMSGSIZE
	 * (handled below) rather than be fragmented on the way out.
	 */
	rxrpc_local_dont_fragment(conn->local, why == rxrpc_propose_ack_ping_for_mtu_probe);

	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	rxrpc_peer_mark_tx(call->peer);
	if (ret < 0) {
		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
				    rxrpc_tx_point_call_ack);
		/* A probe too big for the path is reported to PMTUD. */
		if (why == rxrpc_propose_ack_ping_for_mtu_probe &&
		    ret == -EMSGSIZE)
			rxrpc_input_probe_for_pmtud(conn, serial, true);
	} else {
		trace_rxrpc_tx_packet(call->debug_id, whdr,
				      rxrpc_tx_point_call_ack);
		if (why == rxrpc_propose_ack_ping_for_mtu_probe) {
			/* Record the probe so its response can be matched. */
			call->peer->pmtud_pending = false;
			call->peer->pmtud_probing = true;
			call->conn->pmtud_probe = serial;
			call->conn->pmtud_call = call->debug_id;
			trace_rxrpc_pmtud_tx(call);
		}
	}
	rxrpc_tx_backoff(call, ret);
}
296 
/*
 * Queue an ACK for immediate transmission.
 *
 * Allocate the ACK buffers, fill them out and transmit them, then free them
 * again.  A path-MTU probe ACK is padded out to the trial size with
 * references to the zero page.
 */
void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
		    rxrpc_serial_t serial_to_ack, enum rxrpc_propose_ack_trace why)
{
	struct kvec *kv = call->local->kvec;
	rxrpc_serial_t ack_serial;
	size_t len;
	int nr_kv;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return;

	rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);

	/* Size the soft-ACK table from the current ACK window. */
	nr_kv = rxrpc_alloc_ack(call, call->ackr_wtop - call->ackr_window);
	if (nr_kv < 0) {
		kleave(" = -ENOMEM");
		return;
	}

	nr_kv = rxrpc_fill_out_ack(call, nr_kv, ack_reason, serial_to_ack, &ack_serial);
	len  = kv[0].iov_len;
	len += kv[1].iov_len;
	len += kv[2].iov_len;

	/* Extend a path MTU probe ACK. */
	if (why == rxrpc_propose_ack_ping_for_mtu_probe) {
		size_t probe_mtu = call->peer->pmtud_trial + sizeof(struct rxrpc_wire_header);

		/* If the ACK already exceeds the trial size, don't send it. */
		if (len > probe_mtu)
			goto skip;
		/* Pad to the probe size with the zero page.
		 * NOTE(review): assumes local->kvec[] has enough spare slots
		 * for the padding entries (bounded by pmtud_trial/PAGE_SIZE) -
		 * confirm against the kvec[] declaration.
		 */
		while (len < probe_mtu) {
			size_t part = umin(probe_mtu - len, PAGE_SIZE);

			kv[nr_kv].iov_base = page_address(ZERO_PAGE(0));
			kv[nr_kv].iov_len = part;
			len += part;
			nr_kv++;
		}
	}

	/* Reset the unacked/consumed accounting now that an ACK is going out. */
	call->ackr_nr_unacked = 0;
	atomic_set(&call->ackr_nr_consumed, 0);
	clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);

	trace_rxrpc_send_ack(call, why, ack_reason, ack_serial);
	rxrpc_send_ack_packet(call, nr_kv, len, ack_serial, why);
skip:
	rxrpc_free_ack(call);
}
349 
/*
 * Send an ACK probe for path MTU discovery.
 *
 * Thin wrapper around rxrpc_send_ACK(): a PING ACK, padded out to the trial
 * MTU size by the mtu-probe handling there.  serial_to_ack is 0 as no
 * particular packet is being responded to.
 */
void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call)
{
	rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
		       rxrpc_propose_ack_ping_for_mtu_probe);
}
358 
/*
 * Send an ABORT call packet.
 *
 * Returns 0 if the abort was sent or deliberately skipped, or a negative
 * error code on transmission failure or if the call is disconnected.
 */
int rxrpc_send_abort_packet(struct rxrpc_call *call)
{
	struct rxrpc_connection *conn;
	struct rxrpc_abort_buffer pkt;
	struct msghdr msg;
	struct kvec iov[1];
	rxrpc_serial_t serial;
	int ret;

	/* Don't bother sending aborts for a client call once the server has
	 * hard-ACK'd all of its request data.  After that point, we're not
	 * going to stop the operation proceeding, and whilst we might limit
	 * the reply, it's not worth it if we can send a new call on the same
	 * channel instead, thereby closing off this call.
	 */
	if (rxrpc_is_client_call(call) &&
	    test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
		return 0;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return -ECONNRESET;

	conn = call->conn;

	msg.msg_name	= &call->peer->srx.transport;
	msg.msg_namelen	= call->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	/* Build the abort packet on the stack: header + abort code. */
	pkt.whdr.epoch		= htonl(conn->proto.epoch);
	pkt.whdr.cid		= htonl(call->cid);
	pkt.whdr.callNumber	= htonl(call->call_id);
	pkt.whdr.seq		= 0;
	pkt.whdr.type		= RXRPC_PACKET_TYPE_ABORT;
	pkt.whdr.flags		= conn->out_clientflag;
	pkt.whdr.userStatus	= 0;
	pkt.whdr.securityIndex	= call->security_ix;
	pkt.whdr._rsvd		= 0;
	pkt.whdr.serviceId	= htons(call->dest_srx.srx_service);
	pkt.abort_code		= htonl(call->abort_code);

	iov[0].iov_base	= &pkt;
	iov[0].iov_len	= sizeof(pkt);

	serial = rxrpc_get_next_serial(conn);
	pkt.whdr.serial = htonl(serial);

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, sizeof(pkt));
	ret = do_udp_sendmsg(conn->local->socket, &msg, sizeof(pkt));
	rxrpc_peer_mark_tx(conn->peer);
	if (ret < 0)
		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
				    rxrpc_tx_point_call_abort);
	else
		trace_rxrpc_tx_packet(call->debug_id, &pkt.whdr,
				      rxrpc_tx_point_call_abort);
	rxrpc_tx_backoff(call, ret);
	return ret;
}
422 
/*
 * Prepare a (sub)packet for transmission.
 *
 * Work out the wire flags (including whether to request an ACK), fill in the
 * jumbo header for second and subsequent subpackets (it lives in the space
 * immediately before txb->data) and attach the payload to
 * local->kvec[1 + subpkt].  Returns the number of bytes attached, including
 * any jumbo header.
 */
static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
					   struct rxrpc_send_data_req *req,
					   struct rxrpc_txbuf *txb,
					   struct rxrpc_wire_header *whdr,
					   rxrpc_serial_t serial, int subpkt)
{
	struct rxrpc_jumbo_header *jumbo = txb->data - sizeof(*jumbo);
	enum rxrpc_req_ack_trace why;
	struct rxrpc_connection *conn = call->conn;
	struct kvec *kv = &call->local->kvec[1 + subpkt];
	size_t len = txb->pkt_len;
	bool last;
	u8 flags;

	_enter("%x,%zd", txb->seq, len);

	txb->serial = serial;

	/* Mark the first DATA packet when probing for a service upgrade. */
	if (test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) &&
	    txb->seq == 1)
		whdr->userStatus = RXRPC_USERSTATUS_SERVICE_UPGRADE;

	/* Start with request-ACK cleared; it's decided afresh below. */
	txb->flags &= ~RXRPC_REQUEST_ACK;
	flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;
	last = txb->flags & RXRPC_LAST_PACKET;

	/* Only the final subpacket of a jumbo may request an ACK. */
	if (subpkt < req->n - 1) {
		len = RXRPC_JUMBO_DATALEN;
		goto dont_set_request_ack;
	}

	/* If our RTT cache needs working on, request an ACK.  Also request
	 * ACKs if a DATA packet appears to have been lost.
	 *
	 * However, we mustn't request an ACK on the last reply packet of a
	 * service call, lest OpenAFS incorrectly send us an ACK with some
	 * soft-ACKs in it and then never follow up with a proper hard ACK.
	 */
	if (last && rxrpc_sending_to_client(txb))
		why = rxrpc_reqack_no_srv_last;
	else if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
		why = rxrpc_reqack_ack_lost;
	else if (txb->flags & RXRPC_TXBUF_RESENT)
		why = rxrpc_reqack_retrans;
	else if (call->cong_ca_state == RXRPC_CA_SLOW_START && call->cong_cwnd <= RXRPC_MIN_CWND)
		why = rxrpc_reqack_slow_start;
	else if (call->tx_winsize <= 2)
		why = rxrpc_reqack_small_txwin;
	else if (call->rtt_count < 3)
		why = rxrpc_reqack_more_rtt;
	else if (ktime_before(ktime_add_ms(call->rtt_last_req, 1000), ktime_get_real()))
		why = rxrpc_reqack_old_rtt;
	else if (!last && !after(READ_ONCE(call->send_top), txb->seq))
		why = rxrpc_reqack_app_stall;
	else
		goto dont_set_request_ack;

	rxrpc_inc_stat(call->rxnet, stat_why_req_ack[why]);
	trace_rxrpc_req_ack(call->debug_id, txb->seq, why);
	if (why != rxrpc_reqack_no_srv_last) {
		flags |= RXRPC_REQUEST_ACK;
		trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_data, -1, serial);
		call->rtt_last_req = req->now;
	}
dont_set_request_ack:

	/* There's a jumbo header prepended to the data if we need it. */
	if (subpkt < req->n - 1)
		flags |= RXRPC_JUMBO_PACKET;
	else
		flags &= ~RXRPC_JUMBO_PACKET;
	if (subpkt == 0) {
		/* The first subpacket is described by the main wire header. */
		whdr->flags	= flags;
		whdr->cksum	= txb->cksum;
		kv->iov_base	= txb->data;
	} else {
		jumbo->flags	= flags;
		jumbo->pad	= 0;
		jumbo->cksum	= txb->cksum;
		kv->iov_base	= jumbo;
		len += sizeof(*jumbo);
	}

	trace_rxrpc_tx_data(call, txb->seq, txb->serial, flags, req->trace);
	kv->iov_len = len;
	return len;
}
513 
514 /*
515  * Prepare a transmission queue object for initial transmission.  Returns the
516  * number of microseconds since the transmission queue base timestamp.
517  */
518 static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
519 					  struct rxrpc_send_data_req *req)
520 {
521 	if (!tq)
522 		return 0;
523 	if (tq->xmit_ts_base == KTIME_MIN) {
524 		tq->xmit_ts_base = req->now;
525 		return 0;
526 	}
527 	return ktime_to_us(ktime_sub(req->now, tq->xmit_ts_base));
528 }
529 
530 /*
531  * Prepare a (jumbo) packet for transmission.
532  */
533 static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call,
534 					struct rxrpc_send_data_req *req,
535 					struct rxrpc_wire_header *whdr)
536 {
537 	struct rxrpc_txqueue *tq = req->tq;
538 	rxrpc_serial_t serial;
539 	unsigned int xmit_ts;
540 	rxrpc_seq_t seq = req->seq;
541 	size_t len = 0;
542 	bool start_tlp = false;
543 
544 	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit);
545 
546 	/* Each transmission of a Tx packet needs a new serial number */
547 	serial = rxrpc_get_next_serials(call->conn, req->n);
548 
549 	whdr->epoch		= htonl(call->conn->proto.epoch);
550 	whdr->cid		= htonl(call->cid);
551 	whdr->callNumber	= htonl(call->call_id);
552 	whdr->seq		= htonl(seq);
553 	whdr->serial		= htonl(serial);
554 	whdr->type		= RXRPC_PACKET_TYPE_DATA;
555 	whdr->flags		= 0;
556 	whdr->userStatus	= 0;
557 	whdr->securityIndex	= call->security_ix;
558 	whdr->_rsvd		= 0;
559 	whdr->serviceId		= htons(call->conn->service_id);
560 
561 	call->tx_last_serial = serial + req->n - 1;
562 	call->tx_last_sent = req->now;
563 	xmit_ts = rxrpc_prepare_txqueue(tq, req);
564 	prefetch(tq->next);
565 
566 	for (int i = 0;;) {
567 		int ix = seq & RXRPC_TXQ_MASK;
568 		struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK];
569 
570 		_debug("prep[%u] tq=%x q=%x", i, tq->qbase, seq);
571 
572 		/* Record (re-)transmission for RACK [RFC8985 6.1]. */
573 		if (__test_and_clear_bit(ix, &tq->segment_lost))
574 			call->tx_nr_lost--;
575 		if (req->retrans) {
576 			__set_bit(ix, &tq->ever_retransmitted);
577 			__set_bit(ix, &tq->segment_retransmitted);
578 			call->tx_nr_resent++;
579 		} else {
580 			call->tx_nr_sent++;
581 			start_tlp = true;
582 		}
583 		tq->segment_xmit_ts[ix] = xmit_ts;
584 		tq->segment_serial[ix] = serial;
585 		if (i + 1 == req->n)
586 			/* Only sample the last subpacket in a jumbo. */
587 			__set_bit(ix, &tq->rtt_samples);
588 		len += rxrpc_prepare_data_subpacket(call, req, txb, whdr, serial, i);
589 		serial++;
590 		seq++;
591 		i++;
592 		if (i >= req->n)
593 			break;
594 		if (!(seq & RXRPC_TXQ_MASK)) {
595 			tq = tq->next;
596 			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit_advance);
597 			xmit_ts = rxrpc_prepare_txqueue(tq, req);
598 		}
599 	}
600 
601 	/* Set timeouts */
602 	if (req->tlp_probe) {
603 		/* Sending TLP loss probe [RFC8985 7.3]. */
604 		call->tlp_serial = serial - 1;
605 		call->tlp_seq = seq - 1;
606 	} else if (start_tlp) {
607 		/* Schedule TLP loss probe [RFC8985 7.2]. */
608 		ktime_t pto;
609 
610 		if (!test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags))
611 			 /* The first packet may take longer to elicit a response. */
612 			pto = NSEC_PER_SEC;
613 		else
614 			pto = rxrpc_tlp_calc_pto(call, req->now);
615 
616 		call->rack_timer_mode = RXRPC_CALL_RACKTIMER_TLP_PTO;
617 		call->rack_timo_at = ktime_add(req->now, pto);
618 		trace_rxrpc_rack_timer(call, pto, false);
619 		trace_rxrpc_timer_set(call, pto, rxrpc_timer_trace_rack_tlp_pto);
620 	}
621 
622 	if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) {
623 		ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));
624 
625 		call->expect_rx_by = ktime_add(req->now, delay);
626 		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
627 	}
628 
629 	rxrpc_set_keepalive(call, req->now);
630 	page_frag_free(whdr);
631 	return len;
632 }
633 
634 /*
635  * Send one or more packets through the transport endpoint
636  */
637 void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
638 {
639 	struct rxrpc_wire_header *whdr;
640 	struct rxrpc_connection *conn = call->conn;
641 	enum rxrpc_tx_point frag;
642 	struct rxrpc_txqueue *tq = req->tq;
643 	struct rxrpc_txbuf *txb;
644 	struct msghdr msg;
645 	rxrpc_seq_t seq = req->seq;
646 	size_t len = sizeof(*whdr);
647 	bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
648 	int ret, stat_ix;
649 
650 	_enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);
651 
652 	whdr = page_frag_alloc(&call->local->tx_alloc, sizeof(*whdr), GFP_NOFS);
653 	if (!whdr)
654 		return; /* Drop the packet if no memory. */
655 
656 	call->local->kvec[0].iov_base = whdr;
657 	call->local->kvec[0].iov_len = sizeof(*whdr);
658 
659 	stat_ix = umin(req->n, ARRAY_SIZE(call->rxnet->stat_tx_jumbo)) - 1;
660 	atomic_inc(&call->rxnet->stat_tx_jumbo[stat_ix]);
661 
662 	len += rxrpc_prepare_data_packet(call, req, whdr);
663 	txb = tq->bufs[seq & RXRPC_TXQ_MASK];
664 
665 	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, 1 + req->n, len);
666 
667 	msg.msg_name	= &call->peer->srx.transport;
668 	msg.msg_namelen	= call->peer->srx.transport_len;
669 	msg.msg_control	= NULL;
670 	msg.msg_controllen = 0;
671 	msg.msg_flags	= MSG_SPLICE_PAGES;
672 
673 	/* Send the packet with the don't fragment bit set unless we think it's
674 	 * too big or if this is a retransmission.
675 	 */
676 	if (seq == call->tx_transmitted + 1 &&
677 	    len >= sizeof(struct rxrpc_wire_header) + call->peer->max_data) {
678 		rxrpc_local_dont_fragment(conn->local, false);
679 		frag = rxrpc_tx_point_call_data_frag;
680 	} else {
681 		rxrpc_local_dont_fragment(conn->local, true);
682 		frag = rxrpc_tx_point_call_data_nofrag;
683 	}
684 
685 	/* Track what we've attempted to transmit at least once so that the
686 	 * retransmission algorithm doesn't try to resend what we haven't sent
687 	 * yet.
688 	 */
689 	if (seq == call->tx_transmitted + 1)
690 		call->tx_transmitted = seq + req->n - 1;
691 
692 	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
693 		static int lose;
694 
695 		if ((lose++ & 7) == 7) {
696 			ret = 0;
697 			trace_rxrpc_tx_data(call, txb->seq, txb->serial, txb->flags,
698 					    rxrpc_txdata_inject_loss);
699 			rxrpc_peer_mark_tx(conn->peer);
700 			goto done;
701 		}
702 	}
703 
704 	/* send the packet by UDP
705 	 * - returns -EMSGSIZE if UDP would have to fragment the packet
706 	 *   to go out of the interface
707 	 *   - in which case, we'll have processed the ICMP error
708 	 *     message and update the peer record
709 	 */
710 	rxrpc_inc_stat(call->rxnet, stat_tx_data_send);
711 	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
712 	rxrpc_peer_mark_tx(conn->peer);
713 
714 	if (ret == -EMSGSIZE) {
715 		rxrpc_inc_stat(call->rxnet, stat_tx_data_send_msgsize);
716 		trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
717 		ret = 0;
718 	} else if (ret < 0) {
719 		rxrpc_inc_stat(call->rxnet, stat_tx_data_send_fail);
720 		trace_rxrpc_tx_fail(call->debug_id, txb->serial, ret, frag);
721 	} else {
722 		trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
723 	}
724 
725 	rxrpc_tx_backoff(call, ret);
726 
727 	if (ret < 0) {
728 		/* Cancel the call if the initial transmission fails or if we
729 		 * hit due to network routing issues that aren't going away
730 		 * anytime soon.  The layer above can arrange the
731 		 * retransmission.
732 		 */
733 		if (new_call ||
734 		    ret == -ENETUNREACH ||
735 		    ret == -EHOSTUNREACH ||
736 		    ret == -ECONNREFUSED)
737 			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
738 						  RX_USER_ABORT, ret);
739 	}
740 
741 done:
742 	_leave(" = %d [%u]", ret, call->peer->max_data);
743 }
744 
/*
 * Transmit a connection-level abort.
 *
 * The packet comprises the wire header plus a 32-bit network-order abort
 * code taken from conn->abort_code.
 */
void rxrpc_send_conn_abort(struct rxrpc_connection *conn)
{
	struct rxrpc_wire_header whdr;
	struct msghdr msg;
	struct kvec iov[2];
	__be32 word;
	size_t len;
	u32 serial;
	int ret;

	msg.msg_name	= &conn->peer->srx.transport;
	msg.msg_namelen	= conn->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	whdr.epoch	= htonl(conn->proto.epoch);
	whdr.cid	= htonl(conn->proto.cid);
	whdr.callNumber	= 0;
	whdr.seq	= 0;
	whdr.type	= RXRPC_PACKET_TYPE_ABORT;
	whdr.flags	= conn->out_clientflag;
	whdr.userStatus	= 0;
	whdr.securityIndex = conn->security_ix;
	whdr._rsvd	= 0;
	whdr.serviceId	= htons(conn->service_id);

	word		= htonl(conn->abort_code);

	iov[0].iov_base	= &whdr;
	iov[0].iov_len	= sizeof(whdr);
	iov[1].iov_base	= &word;
	iov[1].iov_len	= sizeof(word);

	len = iov[0].iov_len + iov[1].iov_len;

	serial = rxrpc_get_next_serial(conn);
	whdr.serial = htonl(serial);

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	if (ret < 0) {
		trace_rxrpc_tx_fail(conn->debug_id, serial, ret,
				    rxrpc_tx_point_conn_abort);
		_debug("sendmsg failed: %d", ret);
		return;
	}

	trace_rxrpc_tx_packet(conn->debug_id, &whdr, rxrpc_tx_point_conn_abort);

	/* Note successful transmission against the peer's keepalive state. */
	rxrpc_peer_mark_tx(conn->peer);
}
800 
/*
 * Reject a packet through the local endpoint.
 *
 * Depending on skb->mark, respond with either a BUSY packet or an ABORT
 * packet (abort code taken from skb->priority), addressed back to the
 * packet's source.  Transmission failure is traced but otherwise ignored.
 */
void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
	struct rxrpc_wire_header whdr;
	struct sockaddr_rxrpc srx;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct msghdr msg;
	struct kvec iov[2];
	size_t size;
	__be32 code;
	int ret, ioc;

	if (sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
		return; /* Never abort an abort. */

	rxrpc_see_skb(skb, rxrpc_skb_see_reject);

	iov[0].iov_base = &whdr;
	iov[0].iov_len = sizeof(whdr);
	iov[1].iov_base = &code;
	iov[1].iov_len = sizeof(code);

	msg.msg_name = &srx.transport;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	/* Echo back the identifying fields of the offending packet, with the
	 * client-initiated flag inverted to address the reply to the sender.
	 */
	whdr = (struct rxrpc_wire_header) {
		.epoch		= htonl(sp->hdr.epoch),
		.cid		= htonl(sp->hdr.cid),
		.callNumber	= htonl(sp->hdr.callNumber),
		.serviceId	= htons(sp->hdr.serviceId),
		.flags		= ~sp->hdr.flags & RXRPC_CLIENT_INITIATED,
	};

	switch (skb->mark) {
	case RXRPC_SKB_MARK_REJECT_BUSY:
		/* A BUSY packet is just the header - no abort code. */
		whdr.type = RXRPC_PACKET_TYPE_BUSY;
		size = sizeof(whdr);
		ioc = 1;
		break;
	case RXRPC_SKB_MARK_REJECT_CONN_ABORT:
		whdr.callNumber	= 0;
		fallthrough;
	case RXRPC_SKB_MARK_REJECT_ABORT:
		whdr.type = RXRPC_PACKET_TYPE_ABORT;
		code = htonl(skb->priority);
		size = sizeof(whdr) + sizeof(code);
		ioc = 2;
		break;
	default:
		return;
	}

	/* Only send if we can extract a return address from the skb. */
	if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
		msg.msg_namelen = srx.transport_len;

		iov_iter_kvec(&msg.msg_iter, WRITE, iov, ioc, size);
		ret = do_udp_sendmsg(local->socket, &msg, size);
		if (ret < 0)
			trace_rxrpc_tx_fail(local->debug_id, 0, ret,
					    rxrpc_tx_point_reject);
		else
			trace_rxrpc_tx_packet(local->debug_id, &whdr,
					      rxrpc_tx_point_reject);
	}
}
870 
/*
 * Send a VERSION reply to a peer as a keepalive.
 *
 * The payload is the empty keepalive string (a single NUL byte).
 * Transmission failure is traced but otherwise ignored.
 */
void rxrpc_send_keepalive(struct rxrpc_peer *peer)
{
	struct rxrpc_wire_header whdr;
	struct msghdr msg;
	struct kvec iov[2];
	size_t len;
	int ret;

	_enter("");

	msg.msg_name	= &peer->srx.transport;
	msg.msg_namelen	= peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	whdr.epoch	= htonl(peer->local->rxnet->epoch);
	whdr.cid	= 0;
	whdr.callNumber	= 0;
	whdr.seq	= 0;
	whdr.serial	= 0;
	whdr.type	= RXRPC_PACKET_TYPE_VERSION; /* Not client-initiated */
	whdr.flags	= RXRPC_LAST_PACKET;
	whdr.userStatus	= 0;
	whdr.securityIndex = 0;
	whdr._rsvd	= 0;
	whdr.serviceId	= 0;

	iov[0].iov_base	= &whdr;
	iov[0].iov_len	= sizeof(whdr);
	iov[1].iov_base	= (char *)rxrpc_keepalive_string;
	iov[1].iov_len	= sizeof(rxrpc_keepalive_string);

	len = iov[0].iov_len + iov[1].iov_len;

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
	ret = do_udp_sendmsg(peer->local->socket, &msg, len);
	if (ret < 0)
		trace_rxrpc_tx_fail(peer->debug_id, 0, ret,
				    rxrpc_tx_point_version_keepalive);
	else
		trace_rxrpc_tx_packet(peer->debug_id, &whdr,
				      rxrpc_tx_point_version_keepalive);

	rxrpc_peer_mark_tx(peer);
	_leave("");
}
921 
/*
 * Send a RESPONSE message.
 *
 * The prebuilt response skb's pages are mapped into a bio_vec array and
 * spliced into the UDP socket.  A fresh serial number is assigned and
 * written back into the wire header inside the skb before transmission.
 * Failures are traced via rxrpc_tx_point_response.
 */
void rxrpc_send_response(struct rxrpc_connection *conn, struct sk_buff *response)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(response);
	struct scatterlist sg[16];
	struct bio_vec *bvec = conn->local->bvec;
	struct msghdr msg;
	size_t len = sp->resp.len;
	__be32 wserial;
	u32 serial = 0;
	int ret, nr_sg;

	_enter("C=%x,%x", conn->debug_id, sp->resp.challenge_serial);

	/* Map the response skb's data into a scatterlist... */
	sg_init_table(sg, ARRAY_SIZE(sg));
	ret = skb_to_sgvec(response, sg, 0, len);
	if (ret < 0)
		goto fail;
	nr_sg = ret;
	ret = -EIO;
	if (WARN_ON_ONCE(nr_sg > ARRAY_SIZE(conn->local->bvec)))
		goto fail;

	/* ...and convert it to the bio_vec form the iterator wants. */
	for (int i = 0; i < nr_sg; i++)
		bvec_set_page(&bvec[i], sg_page(&sg[i]), sg[i].length, sg[i].offset);

	iov_iter_bvec(&msg.msg_iter, WRITE, bvec, nr_sg, len);

	msg.msg_name	= &conn->peer->srx.transport;
	msg.msg_namelen	= conn->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= MSG_SPLICE_PAGES;

	serial = rxrpc_get_next_serials(conn, 1);
	wserial = htonl(serial);

	trace_rxrpc_tx_response(conn, serial, sp);

	/* Patch the serial into the wire header within the skb itself. */
	ret = skb_store_bits(response, offsetof(struct rxrpc_wire_header, serial),
			     &wserial, sizeof(wserial));
	if (ret < 0)
		goto fail;

	rxrpc_local_dont_fragment(conn->local, false);

	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	if (ret < 0)
		goto fail;

	rxrpc_peer_mark_tx(conn->peer);
	return;

fail:
	trace_rxrpc_tx_fail(conn->debug_id, serial, ret,
			    rxrpc_tx_point_response);
	kleave(" = %d", ret);
}
982