xref: /linux/net/rxrpc/output.c (revision e814f3fd16acfb7f9966773953de8f740a1e3202)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC packet transmission
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/gfp.h>
12 #include <linux/skbuff.h>
13 #include <linux/export.h>
14 #include <net/sock.h>
15 #include <net/af_rxrpc.h>
16 #include <net/udp.h>
17 #include "ar-internal.h"
18 
19 extern int udpv6_sendmsg(struct sock *sk, struct msghdr *msg, size_t len);
20 
21 static ssize_t do_udp_sendmsg(struct socket *socket, struct msghdr *msg, size_t len)
22 {
23 	struct sockaddr *sa = msg->msg_name;
24 	struct sock *sk = socket->sk;
25 
26 	if (IS_ENABLED(CONFIG_AF_RXRPC_IPV6)) {
27 		if (sa->sa_family == AF_INET6) {
28 			if (sk->sk_family != AF_INET6) {
29 				pr_warn("AF_INET6 address on AF_INET socket\n");
30 				return -ENOPROTOOPT;
31 			}
32 			return udpv6_sendmsg(sk, msg, len);
33 		}
34 	}
35 	return udp_sendmsg(sk, msg, len);
36 }
37 
/* On-the-wire layout of a call ABORT packet: the wire header immediately
 * followed by the 32-bit abort code in network byte order.
 */
struct rxrpc_abort_buffer {
	struct rxrpc_wire_header whdr;
	__be32 abort_code;
};

/* Empty payload sent in VERSION keepalive packets. */
static const char rxrpc_keepalive_string[] = "";
44 
45 /*
46  * Increase Tx backoff on transmission failure and clear it on success.
47  */
48 static void rxrpc_tx_backoff(struct rxrpc_call *call, int ret)
49 {
50 	if (ret < 0) {
51 		if (call->tx_backoff < 1000)
52 			call->tx_backoff += 100;
53 	} else {
54 		call->tx_backoff = 0;
55 	}
56 }
57 
58 /*
59  * Arrange for a keepalive ping a certain time after we last transmitted.  This
60  * lets the far side know we're still interested in this call and helps keep
61  * the route through any intervening firewall open.
62  *
63  * Receiving a response to the ping will prevent the ->expect_rx_by timer from
64  * expiring.
65  */
66 static void rxrpc_set_keepalive(struct rxrpc_call *call, ktime_t now)
67 {
68 	ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo) / 6);
69 
70 	call->keepalive_at = ktime_add(ktime_get_real(), delay);
71 	trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_keepalive);
72 }
73 
/*
 * Allocate transmission buffers for an ACK and attach them to local->kv[].
 * Returns the number of kvec[] slots filled in (3) or -ENOMEM, in which case
 * nothing is left allocated.
 */
static int rxrpc_alloc_ack(struct rxrpc_call *call, size_t sack_size)
{
	struct rxrpc_wire_header *whdr;
	struct rxrpc_acktrailer *trailer;
	struct rxrpc_ackpacket *ack;
	struct kvec *kv = call->local->kvec;
	/* We may be running under RCU, in which case we mustn't sleep. */
	gfp_t gfp = rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS;
	void *buf, *buf2 = NULL;
	u8 *filler;

	/* One page fragment carries the fixed-size pieces: wire header, ACK
	 * body, one spare byte, three filler bytes and the ACK trailer (the
	 * spare byte between the ACK body and the filler is skipped when the
	 * kvecs are set up below — presumably for trailer alignment; TODO
	 * confirm against the wire format).
	 */
	buf = page_frag_alloc(&call->local->tx_alloc,
			      sizeof(*whdr) + sizeof(*ack) + 1 + 3 + sizeof(*trailer), gfp);
	if (!buf)
		return -ENOMEM;

	/* A second fragment carries the soft-ACK table, if any is needed. */
	if (sack_size) {
		buf2 = page_frag_alloc(&call->local->tx_alloc, sack_size, gfp);
		if (!buf2) {
			page_frag_free(buf);
			return -ENOMEM;
		}
	}

	whdr	= buf;
	ack	= buf + sizeof(*whdr);
	filler	= buf + sizeof(*whdr) + sizeof(*ack) + 1;
	trailer	= buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;

	/* kv[1] (the sack table) sits between the header+ACK and the
	 * filler+trailer so that the three slots concatenate into one packet.
	 */
	kv[0].iov_base	= whdr;
	kv[0].iov_len	= sizeof(*whdr) + sizeof(*ack);
	kv[1].iov_base	= buf2;
	kv[1].iov_len	= sack_size;
	kv[2].iov_base	= filler;
	kv[2].iov_len	= 3 + sizeof(*trailer);
	return 3; /* Number of kvec[] used. */
}
113 
114 static void rxrpc_free_ack(struct rxrpc_call *call)
115 {
116 	page_frag_free(call->local->kvec[0].iov_base);
117 	if (call->local->kvec[1].iov_base)
118 		page_frag_free(call->local->kvec[1].iov_base);
119 }
120 
/*
 * Record the beginning of an RTT probe.
 *
 * Claims a free slot in call->rtt_serial[]/rtt_sent_at[] and marks it pending
 * so the matching ACK can complete the RTT sample.  If no slot is free, a
 * trace event is emitted and the probe is dropped.
 */
