xref: /linux/net/rxrpc/output.c (revision 348f968b89bfeec0bb53dd82dba58b94d97fbd34)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC packet transmission
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/gfp.h>
12 #include <linux/skbuff.h>
13 #include <linux/export.h>
14 #include <net/sock.h>
15 #include <net/af_rxrpc.h>
16 #include <net/udp.h>
17 #include "ar-internal.h"
18 
19 extern int udpv6_sendmsg(struct sock *sk, struct msghdr *msg, size_t len);
20 
do_udp_sendmsg(struct socket * socket,struct msghdr * msg,size_t len)21 static ssize_t do_udp_sendmsg(struct socket *socket, struct msghdr *msg, size_t len)
22 {
23 	struct sockaddr *sa = msg->msg_name;
24 	struct sock *sk = socket->sk;
25 
26 	if (IS_ENABLED(CONFIG_AF_RXRPC_IPV6)) {
27 		if (sa->sa_family == AF_INET6) {
28 			if (sk->sk_family != AF_INET6) {
29 				pr_warn("AF_INET6 address on AF_INET socket\n");
30 				return -ENOPROTOOPT;
31 			}
32 			return udpv6_sendmsg(sk, msg, len);
33 		}
34 	}
35 	return udp_sendmsg(sk, msg, len);
36 }
37 
38 struct rxrpc_abort_buffer {
39 	struct rxrpc_wire_header whdr;
40 	__be32 abort_code;
41 };
42 
43 static const char rxrpc_keepalive_string[] = "";
44 
45 /*
46  * Increase Tx backoff on transmission failure and clear it on success.
47  */
rxrpc_tx_backoff(struct rxrpc_call * call,int ret)48 static void rxrpc_tx_backoff(struct rxrpc_call *call, int ret)
49 {
50 	if (ret < 0) {
51 		if (call->tx_backoff < 1000)
52 			call->tx_backoff += 100;
53 	} else {
54 		call->tx_backoff = 0;
55 	}
56 }
57 
58 /*
59  * Arrange for a keepalive ping a certain time after we last transmitted.  This
60  * lets the far side know we're still interested in this call and helps keep
61  * the route through any intervening firewall open.
62  *
63  * Receiving a response to the ping will prevent the ->expect_rx_by timer from
64  * expiring.
65  */
rxrpc_set_keepalive(struct rxrpc_call * call,ktime_t now)66 static void rxrpc_set_keepalive(struct rxrpc_call *call, ktime_t now)
67 {
68 	ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo) / 6);
69 
70 	call->keepalive_at = ktime_add(ktime_get_real(), delay);
71 	trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_keepalive);
72 }
73 
74 /*
75  * Allocate transmission buffers for an ACK and attach them to local->kv[].
76  */
rxrpc_alloc_ack(struct rxrpc_call * call,size_t sack_size)77 static int rxrpc_alloc_ack(struct rxrpc_call *call, size_t sack_size)
78 {
79 	struct rxrpc_wire_header *whdr;
80 	struct rxrpc_acktrailer *trailer;
81 	struct rxrpc_ackpacket *ack;
82 	struct kvec *kv = call->local->kvec;
83 	gfp_t gfp = rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS;
84 	void *buf, *buf2 = NULL;
85 	u8 *filler;
86 
87 	buf = page_frag_alloc(&call->local->tx_alloc,
88 			      sizeof(*whdr) + sizeof(*ack) + 1 + 3 + sizeof(*trailer), gfp);
89 	if (!buf)
90 		return -ENOMEM;
91 
92 	if (sack_size) {
93 		buf2 = page_frag_alloc(&call->local->tx_alloc, sack_size, gfp);
94 		if (!buf2) {
95 			page_frag_free(buf);
96 			return -ENOMEM;
97 		}
98 	}
99 
100 	whdr	= buf;
101 	ack	= buf + sizeof(*whdr);
102 	filler	= buf + sizeof(*whdr) + sizeof(*ack) + 1;
103 	trailer	= buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;
104 
105 	kv[0].iov_base	= whdr;
106 	kv[0].iov_len	= sizeof(*whdr) + sizeof(*ack);
107 	kv[1].iov_base	= buf2;
108 	kv[1].iov_len	= sack_size;
109 	kv[2].iov_base	= filler;
110 	kv[2].iov_len	= 3 + sizeof(*trailer);
111 	return 3; /* Number of kvec[] used. */
112 }
113 
rxrpc_free_ack(struct rxrpc_call * call)114 static void rxrpc_free_ack(struct rxrpc_call *call)
115 {
116 	page_frag_free(call->local->kvec[0].iov_base);
117 	if (call->local->kvec[1].iov_base)
118 		page_frag_free(call->local->kvec[1].iov_base);
119 }
120 
121 /*
122  * Record the beginning of an RTT probe.
123  */
rxrpc_begin_rtt_probe(struct rxrpc_call * call,rxrpc_serial_t serial,ktime_t now,enum rxrpc_rtt_tx_trace why)124 static void rxrpc_begin_rtt_probe(struct rxrpc_call *call, rxrpc_serial_t serial,
125 				  ktime_t now, enum rxrpc_rtt_tx_trace why)
126 {
127 	unsigned long avail = call->rtt_avail;
128 	int rtt_slot = 9;
129 
130 	if (!(avail & RXRPC_CALL_RTT_AVAIL_MASK))
131 		goto no_slot;
132 
133 	rtt_slot = __ffs(avail & RXRPC_CALL_RTT_AVAIL_MASK);
134 	if (!test_and_clear_bit(rtt_slot, &call->rtt_avail))
135 		goto no_slot;
136 
137 	call->rtt_serial[rtt_slot] = serial;
138 	call->rtt_sent_at[rtt_slot] = now;
139 	smp_wmb(); /* Write data before avail bit */
140 	set_bit(rtt_slot + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
141 
142 	trace_rxrpc_rtt_tx(call, why, rtt_slot, serial);
143 	return;
144 
145 no_slot:
146 	trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_no_slot, rtt_slot, serial);
147 }
148 
149 /*
150  * Fill out an ACK packet.
151  */
rxrpc_fill_out_ack(struct rxrpc_call * call,int nr_kv,u8 ack_reason,rxrpc_serial_t serial_to_ack,rxrpc_serial_t * _ack_serial)152 static int rxrpc_fill_out_ack(struct rxrpc_call *call, int nr_kv, u8 ack_reason,
153 			      rxrpc_serial_t serial_to_ack, rxrpc_serial_t *_ack_serial)
154 {
155 	struct kvec *kv = call->local->kvec;
156 	struct rxrpc_wire_header *whdr = kv[0].iov_base;
157 	struct rxrpc_acktrailer *trailer = kv[2].iov_base + 3;
158 	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
159 	unsigned int qsize, sack, wrap, to, max_mtu, if_mtu;
160 	rxrpc_seq_t window, wtop;
161 	ktime_t now = ktime_get_real();
162 	int rsize;
163 	u8 *filler = kv[2].iov_base;
164 	u8 *sackp = kv[1].iov_base;
165 
166 	rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
167 
168 	window = call->ackr_window;
169 	wtop   = call->ackr_wtop;
170 	sack   = call->ackr_sack_base % RXRPC_SACK_SIZE;
171 
172 	*_ack_serial = rxrpc_get_next_serial(call->conn);
173 
174 	whdr->epoch		= htonl(call->conn->proto.epoch);
175 	whdr->cid		= htonl(call->cid);
176 	whdr->callNumber	= htonl(call->call_id);
177 	whdr->serial		= htonl(*_ack_serial);
178 	whdr->seq		= 0;
179 	whdr->type		= RXRPC_PACKET_TYPE_ACK;
180 	whdr->flags		= call->conn->out_clientflag | RXRPC_SLOW_START_OK;
181 	whdr->userStatus	= 0;
182 	whdr->securityIndex	= call->security_ix;
183 	whdr->_rsvd		= 0;
184 	whdr->serviceId		= htons(call->dest_srx.srx_service);
185 
186 	ack->bufferSpace	= 0;
187 	ack->maxSkew		= 0;
188 	ack->firstPacket	= htonl(window);
189 	ack->previousPacket	= htonl(call->rx_highest_seq);
190 	ack->serial		= htonl(serial_to_ack);
191 	ack->reason		= ack_reason;
192 	ack->nAcks		= wtop - window;
193 	filler[0]		= 0;
194 	filler[1]		= 0;
195 	filler[2]		= 0;
196 
197 	if (ack_reason == RXRPC_ACK_PING)
198 		whdr->flags |= RXRPC_REQUEST_ACK;
199 
200 	if (after(wtop, window)) {
201 		kv[1].iov_len = ack->nAcks;
202 
203 		wrap = RXRPC_SACK_SIZE - sack;
204 		to = umin(ack->nAcks, RXRPC_SACK_SIZE);
205 
206 		if (sack + ack->nAcks <= RXRPC_SACK_SIZE) {
207 			memcpy(sackp, call->ackr_sack_table + sack, ack->nAcks);
208 		} else {
209 			memcpy(sackp, call->ackr_sack_table + sack, wrap);
210 			memcpy(sackp + wrap, call->ackr_sack_table, to - wrap);
211 		}
212 	} else if (before(wtop, window)) {
213 		pr_warn("ack window backward %x %x", window, wtop);
214 	} else if (ack->reason == RXRPC_ACK_DELAY) {
215 		ack->reason = RXRPC_ACK_IDLE;
216 	}
217 
218 	qsize = (window - 1) - call->rx_consumed;
219 	rsize = max_t(int, call->rx_winsize - qsize, 0);
220 
221 	if_mtu = call->peer->if_mtu - call->peer->hdrsize;
222 	if (call->peer->ackr_adv_pmtud) {
223 		max_mtu = umax(call->peer->max_data, rxrpc_rx_mtu);
224 	} else {
225 		if_mtu = umin(if_mtu, 1444);
226 		max_mtu = if_mtu;
227 	}
228 
229 	trailer->maxMTU		= htonl(max_mtu);
230 	trailer->ifMTU		= htonl(if_mtu);
231 	trailer->rwind		= htonl(rsize);
232 	trailer->jumbo_max	= 0; /* Advertise pmtu discovery */
233 
234 	if (ack_reason == RXRPC_ACK_PING)
235 		rxrpc_begin_rtt_probe(call, *_ack_serial, now, rxrpc_rtt_tx_ping);
236 	if (whdr->flags & RXRPC_REQUEST_ACK)
237 		call->rtt_last_req = now;
238 	rxrpc_set_keepalive(call, now);
239 	return nr_kv;
240 }
241 
242 /*
243  * Transmit an ACK packet.
244  */
rxrpc_send_ack_packet(struct rxrpc_call * call,int nr_kv,size_t len,rxrpc_serial_t serial,enum rxrpc_propose_ack_trace why)245 static void rxrpc_send_ack_packet(struct rxrpc_call *call, int nr_kv, size_t len,
246 				  rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why)
247 {
248 	struct kvec *kv = call->local->kvec;
249 	struct rxrpc_wire_header *whdr = kv[0].iov_base;
250 	struct rxrpc_acktrailer *trailer = kv[2].iov_base + 3;
251 	struct rxrpc_connection *conn;
252 	struct rxrpc_ackpacket *ack = (struct rxrpc_ackpacket *)(whdr + 1);
253 	struct msghdr msg;
254 	int ret;
255 
256 	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
257 		return;
258 
259 	conn = call->conn;
260 
261 	msg.msg_name	= &call->peer->srx.transport;
262 	msg.msg_namelen	= call->peer->srx.transport_len;
263 	msg.msg_control	= NULL;
264 	msg.msg_controllen = 0;
265 	msg.msg_flags	= MSG_SPLICE_PAGES;
266 
267 	trace_rxrpc_tx_ack(call->debug_id, serial,
268 			   ntohl(ack->firstPacket),
269 			   ntohl(ack->serial), ack->reason, ack->nAcks,
270 			   ntohl(trailer->rwind), why);
271 
272 	rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);
273 
274 	iov_iter_kvec(&msg.msg_iter, WRITE, kv, nr_kv, len);
275 	rxrpc_local_dont_fragment(conn->local, why == rxrpc_propose_ack_ping_for_mtu_probe);
276 
277 	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
278 	call->peer->last_tx_at = ktime_get_seconds();
279 	if (ret < 0) {
280 		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
281 				    rxrpc_tx_point_call_ack);
282 		if (why == rxrpc_propose_ack_ping_for_mtu_probe &&
283 		    ret == -EMSGSIZE)
284 			rxrpc_input_probe_for_pmtud(conn, serial, true);
285 	} else {
286 		trace_rxrpc_tx_packet(call->debug_id, whdr,
287 				      rxrpc_tx_point_call_ack);
288 		if (why == rxrpc_propose_ack_ping_for_mtu_probe) {
289 			call->peer->pmtud_pending = false;
290 			call->peer->pmtud_probing = true;
291 			call->conn->pmtud_probe = serial;
292 			call->conn->pmtud_call = call->debug_id;
293 			trace_rxrpc_pmtud_tx(call);
294 		}
295 	}
296 	rxrpc_tx_backoff(call, ret);
297 }
298 
299 /*
300  * Queue an ACK for immediate transmission.
301  */
rxrpc_send_ACK(struct rxrpc_call * call,u8 ack_reason,rxrpc_serial_t serial_to_ack,enum rxrpc_propose_ack_trace why)302 void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
303 		    rxrpc_serial_t serial_to_ack, enum rxrpc_propose_ack_trace why)
304 {
305 	struct kvec *kv = call->local->kvec;
306 	rxrpc_serial_t ack_serial;
307 	size_t len;
308 	int nr_kv;
309 
310 	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
311 		return;
312 
313 	rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]);
314 
315 	nr_kv = rxrpc_alloc_ack(call, call->ackr_wtop - call->ackr_window);
316 	if (nr_kv < 0) {
317 		kleave(" = -ENOMEM");
318 		return;
319 	}
320 
321 	nr_kv = rxrpc_fill_out_ack(call, nr_kv, ack_reason, serial_to_ack, &ack_serial);
322 	len  = kv[0].iov_len;
323 	len += kv[1].iov_len;
324 	len += kv[2].iov_len;
325 
326 	/* Extend a path MTU probe ACK. */
327 	if (why == rxrpc_propose_ack_ping_for_mtu_probe) {
328 		size_t probe_mtu = call->peer->pmtud_trial + sizeof(struct rxrpc_wire_header);
329 
330 		if (len > probe_mtu)
331 			goto skip;
332 		while (len < probe_mtu) {
333 			size_t part = umin(probe_mtu - len, PAGE_SIZE);
334 
335 			kv[nr_kv].iov_base = page_address(ZERO_PAGE(0));
336 			kv[nr_kv].iov_len = part;
337 			len += part;
338 			nr_kv++;
339 		}
340 	}
341 
342 	call->ackr_nr_unacked = 0;
343 	atomic_set(&call->ackr_nr_consumed, 0);
344 	clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
345 
346 	trace_rxrpc_send_ack(call, why, ack_reason, ack_serial);
347 	rxrpc_send_ack_packet(call, nr_kv, len, ack_serial, why);
348 skip:
349 	rxrpc_free_ack(call);
350 }
351 
352 /*
353  * Send an ACK probe for path MTU discovery.
354  */
rxrpc_send_probe_for_pmtud(struct rxrpc_call * call)355 void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call)
356 {
357 	rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
358 		       rxrpc_propose_ack_ping_for_mtu_probe);
359 }
360 
361 /*
362  * Send an ABORT call packet.
363  */
rxrpc_send_abort_packet(struct rxrpc_call * call)364 int rxrpc_send_abort_packet(struct rxrpc_call *call)
365 {
366 	struct rxrpc_connection *conn;
367 	struct rxrpc_abort_buffer pkt;
368 	struct msghdr msg;
369 	struct kvec iov[1];
370 	rxrpc_serial_t serial;
371 	int ret;
372 
373 	/* Don't bother sending aborts for a client call once the server has
374 	 * hard-ACK'd all of its request data.  After that point, we're not
375 	 * going to stop the operation proceeding, and whilst we might limit
376 	 * the reply, it's not worth it if we can send a new call on the same
377 	 * channel instead, thereby closing off this call.
378 	 */
379 	if (rxrpc_is_client_call(call) &&
380 	    test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
381 		return 0;
382 
383 	if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
384 		return -ECONNRESET;
385 
386 	conn = call->conn;
387 
388 	msg.msg_name	= &call->peer->srx.transport;
389 	msg.msg_namelen	= call->peer->srx.transport_len;
390 	msg.msg_control	= NULL;
391 	msg.msg_controllen = 0;
392 	msg.msg_flags	= 0;
393 
394 	pkt.whdr.epoch		= htonl(conn->proto.epoch);
395 	pkt.whdr.cid		= htonl(call->cid);
396 	pkt.whdr.callNumber	= htonl(call->call_id);
397 	pkt.whdr.seq		= 0;
398 	pkt.whdr.type		= RXRPC_PACKET_TYPE_ABORT;
399 	pkt.whdr.flags		= conn->out_clientflag;
400 	pkt.whdr.userStatus	= 0;
401 	pkt.whdr.securityIndex	= call->security_ix;
402 	pkt.whdr._rsvd		= 0;
403 	pkt.whdr.serviceId	= htons(call->dest_srx.srx_service);
404 	pkt.abort_code		= htonl(call->abort_code);
405 
406 	iov[0].iov_base	= &pkt;
407 	iov[0].iov_len	= sizeof(pkt);
408 
409 	serial = rxrpc_get_next_serial(conn);
410 	pkt.whdr.serial = htonl(serial);
411 
412 	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, sizeof(pkt));
413 	ret = do_udp_sendmsg(conn->local->socket, &msg, sizeof(pkt));
414 	conn->peer->last_tx_at = ktime_get_seconds();
415 	if (ret < 0)
416 		trace_rxrpc_tx_fail(call->debug_id, serial, ret,
417 				    rxrpc_tx_point_call_abort);
418 	else
419 		trace_rxrpc_tx_packet(call->debug_id, &pkt.whdr,
420 				      rxrpc_tx_point_call_abort);
421 	rxrpc_tx_backoff(call, ret);
422 	return ret;
423 }
424 
425 /*
426  * Prepare a (sub)packet for transmission.
427  */
rxrpc_prepare_data_subpacket(struct rxrpc_call * call,struct rxrpc_send_data_req * req,struct rxrpc_txbuf * txb,struct rxrpc_wire_header * whdr,rxrpc_serial_t serial,int subpkt)428 static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
429 					   struct rxrpc_send_data_req *req,
430 					   struct rxrpc_txbuf *txb,
431 					   struct rxrpc_wire_header *whdr,
432 					   rxrpc_serial_t serial, int subpkt)
433 {
434 	struct rxrpc_jumbo_header *jumbo = txb->data - sizeof(*jumbo);
435 	enum rxrpc_req_ack_trace why;
436 	struct rxrpc_connection *conn = call->conn;
437 	struct kvec *kv = &call->local->kvec[1 + subpkt];
438 	size_t len = txb->pkt_len;
439 	bool last;
440 	u8 flags;
441 
442 	_enter("%x,%zd", txb->seq, len);
443 
444 	txb->serial = serial;
445 
446 	if (test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) &&
447 	    txb->seq == 1)
448 		whdr->userStatus = RXRPC_USERSTATUS_SERVICE_UPGRADE;
449 
450 	txb->flags &= ~RXRPC_REQUEST_ACK;
451 	flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;
452 	last = txb->flags & RXRPC_LAST_PACKET;
453 
454 	if (subpkt < req->n - 1) {
455 		len = RXRPC_JUMBO_DATALEN;
456 		goto dont_set_request_ack;
457 	}
458 
459 	/* If our RTT cache needs working on, request an ACK.  Also request
460 	 * ACKs if a DATA packet appears to have been lost.
461 	 *
462 	 * However, we mustn't request an ACK on the last reply packet of a
463 	 * service call, lest OpenAFS incorrectly send us an ACK with some
464 	 * soft-ACKs in it and then never follow up with a proper hard ACK.
465 	 */
466 	if (last && rxrpc_sending_to_client(txb))
467 		why = rxrpc_reqack_no_srv_last;
468 	else if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
469 		why = rxrpc_reqack_ack_lost;
470 	else if (txb->flags & RXRPC_TXBUF_RESENT)
471 		why = rxrpc_reqack_retrans;
472 	else if (call->cong_ca_state == RXRPC_CA_SLOW_START && call->cong_cwnd <= RXRPC_MIN_CWND)
473 		why = rxrpc_reqack_slow_start;
474 	else if (call->tx_winsize <= 2)
475 		why = rxrpc_reqack_small_txwin;
476 	else if (call->rtt_count < 3)
477 		why = rxrpc_reqack_more_rtt;
478 	else if (ktime_before(ktime_add_ms(call->rtt_last_req, 1000), ktime_get_real()))
479 		why = rxrpc_reqack_old_rtt;
480 	else if (!last && !after(READ_ONCE(call->send_top), txb->seq))
481 		why = rxrpc_reqack_app_stall;
482 	else
483 		goto dont_set_request_ack;
484 
485 	rxrpc_inc_stat(call->rxnet, stat_why_req_ack[why]);
486 	trace_rxrpc_req_ack(call->debug_id, txb->seq, why);
487 	if (why != rxrpc_reqack_no_srv_last) {
488 		flags |= RXRPC_REQUEST_ACK;
489 		trace_rxrpc_rtt_tx(call, rxrpc_rtt_tx_data, -1, serial);
490 		call->rtt_last_req = req->now;
491 	}
492 dont_set_request_ack:
493 
494 	/* There's a jumbo header prepended to the data if we need it. */
495 	if (subpkt < req->n - 1)
496 		flags |= RXRPC_JUMBO_PACKET;
497 	else
498 		flags &= ~RXRPC_JUMBO_PACKET;
499 	if (subpkt == 0) {
500 		whdr->flags	= flags;
501 		whdr->cksum	= txb->cksum;
502 		kv->iov_base	= txb->data;
503 	} else {
504 		jumbo->flags	= flags;
505 		jumbo->pad	= 0;
506 		jumbo->cksum	= txb->cksum;
507 		kv->iov_base	= jumbo;
508 		len += sizeof(*jumbo);
509 	}
510 
511 	trace_rxrpc_tx_data(call, txb->seq, txb->serial, flags, req->trace);
512 	kv->iov_len = len;
513 	return len;
514 }
515 
516 /*
517  * Prepare a transmission queue object for initial transmission.  Returns the
518  * number of microseconds since the transmission queue base timestamp.
519  */
rxrpc_prepare_txqueue(struct rxrpc_txqueue * tq,struct rxrpc_send_data_req * req)520 static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
521 					  struct rxrpc_send_data_req *req)
522 {
523 	if (!tq)
524 		return 0;
525 	if (tq->xmit_ts_base == KTIME_MIN) {
526 		tq->xmit_ts_base = req->now;
527 		return 0;
528 	}
529 	return ktime_to_us(ktime_sub(req->now, tq->xmit_ts_base));
530 }
531 
532 /*
533  * Prepare a (jumbo) packet for transmission.
534  */
rxrpc_prepare_data_packet(struct rxrpc_call * call,struct rxrpc_send_data_req * req,struct rxrpc_wire_header * whdr)535 static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call,
536 					struct rxrpc_send_data_req *req,
537 					struct rxrpc_wire_header *whdr)
538 {
539 	struct rxrpc_txqueue *tq = req->tq;
540 	rxrpc_serial_t serial;
541 	unsigned int xmit_ts;
542 	rxrpc_seq_t seq = req->seq;
543 	size_t len = 0;
544 	bool start_tlp = false;
545 
546 	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit);
547 
548 	/* Each transmission of a Tx packet needs a new serial number */
549 	serial = rxrpc_get_next_serials(call->conn, req->n);
550 
551 	whdr->epoch		= htonl(call->conn->proto.epoch);
552 	whdr->cid		= htonl(call->cid);
553 	whdr->callNumber	= htonl(call->call_id);
554 	whdr->seq		= htonl(seq);
555 	whdr->serial		= htonl(serial);
556 	whdr->type		= RXRPC_PACKET_TYPE_DATA;
557 	whdr->flags		= 0;
558 	whdr->userStatus	= 0;
559 	whdr->securityIndex	= call->security_ix;
560 	whdr->_rsvd		= 0;
561 	whdr->serviceId		= htons(call->conn->service_id);
562 
563 	call->tx_last_serial = serial + req->n - 1;
564 	call->tx_last_sent = req->now;
565 	xmit_ts = rxrpc_prepare_txqueue(tq, req);
566 	prefetch(tq->next);
567 
568 	for (int i = 0;;) {
569 		int ix = seq & RXRPC_TXQ_MASK;
570 		struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK];
571 
572 		_debug("prep[%u] tq=%x q=%x", i, tq->qbase, seq);
573 
574 		/* Record (re-)transmission for RACK [RFC8985 6.1]. */
575 		if (__test_and_clear_bit(ix, &tq->segment_lost))
576 			call->tx_nr_lost--;
577 		if (req->retrans) {
578 			__set_bit(ix, &tq->ever_retransmitted);
579 			__set_bit(ix, &tq->segment_retransmitted);
580 			call->tx_nr_resent++;
581 		} else {
582 			call->tx_nr_sent++;
583 			start_tlp = true;
584 		}
585 		tq->segment_xmit_ts[ix] = xmit_ts;
586 		tq->segment_serial[ix] = serial;
587 		if (i + 1 == req->n)
588 			/* Only sample the last subpacket in a jumbo. */
589 			__set_bit(ix, &tq->rtt_samples);
590 		len += rxrpc_prepare_data_subpacket(call, req, txb, whdr, serial, i);
591 		serial++;
592 		seq++;
593 		i++;
594 		if (i >= req->n)
595 			break;
596 		if (!(seq & RXRPC_TXQ_MASK)) {
597 			tq = tq->next;
598 			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit_advance);
599 			xmit_ts = rxrpc_prepare_txqueue(tq, req);
600 		}
601 	}
602 
603 	/* Set timeouts */
604 	if (req->tlp_probe) {
605 		/* Sending TLP loss probe [RFC8985 7.3]. */
606 		call->tlp_serial = serial - 1;
607 		call->tlp_seq = seq - 1;
608 	} else if (start_tlp) {
609 		/* Schedule TLP loss probe [RFC8985 7.2]. */
610 		ktime_t pto;
611 
612 		if (!test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags))
613 			 /* The first packet may take longer to elicit a response. */
614 			pto = NSEC_PER_SEC;
615 		else
616 			pto = rxrpc_tlp_calc_pto(call, req->now);
617 
618 		call->rack_timer_mode = RXRPC_CALL_RACKTIMER_TLP_PTO;
619 		call->rack_timo_at = ktime_add(req->now, pto);
620 		trace_rxrpc_rack_timer(call, pto, false);
621 		trace_rxrpc_timer_set(call, pto, rxrpc_timer_trace_rack_tlp_pto);
622 	}
623 
624 	if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) {
625 		ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));
626 
627 		call->expect_rx_by = ktime_add(req->now, delay);
628 		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
629 	}
630 
631 	rxrpc_set_keepalive(call, req->now);
632 	page_frag_free(whdr);
633 	return len;
634 }
635 
636 /*
637  * Send one or more packets through the transport endpoint
638  */
rxrpc_send_data_packet(struct rxrpc_call * call,struct rxrpc_send_data_req * req)639 void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
640 {
641 	struct rxrpc_wire_header *whdr;
642 	struct rxrpc_connection *conn = call->conn;
643 	enum rxrpc_tx_point frag;
644 	struct rxrpc_txqueue *tq = req->tq;
645 	struct rxrpc_txbuf *txb;
646 	struct msghdr msg;
647 	rxrpc_seq_t seq = req->seq;
648 	size_t len = sizeof(*whdr);
649 	bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
650 	int ret, stat_ix;
651 
652 	_enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);
653 
654 	whdr = page_frag_alloc(&call->local->tx_alloc, sizeof(*whdr), GFP_NOFS);
655 	if (!whdr)
656 		return; /* Drop the packet if no memory. */
657 
658 	call->local->kvec[0].iov_base = whdr;
659 	call->local->kvec[0].iov_len = sizeof(*whdr);
660 
661 	stat_ix = umin(req->n, ARRAY_SIZE(call->rxnet->stat_tx_jumbo)) - 1;
662 	atomic_inc(&call->rxnet->stat_tx_jumbo[stat_ix]);
663 
664 	len += rxrpc_prepare_data_packet(call, req, whdr);
665 	txb = tq->bufs[seq & RXRPC_TXQ_MASK];
666 
667 	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, 1 + req->n, len);
668 
669 	msg.msg_name	= &call->peer->srx.transport;
670 	msg.msg_namelen	= call->peer->srx.transport_len;
671 	msg.msg_control	= NULL;
672 	msg.msg_controllen = 0;
673 	msg.msg_flags	= MSG_SPLICE_PAGES;
674 
675 	/* Send the packet with the don't fragment bit set unless we think it's
676 	 * too big or if this is a retransmission.
677 	 */
678 	if (seq == call->tx_transmitted + 1 &&
679 	    len >= sizeof(struct rxrpc_wire_header) + call->peer->max_data) {
680 		rxrpc_local_dont_fragment(conn->local, false);
681 		frag = rxrpc_tx_point_call_data_frag;
682 	} else {
683 		rxrpc_local_dont_fragment(conn->local, true);
684 		frag = rxrpc_tx_point_call_data_nofrag;
685 	}
686 
687 	/* Track what we've attempted to transmit at least once so that the
688 	 * retransmission algorithm doesn't try to resend what we haven't sent
689 	 * yet.
690 	 */
691 	if (seq == call->tx_transmitted + 1)
692 		call->tx_transmitted = seq + req->n - 1;
693 
694 	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
695 		static int lose;
696 
697 		if ((lose++ & 7) == 7) {
698 			ret = 0;
699 			trace_rxrpc_tx_data(call, txb->seq, txb->serial, txb->flags,
700 					    rxrpc_txdata_inject_loss);
701 			conn->peer->last_tx_at = ktime_get_seconds();
702 			goto done;
703 		}
704 	}
705 
706 	/* send the packet by UDP
707 	 * - returns -EMSGSIZE if UDP would have to fragment the packet
708 	 *   to go out of the interface
709 	 *   - in which case, we'll have processed the ICMP error
710 	 *     message and update the peer record
711 	 */
712 	rxrpc_inc_stat(call->rxnet, stat_tx_data_send);
713 	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
714 	conn->peer->last_tx_at = ktime_get_seconds();
715 
716 	if (ret == -EMSGSIZE) {
717 		rxrpc_inc_stat(call->rxnet, stat_tx_data_send_msgsize);
718 		trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
719 		ret = 0;
720 	} else if (ret < 0) {
721 		rxrpc_inc_stat(call->rxnet, stat_tx_data_send_fail);
722 		trace_rxrpc_tx_fail(call->debug_id, txb->serial, ret, frag);
723 	} else {
724 		trace_rxrpc_tx_packet(call->debug_id, whdr, frag);
725 	}
726 
727 	rxrpc_tx_backoff(call, ret);
728 
729 	if (ret < 0) {
730 		/* Cancel the call if the initial transmission fails or if we
731 		 * hit due to network routing issues that aren't going away
732 		 * anytime soon.  The layer above can arrange the
733 		 * retransmission.
734 		 */
735 		if (new_call ||
736 		    ret == -ENETUNREACH ||
737 		    ret == -EHOSTUNREACH ||
738 		    ret == -ECONNREFUSED)
739 			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
740 						  RX_USER_ABORT, ret);
741 	}
742 
743 done:
744 	_leave(" = %d [%u]", ret, call->peer->max_data);
745 }
746 
747 /*
748  * Transmit a connection-level abort.
749  */
rxrpc_send_conn_abort(struct rxrpc_connection * conn)750 void rxrpc_send_conn_abort(struct rxrpc_connection *conn)
751 {
752 	struct rxrpc_wire_header whdr;
753 	struct msghdr msg;
754 	struct kvec iov[2];
755 	__be32 word;
756 	size_t len;
757 	u32 serial;
758 	int ret;
759 
760 	msg.msg_name	= &conn->peer->srx.transport;
761 	msg.msg_namelen	= conn->peer->srx.transport_len;
762 	msg.msg_control	= NULL;
763 	msg.msg_controllen = 0;
764 	msg.msg_flags	= 0;
765 
766 	whdr.epoch	= htonl(conn->proto.epoch);
767 	whdr.cid	= htonl(conn->proto.cid);
768 	whdr.callNumber	= 0;
769 	whdr.seq	= 0;
770 	whdr.type	= RXRPC_PACKET_TYPE_ABORT;
771 	whdr.flags	= conn->out_clientflag;
772 	whdr.userStatus	= 0;
773 	whdr.securityIndex = conn->security_ix;
774 	whdr._rsvd	= 0;
775 	whdr.serviceId	= htons(conn->service_id);
776 
777 	word		= htonl(conn->abort_code);
778 
779 	iov[0].iov_base	= &whdr;
780 	iov[0].iov_len	= sizeof(whdr);
781 	iov[1].iov_base	= &word;
782 	iov[1].iov_len	= sizeof(word);
783 
784 	len = iov[0].iov_len + iov[1].iov_len;
785 
786 	serial = rxrpc_get_next_serial(conn);
787 	whdr.serial = htonl(serial);
788 
789 	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
790 	ret = do_udp_sendmsg(conn->local->socket, &msg, len);
791 	if (ret < 0) {
792 		trace_rxrpc_tx_fail(conn->debug_id, serial, ret,
793 				    rxrpc_tx_point_conn_abort);
794 		_debug("sendmsg failed: %d", ret);
795 		return;
796 	}
797 
798 	trace_rxrpc_tx_packet(conn->debug_id, &whdr, rxrpc_tx_point_conn_abort);
799 
800 	conn->peer->last_tx_at = ktime_get_seconds();
801 }
802 
803 /*
804  * Reject a packet through the local endpoint.
805  */
rxrpc_reject_packet(struct rxrpc_local * local,struct sk_buff * skb)806 void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
807 {
808 	struct rxrpc_wire_header whdr;
809 	struct sockaddr_rxrpc srx;
810 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
811 	struct msghdr msg;
812 	struct kvec iov[2];
813 	size_t size;
814 	__be32 code;
815 	int ret, ioc;
816 
817 	rxrpc_see_skb(skb, rxrpc_skb_see_reject);
818 
819 	iov[0].iov_base = &whdr;
820 	iov[0].iov_len = sizeof(whdr);
821 	iov[1].iov_base = &code;
822 	iov[1].iov_len = sizeof(code);
823 
824 	msg.msg_name = &srx.transport;
825 	msg.msg_control = NULL;
826 	msg.msg_controllen = 0;
827 	msg.msg_flags = 0;
828 
829 	memset(&whdr, 0, sizeof(whdr));
830 
831 	switch (skb->mark) {
832 	case RXRPC_SKB_MARK_REJECT_BUSY:
833 		whdr.type = RXRPC_PACKET_TYPE_BUSY;
834 		size = sizeof(whdr);
835 		ioc = 1;
836 		break;
837 	case RXRPC_SKB_MARK_REJECT_ABORT:
838 		whdr.type = RXRPC_PACKET_TYPE_ABORT;
839 		code = htonl(skb->priority);
840 		size = sizeof(whdr) + sizeof(code);
841 		ioc = 2;
842 		break;
843 	default:
844 		return;
845 	}
846 
847 	if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
848 		msg.msg_namelen = srx.transport_len;
849 
850 		whdr.epoch	= htonl(sp->hdr.epoch);
851 		whdr.cid	= htonl(sp->hdr.cid);
852 		whdr.callNumber	= htonl(sp->hdr.callNumber);
853 		whdr.serviceId	= htons(sp->hdr.serviceId);
854 		whdr.flags	= sp->hdr.flags;
855 		whdr.flags	^= RXRPC_CLIENT_INITIATED;
856 		whdr.flags	&= RXRPC_CLIENT_INITIATED;
857 
858 		iov_iter_kvec(&msg.msg_iter, WRITE, iov, ioc, size);
859 		ret = do_udp_sendmsg(local->socket, &msg, size);
860 		if (ret < 0)
861 			trace_rxrpc_tx_fail(local->debug_id, 0, ret,
862 					    rxrpc_tx_point_reject);
863 		else
864 			trace_rxrpc_tx_packet(local->debug_id, &whdr,
865 					      rxrpc_tx_point_reject);
866 	}
867 }
868 
869 /*
870  * Send a VERSION reply to a peer as a keepalive.
871  */
rxrpc_send_keepalive(struct rxrpc_peer * peer)872 void rxrpc_send_keepalive(struct rxrpc_peer *peer)
873 {
874 	struct rxrpc_wire_header whdr;
875 	struct msghdr msg;
876 	struct kvec iov[2];
877 	size_t len;
878 	int ret;
879 
880 	_enter("");
881 
882 	msg.msg_name	= &peer->srx.transport;
883 	msg.msg_namelen	= peer->srx.transport_len;
884 	msg.msg_control	= NULL;
885 	msg.msg_controllen = 0;
886 	msg.msg_flags	= 0;
887 
888 	whdr.epoch	= htonl(peer->local->rxnet->epoch);
889 	whdr.cid	= 0;
890 	whdr.callNumber	= 0;
891 	whdr.seq	= 0;
892 	whdr.serial	= 0;
893 	whdr.type	= RXRPC_PACKET_TYPE_VERSION; /* Not client-initiated */
894 	whdr.flags	= RXRPC_LAST_PACKET;
895 	whdr.userStatus	= 0;
896 	whdr.securityIndex = 0;
897 	whdr._rsvd	= 0;
898 	whdr.serviceId	= 0;
899 
900 	iov[0].iov_base	= &whdr;
901 	iov[0].iov_len	= sizeof(whdr);
902 	iov[1].iov_base	= (char *)rxrpc_keepalive_string;
903 	iov[1].iov_len	= sizeof(rxrpc_keepalive_string);
904 
905 	len = iov[0].iov_len + iov[1].iov_len;
906 
907 	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
908 	ret = do_udp_sendmsg(peer->local->socket, &msg, len);
909 	if (ret < 0)
910 		trace_rxrpc_tx_fail(peer->debug_id, 0, ret,
911 				    rxrpc_tx_point_version_keepalive);
912 	else
913 		trace_rxrpc_tx_packet(peer->debug_id, &whdr,
914 				      rxrpc_tx_point_version_keepalive);
915 
916 	peer->last_tx_at = ktime_get_seconds();
917 	_leave("");
918 }
919