static void rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
				  ktime_t now, enum rxrpc_rtt_tx_trace why)
{
	unsigned long avail = call->rtt_avail;
	int rtt_slot = 9;	/* Out-of-range slot number reported when none is free. */

	if (!(avail & RXRPC_CALL_RTT_AVAIL_MASK))
		goto no_slot;

	/* Pick the lowest available slot from the snapshot; test_and_clear_bit()
	 * then guards against a concurrent claimant having taken it since.
	 */
	rtt_slot = __ffs(avail & RXRPC_CALL_RTT_AVAIL_MASK);
	if (!test_and_clear_bit(rtt_slot, &call->rtt_avail))
		goto no_slot;

	call->rtt_serial[rtt_slot] = serial;
	call->rtt_sent_at[rtt_slot] = now;
	smp_wmb(); /* Write data before avail bit */
	set_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);

	trace_rxrpc_rtt_tx(call, why, rtt_slot, serial);
	return;

no_slot:
	trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_no_slot, rtt_slot, serial);
}
148 
/*
 * Fill out an ACK packet in the buffers previously attached to local->kvec[]
 * by rxrpc_alloc_ack().  A new serial number is allocated and returned through
 * *_ack_serial.  Returns nr_kv (the number of kvecs in use) unchanged.
 */
static int rxrpc_fill_out_ack(struct rxrpc_call *call, int nr_kv, u8 ack_reason,
			      rxrpc_serial_t serial_to_ack, rxrpc_serial_t *_ack_serial)
{
	struct kvec *kv = call->local->kvec;
	struct rxrpc_wire_header *whdr = kv[0].iov_base;
	struct rxrpc_acktrailer *trailer = kv[2].iov_base + 3;
	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
	unsigned int qsize, sack, wrap, to, max_mtu, if_mtu;
	rxrpc_seq_t window, wtop;
	ktime_t now = ktime_get_real();
	int rsize;
	u8 *filler = kv[2].iov_base;
	u8 *sackp = kv[1].iov_base;

	rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);

	window = call->ackr_window;
	wtop   = call->ackr_wtop;
	/* The sack table is a ring buffer of RXRPC_SACK_SIZE entries. */
	sack   = call->ackr_sack_base % RXRPC_SACK_SIZE;

	*_ack_serial = rxrpc_get_next_serial(call->conn);

	whdr->epoch		= htonl(call->conn->proto.epoch);
	whdr->cid		= htonl(call->cid);
	whdr->callNumber	= htonl(call->call_id);
	whdr->serial		= htonl(*_ack_serial);
	whdr->seq		= 0;
	whdr->type		= RXRPC_PACKET_TYPE_ACK;
	whdr->flags		= call->conn->out_clientflag | RXRPC_SLOW_START_OK;
	whdr->userStatus	= 0;
	whdr->securityIndex	= call->security_ix;
	whdr->_rsvd		= 0;
	whdr->serviceId		= htons(call->dest_srx.srx_service);

	ack->bufferSpace	= 0;
	ack->maxSkew		= 0;
	ack->firstPacket	= htonl(window);
	ack->previousPacket	= htonl(call->rx_highest_seq);
	ack->serial		= htonl(serial_to_ack);
	ack->reason		= ack_reason;
	/* NOTE(review): nAcks is u8; assumes wtop - window fits in 255 —
	 * presumably bounded by the Rx window size; confirm.
	 */
	ack->nAcks		= wtop - window;
	filler[0]		= 0;
	filler[1]		= 0;
	filler[2]		= 0;

	/* PING ACKs solicit a response so that RTT can be sampled. */
	if (ack_reason == RXRPC_ACK_PING)
		whdr->flags |= RXRPC_REQUEST_ACK;

	if (after(wtop, window)) {
		/* Copy the soft-ACK table, unwrapping the ring buffer if the
		 * live region crosses the end of the table.
		 */
		kv[1].iov_len = ack->nAcks;

		wrap = RXRPC_SACK_SIZE - sack;
		to = umin(ack->nAcks, RXRPC_SACK_SIZE);

		if (sack + ack->nAcks <= RXRPC_SACK_SIZE) {
			memcpy(sackp, call->ackr_sack_table + sack, ack->nAcks);
		} else {
			memcpy(sackp, call->ackr_sack_table + sack, wrap);
			memcpy(sackp + wrap, call->ackr_sack_table, to - wrap);
		}
	} else if (before(wtop, window)) {
		pr_warn("ack window backward %x %x", window, wtop);
	} else if (ack->reason == RXRPC_ACK_DELAY) {
		/* Nothing outstanding: downgrade a DELAY ACK to IDLE. */
		ack->reason = RXRPC_ACK_IDLE;
	}

	/* Advertise the receive window remaining after queued-but-unconsumed
	 * packets are accounted for.
	 */
	qsize = (window - 1) - call->rx_consumed;
	rsize = max_t(int, call->rx_winsize - qsize, 0);

	if_mtu = call->peer->if_mtu - call->peer->hdrsize;
	if (call->peer->ackr_adv_pmtud) {
		max_mtu = umax(call->peer->max_data, rxrpc_rx_mtu);
	} else {
		/* Peer doesn't do PMTUD-by-ACK: clamp to a conservative MTU. */
		if_mtu = umin(if_mtu, 1444);
		max_mtu = if_mtu;
	}

	trailer->maxMTU		= htonl(max_mtu);
	trailer->ifMTU		= htonl(if_mtu);
	trailer->rwind		= htonl(rsize);
	trailer->jumbo_max	= 0; /* Advertise pmtu discovery */

	if (ack_reason == RXRPC_ACK_PING)
		rxrpc_begin_rtt_probe(call, *_ack_serial, now, rxrpc_rtt_tx_ping);
	if (whdr->flags & RXRPC_REQUEST_ACK)
		call->rtt_last_req = now;
	rxrpc_set_keepalive(call, now);
	return nr_kv;
}
241 
/*
 * Transmit an ACK packet previously filled out in local->kvec[0..nr_kv-1].
 * For PMTUD probe pings, DF is set and an -EMSGSIZE result is fed back into
 * the path-MTU discovery state machine.
 */
static void rxrpc_send_ack_packet(struct rxrpc_call *call, int nr_kv, size_t len,
				  rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why)
{
	struct kvec *kv = call->local->kvec;
	struct rxrpc_wire_header *whdr = kv[0].iov_base;
	struct rxrpc_acktrailer *trailer = kv[2].iov_base + 3;
	struct rxrpc_connection *conn;
	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
	struct msghdr msg;
	int ret;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return;

	conn = call->conn;

	msg.msg_name	= &call->peer->srx.transport;
	msg.msg_namelen	= call->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= MSG_SPLICE_PAGES;

	trace_rxrpc_tx_ack(call->debug_id, serial,
			   ntohl(ack->firstPacket),
			   ntohl(ack->serial), ack->reason, ack->nAcks,
			   ntohl(trailer->rwind), why);

	rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);

	iov_iter_kvec(&msg.msg_iter, WRITE, kv, nr_kv, len);
	/* Set DF for an MTU probe so the network reports oversize instead of
	 * fragmenting.
	 */
	rxrpc_local_dont_fragment(conn->local, why == rxrpc_propose_ack_ping_for_mtu_probe);

	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	call->peer->last_tx_at = ktime_get_seconds();
	if (ret < 0) {
		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
				    rxrpc_tx_point_call_ack);
		/* An oversize probe counts as a (negative) PMTUD result. */
		if (why == rxrpc_propose_ack_ping_for_mtu_probe &&
		    ret == -EMSGSIZE)
			rxrpc_input_probe_for_pmtud(conn, serial, true);
	} else {
		trace_rxrpc_tx_packet(call->debug_id, whdr,
				      rxrpc_tx_point_call_ack);
		if (why == rxrpc_propose_ack_ping_for_mtu_probe) {
			/* Probe in flight; the responding ACK resolves it. */
			call->peer->pmtud_pending = false;
			call->peer->pmtud_probing = true;
			call->conn->pmtud_probe = serial;
			call->conn->pmtud_call = call->debug_id;
			trace_rxrpc_pmtud_tx(call);
		}
	}
	rxrpc_tx_backoff(call, ret);
}
298 
/*
 * Queue an ACK for immediate transmission.  Allocates the transmission
 * buffers, fills them out, optionally pads a PMTUD probe up to the trial
 * size with zero pages and transmits; the buffers are freed in all cases.
 */
void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
		    rxrpc_serial_t serial_to_ack, enum rxrpc_propose_ack_trace why)
{
	struct kvec *kv = call->local->kvec;
	rxrpc_serial_t ack_serial;
	size_t len;
	int nr_kv;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return;

	rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);

	/* The sack table needs one byte per packet in the [window, wtop) span. */
	nr_kv = rxrpc_alloc_ack(call, call->ackr_wtop - call->ackr_window);
	if (nr_kv < 0) {
		kleave(" = -ENOMEM");
		return;
	}

	nr_kv = rxrpc_fill_out_ack(call, nr_kv, ack_reason, serial_to_ack, &ack_serial);
	len  = kv[0].iov_len;
	len += kv[1].iov_len;
	len += kv[2].iov_len;

	/* Extend a path MTU probe ACK.  Pad with zero pages up to the trial
	 * size; if the basic ACK already exceeds it, abandon the probe (the
	 * buffers are still freed at skip:).
	 */
	if (why == rxrpc_propose_ack_ping_for_mtu_probe) {
		size_t probe_mtu = call->peer->pmtud_trial + sizeof(struct rxrpc_wire_header);

		if (len > probe_mtu)
			goto skip;
		while (len < probe_mtu) {
			size_t part = umin(probe_mtu - len, PAGE_SIZE);

			kv[nr_kv].iov_base = page_address(ZERO_PAGE(0));
			kv[nr_kv].iov_len = part;
			len += part;
			nr_kv++;
		}
	}

	call->ackr_nr_unacked = 0;
	atomic_set(&call->ackr_nr_consumed, 0);
	clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);

	trace_rxrpc_send_ack(call, why, ack_reason, ack_serial);
	rxrpc_send_ack_packet(call, nr_kv, len, ack_serial, why);
skip:
	rxrpc_free_ack(call);
}
351 
352 /*
353  * Send an ACK probe for path MTU discovery.
354  */
355 void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call)
356 {
357 	rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
358 		       rxrpc_propose_ack_ping_for_mtu_probe);
359 }
360 
/*
 * Send an ABORT call packet.
 *
 * Returns 0 on success (or when the abort is deliberately elided), a negative
 * errno from the transmission otherwise.
 */
int rxrpc_send_abort_packet(struct rxrpc_call *call)
{
	struct rxrpc_connection *conn;
	struct rxrpc_abort_buffer pkt;
	struct msghdr msg;
	struct kvec iov[1];
	rxrpc_serial_t serial;
	int ret;

	/* Don't bother sending aborts for a client call once the server has
	 * hard-ACK'd all of its request data.  After that point, we're not
	 * going to stop the operation proceeding, and whilst we might limit
	 * the reply, it's not worth it if we can send a new call on the same
	 * channel instead, thereby closing off this call.
	 */
	if (rxrpc_is_client_call(call) &&
	    test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
		return 0;

	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
		return -ECONNRESET;

	conn = call->conn;

	msg.msg_name	= &call->peer->srx.transport;
	msg.msg_namelen	= call->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	pkt.whdr.epoch		= htonl(conn->proto.epoch);
	pkt.whdr.cid		= htonl(call->cid);
	pkt.whdr.callNumber	= htonl(call->call_id);
	pkt.whdr.seq		= 0;
	pkt.whdr.type		= RXRPC_PACKET_TYPE_ABORT;
	pkt.whdr.flags		= conn->out_clientflag;
	pkt.whdr.userStatus	= 0;
	pkt.whdr.securityIndex	= call->security_ix;
	pkt.whdr._rsvd		= 0;
	pkt.whdr.serviceId	= htons(call->dest_srx.srx_service);
	pkt.abort_code		= htonl(call->abort_code);

	iov[0].iov_base	= &pkt;
	iov[0].iov_len	= sizeof(pkt);

	/* Allocate the serial last so it is as fresh as possible at send. */
	serial = rxrpc_get_next_serial(conn);
	pkt.whdr.serial = htonl(serial);

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, sizeof(pkt));
	ret = do_udp_sendmsg(conn->local->socket, &msg, sizeof(pkt));
	conn->peer->last_tx_at = ktime_get_seconds();
	if (ret < 0)
		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
				    rxrpc_tx_point_call_abort);
	else
		trace_rxrpc_tx_packet(call->debug_id, &pkt.whdr,
				      rxrpc_tx_point_call_abort);
	rxrpc_tx_backoff(call, ret);
	return ret;
}
424 
/*
 * Prepare a (sub)packet for transmission.
 *
 * Sets up local->kvec[subpkt] to point at either the full wire header (first
 * subpacket) or the jumbo header that overlays the tail of the preceding
 * txbuf's wire header (subsequent subpackets), decides whether to request an
 * ACK, and returns the on-wire length of this subpacket including its header.
 */
static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
					   struct rxrpc_send_data_req *req,
					   struct rxrpc_txbuf *txb,
					   rxrpc_serial_t serial, int subpkt)
{
	struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
	/* The jumbo header occupies the last sizeof(*jumbo) bytes of the wire
	 * header area in the txbuf.
	 */
	struct rxrpc_jumbo_header *jumbo = (void *)(whdr + 1) - sizeof(*jumbo);
	enum rxrpc_req_ack_trace why;
	struct rxrpc_connection *conn = call->conn;
	struct kvec *kv = &call->local->kvec[subpkt];
	size_t len = txb->pkt_len;
	bool last;
	u8 flags;

	_enter("%x,%zd", txb->seq, len);

	txb->serial = serial;

	/* The first DATA packet of a call can carry the service-upgrade marker. */
	if (test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) &&
	    txb->seq == 1)
		whdr->userStatus = RXRPC_USERSTATUS_SERVICE_UPGRADE;

	txb->flags &= ~RXRPC_REQUEST_ACK;
	flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;
	last = txb->flags & RXRPC_LAST_PACKET;

	/* Only the final subpacket of a jumbo may request an ACK; earlier
	 * subpackets are always full-length.
	 */
	if (subpkt < req->n - 1) {
		len = RXRPC_JUMBO_DATALEN;
		goto dont_set_request_ack;
	}

	/* If our RTT cache needs working on, request an ACK.  Also request
	 * ACKs if a DATA packet appears to have been lost.
	 *
	 * However, we mustn't request an ACK on the last reply packet of a
	 * service call, lest OpenAFS incorrectly send us an ACK with some
	 * soft-ACKs in it and then never follow up with a proper hard ACK.
	 */
	if (last && rxrpc_sending_to_client(txb))
		why = rxrpc_reqack_no_srv_last;
	else if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
		why = rxrpc_reqack_ack_lost;
	else if (txb->flags & RXRPC_TXBUF_RESENT)
		why = rxrpc_reqack_retrans;
	else if (call->cong_ca_state == RXRPC_CA_SLOW_START && call->cong_cwnd <= RXRPC_MIN_CWND)
		why = rxrpc_reqack_slow_start;
	else if (call->tx_winsize <= 2)
		why = rxrpc_reqack_small_txwin;
	else if (call->rtt_count < 3)
		why = rxrpc_reqack_more_rtt;
	else if (ktime_before(ktime_add_ms(call->rtt_last_req, 1000), ktime_get_real()))
		why = rxrpc_reqack_old_rtt;
	else if (!last && !after(READ_ONCE(call->send_top), txb->seq))
		why = rxrpc_reqack_app_stall;
	else
		goto dont_set_request_ack;

	rxrpc_inc_stat(call->rxnet, stat_why_req_ack[why]);
	trace_rxrpc_req_ack(call->debug_id, txb->seq, why);
	if (why != rxrpc_reqack_no_srv_last) {
		flags |= RXRPC_REQUEST_ACK;
		trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_data, -1, serial);
		call->rtt_last_req = req->now;
	}
dont_set_request_ack:

	/* The jumbo header overlays the wire header in the txbuf. */
	if (subpkt < req->n - 1)
		flags |= RXRPC_JUMBO_PACKET;
	else
		flags &= ~RXRPC_JUMBO_PACKET;
	if (subpkt == 0) {
		whdr->flags	= flags;
		whdr->serial	= htonl(txb->serial);
		whdr->cksum	= txb->cksum;
		whdr->serviceId	= htons(conn->service_id);
		kv->iov_base	= whdr;
		len += sizeof(*whdr);
	} else {
		jumbo->flags	= flags;
		jumbo->pad	= 0;
		jumbo->cksum	= txb->cksum;
		kv->iov_base	= jumbo;
		len += sizeof(*jumbo);
	}

	trace_rxrpc_tx_data(call, txb->seq, txb->serial, flags, req->trace);
	kv->iov_len = len;
	return len;
}
518 
519 /*
520  * Prepare a transmission queue object for initial transmission.  Returns the
521  * number of microseconds since the transmission queue base timestamp.
522  */
523 static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
524 					  struct rxrpc_send_data_req *req)
525 {
526 	if (!tq)
527 		return 0;
528 	if (tq->xmit_ts_base == KTIME_MIN) {
529 		tq->xmit_ts_base = req->now;
530 		return 0;
531 	}
532 	return ktime_to_us(ktime_sub(req->now, tq->xmit_ts_base));
533 }
534 
/*
 * Prepare a (jumbo) packet for transmission.
 *
 * Allocates serial numbers for all req->n subpackets, records RACK/TLP
 * transmission state per segment, builds the kvecs via
 * rxrpc_prepare_data_subpacket() and arms the relevant timers.  Returns the
 * total on-wire length.
 */
static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
{
	struct rxrpc_txqueue *tq = req->tq;
	rxrpc_serial_t serial;
	unsigned int xmit_ts;
	rxrpc_seq_t seq = req->seq;
	size_t len = 0;
	bool start_tlp = false;

	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit);

	/* Each transmission of a Tx packet needs a new serial number */
	serial = rxrpc_get_next_serials(call->conn, req->n);

	call->tx_last_serial = serial + req->n - 1;
	call->tx_last_sent = req->now;
	xmit_ts = rxrpc_prepare_txqueue(tq, req);
	prefetch(tq->next);

	for (int i = 0;;) {
		int ix = seq & RXRPC_TXQ_MASK;
		struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK];

		_debug("prep[%u] tq=%x q=%x", i, tq->qbase, seq);

		/* Record (re-)transmission for RACK [RFC8985 6.1]. */
		if (__test_and_clear_bit(ix, &tq->segment_lost))
			call->tx_nr_lost--;
		if (req->retrans) {
			__set_bit(ix, &tq->ever_retransmitted);
			__set_bit(ix, &tq->segment_retransmitted);
			call->tx_nr_resent++;
		} else {
			call->tx_nr_sent++;
			start_tlp = true;
		}
		tq->segment_xmit_ts[ix] = xmit_ts;
		tq->segment_serial[ix] = serial;
		if (i + 1 == req->n)
			/* Only sample the last subpacket in a jumbo. */
			__set_bit(ix, &tq->rtt_samples);
		len += rxrpc_prepare_data_subpacket(call, req, txb, serial, i);
		serial++;
		seq++;
		i++;
		if (i >= req->n)
			break;
		/* Crossing a queue-bucket boundary: move to the next txqueue. */
		if (!(seq & RXRPC_TXQ_MASK)) {
			tq = tq->next;
			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit_advance);
			xmit_ts = rxrpc_prepare_txqueue(tq, req);
		}
	}

	/* Set timeouts */
	if (req->tlp_probe) {
		/* Sending TLP loss probe [RFC8985 7.3]. */
		call->tlp_serial = serial - 1;
		call->tlp_seq = seq - 1;
	} else if (start_tlp) {
		/* Schedule TLP loss probe [RFC8985 7.2]. */
		ktime_t pto;

		if (!test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags))
			 /* The first packet may take longer to elicit a response. */
			pto = NSEC_PER_SEC;
		else
			pto = rxrpc_tlp_calc_pto(call, req->now);

		call->rack_timer_mode = RXRPC_CALL_RACKTIMER_TLP_PTO;
		call->rack_timo_at = ktime_add(req->now, pto);
		trace_rxrpc_rack_timer(call, pto, false);
		trace_rxrpc_timer_set(call, pto, rxrpc_timer_trace_rack_tlp_pto);
	}

	/* Arm the expect-Rx timer on the first-ever transmission. */
	if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) {
		ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));

		call->expect_rx_by = ktime_add(req->now, delay);
		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
	}

	rxrpc_set_keepalive(call, req->now);
	return len;
}
623 
/*
 * Send one or more packets through the transport endpoint
 */
void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
{
	struct rxrpc_connection *conn = call->conn;
	enum rxrpc_tx_point frag;
	struct rxrpc_txqueue *tq = req->tq;
	struct rxrpc_txbuf *txb;
	struct msghdr msg;
	rxrpc_seq_t seq = req->seq;
	size_t len;
	/* NOTE(review): sampled before rxrpc_prepare_data_packet() sets the
	 * bit, but the name and the cancellation comment below suggest the
	 * intended value is the *inverse* (!test_bit) — confirm upstream.
	 */
	bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
	int ret, stat_ix;

	_enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);

	/* Account the jumbo size distribution; oversize counts clamp into the
	 * last bucket.
	 */
	stat_ix = umin(req->n, ARRAY_SIZE(call->rxnet->stat_tx_jumbo)) - 1;
	atomic_inc(&call->rxnet->stat_tx_jumbo[stat_ix]);

	len = rxrpc_prepare_data_packet(call, req);
	txb = tq->bufs[seq & RXRPC_TXQ_MASK];

	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, req->n, len);

	msg.msg_name	= &call->peer->srx.transport;
	msg.msg_namelen	= call->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= MSG_SPLICE_PAGES;

	/* Send the packet with the don't fragment bit set unless we think it's
	 * too big or if this is a retransmission.
	 */
	if (seq == call->tx_transmitted + 1 &&
	    len >= sizeof(struct rxrpc_wire_header) + call->peer->max_data) {
		rxrpc_local_dont_fragment(conn->local, false);
		frag = rxrpc_tx_point_call_data_frag;
	} else {
		rxrpc_local_dont_fragment(conn->local, true);
		frag = rxrpc_tx_point_call_data_nofrag;
	}

	/* Track what we've attempted to transmit at least once so that the
	 * retransmission algorithm doesn't try to resend what we haven't sent
	 * yet.
	 */
	if (seq == call->tx_transmitted + 1)
		call->tx_transmitted = seq + req->n - 1;

	/* Debug aid: optionally drop 1 in 8 packets on the floor. */
	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
		static int lose;

		if ((lose++ & 7) == 7) {
			ret = 0;
			trace_rxrpc_tx_data(call, txb->seq, txb->serial, txb->flags,
					    rxrpc_txdata_inject_loss);
			conn->peer->last_tx_at = ktime_get_seconds();
			goto done;
		}
	}

	/* send the packet by UDP
	 * - returns -EMSGSIZE if UDP would have to fragment the packet
	 *   to go out of the interface
	 *   - in which case, we'll have processed the ICMP error
	 *     message and update the peer record
	 */
	rxrpc_inc_stat(call->rxnet, stat_tx_data_send);
	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	conn->peer->last_tx_at = ktime_get_seconds();

	if (ret == -EMSGSIZE) {
		/* Treated as sent; PMTUD handling adjusts max_data. */
		rxrpc_inc_stat(call->rxnet, stat_tx_data_send_msgsize);
		trace_rxrpc_tx_packet(call->debug_id, call->local->kvec[0].iov_base, frag);
		ret = 0;
	} else if (ret < 0) {
		rxrpc_inc_stat(call->rxnet, stat_tx_data_send_fail);
		trace_rxrpc_tx_fail(call->debug_id, txb->serial, ret, frag);
	} else {
		trace_rxrpc_tx_packet(call->debug_id, call->local->kvec[0].iov_base, frag);
	}

	rxrpc_tx_backoff(call, ret);

	if (ret < 0) {
		/* Cancel the call if the initial transmission fails or if we
		 * hit due to network routing issues that aren't going away
		 * anytime soon.  The layer above can arrange the
		 * retransmission.
		 */
		if (new_call ||
		    ret == -ENETUNREACH ||
		    ret == -EHOSTUNREACH ||
		    ret == -ECONNREFUSED)
			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
						  RX_USER_ABORT, ret);
	}

done:
	_leave(" = %d [%u]", ret, call->peer->max_data);
}
726 
/*
 * Transmit a connection-level abort.  The call number is zero as the abort
 * applies to the whole connection rather than a single call.
 */
void rxrpc_send_conn_abort(struct rxrpc_connection *conn)
{
	struct rxrpc_wire_header whdr;
	struct msghdr msg;
	struct kvec iov[2];
	__be32 word;
	size_t len;
	u32 serial;
	int ret;

	msg.msg_name	= &conn->peer->srx.transport;
	msg.msg_namelen	= conn->peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	whdr.epoch	= htonl(conn->proto.epoch);
	whdr.cid	= htonl(conn->proto.cid);
	whdr.callNumber	= 0;
	whdr.seq	= 0;
	whdr.type	= RXRPC_PACKET_TYPE_ABORT;
	whdr.flags	= conn->out_clientflag;
	whdr.userStatus	= 0;
	whdr.securityIndex = conn->security_ix;
	whdr._rsvd	= 0;
	whdr.serviceId	= htons(conn->service_id);

	/* The abort code rides as a 32-bit network-order payload. */
	word		= htonl(conn->abort_code);

	iov[0].iov_base	= &whdr;
	iov[0].iov_len	= sizeof(whdr);
	iov[1].iov_base	= &word;
	iov[1].iov_len	= sizeof(word);

	len = iov[0].iov_len + iov[1].iov_len;

	serial = rxrpc_get_next_serial(conn);
	whdr.serial = htonl(serial);

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
	if (ret < 0) {
		trace_rxrpc_tx_fail(conn->debug_id, serial, ret,
				    rxrpc_tx_point_conn_abort);
		_debug("sendmsg failed: %d", ret);
		return;
	}

	trace_rxrpc_tx_packet(conn->debug_id, &whdr, rxrpc_tx_point_conn_abort);

	conn->peer->last_tx_at = ktime_get_seconds();
}
782 
/*
 * Reject a packet through the local endpoint.
 *
 * Depending on skb->mark, replies with either a BUSY packet or an ABORT
 * carrying the code stashed in skb->priority.  The reply header mirrors the
 * offending packet with the client-initiated flag inverted.
 */
void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
	struct rxrpc_wire_header whdr;
	struct sockaddr_rxrpc srx;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct msghdr msg;
	struct kvec iov[2];
	size_t size;
	__be32 code;
	int ret, ioc;

	rxrpc_see_skb(skb, rxrpc_skb_see_reject);

	iov[0].iov_base = &whdr;
	iov[0].iov_len = sizeof(whdr);
	iov[1].iov_base = &code;
	iov[1].iov_len = sizeof(code);

	msg.msg_name = &srx.transport;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	memset(&whdr, 0, sizeof(whdr));

	switch (skb->mark) {
	case RXRPC_SKB_MARK_REJECT_BUSY:
		whdr.type = RXRPC_PACKET_TYPE_BUSY;
		size = sizeof(whdr);
		ioc = 1;	/* Header only, no abort code. */
		break;
	case RXRPC_SKB_MARK_REJECT_ABORT:
		whdr.type = RXRPC_PACKET_TYPE_ABORT;
		code = htonl(skb->priority);
		size = sizeof(whdr) + sizeof(code);
		ioc = 2;
		break;
	default:
		return;
	}

	/* Silently drop if no usable return address can be extracted. */
	if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
		msg.msg_namelen = srx.transport_len;

		whdr.epoch	= htonl(sp->hdr.epoch);
		whdr.cid	= htonl(sp->hdr.cid);
		whdr.callNumber	= htonl(sp->hdr.callNumber);
		whdr.serviceId	= htons(sp->hdr.serviceId);
		whdr.flags	= sp->hdr.flags;
		/* Flip the direction bit and discard all other flags. */
		whdr.flags	^= RXRPC_CLIENT_INITIATED;
		whdr.flags	&= RXRPC_CLIENT_INITIATED;

		iov_iter_kvec(&msg.msg_iter, WRITE, iov, ioc, size);
		ret = do_udp_sendmsg(local->socket, &msg, size);
		if (ret < 0)
			trace_rxrpc_tx_fail(local->debug_id, 0, ret,
					    rxrpc_tx_point_reject);
		else
			trace_rxrpc_tx_packet(local->debug_id, &whdr,
					      rxrpc_tx_point_reject);
	}
}
848 
/*
 * Send a VERSION reply to a peer as a keepalive.
 *
 * The packet has serial 0 and no client-initiated flag, marking it as a
 * response rather than a query; the payload is the empty keepalive string
 * (a single NUL byte).
 */
void rxrpc_send_keepalive(struct rxrpc_peer *peer)
{
	struct rxrpc_wire_header whdr;
	struct msghdr msg;
	struct kvec iov[2];
	size_t len;
	int ret;

	_enter("");

	msg.msg_name	= &peer->srx.transport;
	msg.msg_namelen	= peer->srx.transport_len;
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	whdr.epoch	= htonl(peer->local->rxnet->epoch);
	whdr.cid	= 0;
	whdr.callNumber	= 0;
	whdr.seq	= 0;
	whdr.serial	= 0;
	whdr.type	= RXRPC_PACKET_TYPE_VERSION; /* Not client-initiated */
	whdr.flags	= RXRPC_LAST_PACKET;
	whdr.userStatus	= 0;
	whdr.securityIndex = 0;
	whdr._rsvd	= 0;
	whdr.serviceId	= 0;

	iov[0].iov_base	= &whdr;
	iov[0].iov_len	= sizeof(whdr);
	iov[1].iov_base	= (char *)rxrpc_keepalive_string;
	iov[1].iov_len	= sizeof(rxrpc_keepalive_string);

	len = iov[0].iov_len + iov[1].iov_len;

	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
	ret = do_udp_sendmsg(peer->local->socket, &msg, len);
	if (ret < 0)
		trace_rxrpc_tx_fail(peer->debug_id, 0, ret,
				    rxrpc_tx_point_version_keepalive);
	else
		trace_rxrpc_tx_packet(peer->debug_id, &whdr,
				      rxrpc_tx_point_version_keepalive);

	peer->last_tx_at = ktime_get_seconds();
	_leave("");
}
899