xref: /linux/net/rxrpc/call_event.c (revision e728258debd553c95d2e70f9cd97c9fde27c7130)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/module.h>
11 #include <linux/circ_buf.h>
12 #include <linux/net.h>
13 #include <linux/skbuff.h>
14 #include <linux/slab.h>
15 #include <linux/udp.h>
16 #include <net/sock.h>
17 #include <net/af_rxrpc.h>
18 #include "ar-internal.h"
19 
20 /*
21  * Propose a PING ACK be sent.
22  */
23 void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
24 			enum rxrpc_propose_ack_trace why)
25 {
26 	ktime_t delay = ms_to_ktime(READ_ONCE(rxrpc_idle_ack_delay));
27 	ktime_t now = ktime_get_real();
28 	ktime_t ping_at = ktime_add(now, delay);
29 
30 	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial);
31 	if (ktime_before(ping_at, call->ping_at)) {
32 		call->ping_at = ping_at;
33 		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_ping);
34 	}
35 }
36 
37 /*
38  * Propose a DELAY ACK be sent in the future.
39  */
40 void rxrpc_propose_delay_ACK(struct rxrpc_call *call, rxrpc_serial_t serial,
41 			     enum rxrpc_propose_ack_trace why)
42 {
43 	ktime_t now = ktime_get_real(), delay;
44 
45 	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_DELAY, serial);
46 
47 	if (call->srtt_us)
48 		delay = (call->srtt_us >> 3) * NSEC_PER_USEC;
49 	else
50 		delay = ms_to_ktime(READ_ONCE(rxrpc_soft_ack_delay));
51 	ktime_add_ms(delay, call->tx_backoff);
52 
53 	call->delay_ack_at = ktime_add(now, delay);
54 	trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_delayed_ack);
55 }
56 
57 /*
58  * Retransmit one or more packets.
59  */
60 static bool rxrpc_retransmit_data(struct rxrpc_call *call,
61 				  struct rxrpc_send_data_req *req)
62 {
63 	struct rxrpc_txqueue *tq = req->tq;
64 	unsigned int ix = req->seq & RXRPC_TXQ_MASK;
65 	struct rxrpc_txbuf *txb = tq->bufs[ix];
66 
67 	_enter("%x,%x,%x,%x", tq->qbase, req->seq, ix, txb->debug_id);
68 
69 	req->retrans = true;
70 	trace_rxrpc_retransmit(call, req, txb);
71 
72 	txb->flags |= RXRPC_TXBUF_RESENT;
73 	rxrpc_send_data_packet(call, req);
74 	rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
75 
76 	req->tq		= NULL;
77 	req->n		= 0;
78 	req->did_send	= true;
79 	req->now	= ktime_get_real();
80 	return true;
81 }
82 
83 /*
84  * Perform retransmission of NAK'd and unack'd packets.
85  */
86 static void rxrpc_resend(struct rxrpc_call *call)
87 {
88 	struct rxrpc_send_data_req req = {
89 		.now	= ktime_get_real(),
90 		.trace	= rxrpc_txdata_retransmit,
91 	};
92 	struct rxrpc_txqueue *tq;
93 
94 	_enter("{%d,%d}", call->tx_bottom, call->tx_top);
95 
96 	trace_rxrpc_resend(call, call->acks_highest_serial);
97 
98 	/* Scan the transmission queue, looking for lost packets. */
99 	for (tq = call->tx_queue; tq; tq = tq->next) {
100 		unsigned long lost = tq->segment_lost;
101 
102 		if (after(tq->qbase, call->tx_transmitted))
103 			break;
104 
105 		_debug("retr %16lx %u c=%08x [%x]",
106 		       tq->segment_acked, tq->nr_reported_acks, call->debug_id, tq->qbase);
107 		_debug("lost %16lx", lost);
108 
109 		trace_rxrpc_resend_lost(call, tq, lost);
110 		while (lost) {
111 			unsigned int ix = __ffs(lost);
112 			struct rxrpc_txbuf *txb = tq->bufs[ix];
113 
114 			__clear_bit(ix, &lost);
115 			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_lost);
116 
117 			req.tq  = tq;
118 			req.seq = tq->qbase + ix;
119 			req.n   = 1;
120 			rxrpc_retransmit_data(call, &req);
121 		}
122 	}
123 
124 	rxrpc_get_rto_backoff(call, req.did_send);
125 	_leave("");
126 }
127 
128 /*
129  * Resend the highest-seq DATA packet so far transmitted for RACK-TLP [RFC8985 7.3].
130  */
131 void rxrpc_resend_tlp(struct rxrpc_call *call)
132 {
133 	struct rxrpc_send_data_req req = {
134 		.now		= ktime_get_real(),
135 		.seq		= call->tx_transmitted,
136 		.n		= 1,
137 		.tlp_probe	= true,
138 		.trace		= rxrpc_txdata_tlp_retransmit,
139 	};
140 
141 	/* There's a chance it'll be on the tail segment of the queue. */
142 	req.tq = READ_ONCE(call->tx_qtail);
143 	if (req.tq &&
144 	    before(call->tx_transmitted, req.tq->qbase + RXRPC_NR_TXQUEUE)) {
145 		rxrpc_retransmit_data(call, &req);
146 		return;
147 	}
148 
149 	for (req.tq = call->tx_queue; req.tq; req.tq = req.tq->next) {
150 		if (after_eq(call->tx_transmitted, req.tq->qbase) &&
151 		    before(call->tx_transmitted, req.tq->qbase + RXRPC_NR_TXQUEUE)) {
152 			rxrpc_retransmit_data(call, &req);
153 			return;
154 		}
155 	}
156 }
157 
158 /*
159  * Start transmitting the reply to a service.  This cancels the need to ACK the
160  * request if we haven't yet done so.
161  */
162 static void rxrpc_begin_service_reply(struct rxrpc_call *call)
163 {
164 	rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SEND_REPLY);
165 	if (call->ackr_reason == RXRPC_ACK_DELAY)
166 		call->ackr_reason = 0;
167 	call->delay_ack_at = KTIME_MAX;
168 	trace_rxrpc_timer_can(call, rxrpc_timer_trace_delayed_ack);
169 }
170 
171 /*
172  * Close the transmission phase.  After this point there is no more data to be
173  * transmitted in the call.
174  */
175 static void rxrpc_close_tx_phase(struct rxrpc_call *call)
176 {
177 	_debug("________awaiting reply/ACK__________");
178 
179 	switch (__rxrpc_call_state(call)) {
180 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
181 		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
182 		break;
183 	case RXRPC_CALL_SERVER_SEND_REPLY:
184 		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_AWAIT_ACK);
185 		break;
186 	default:
187 		break;
188 	}
189 }
190 
191 /*
192  * Transmit some as-yet untransmitted data, to a maximum of the supplied limit.
193  */
194 static void rxrpc_transmit_fresh_data(struct rxrpc_call *call, unsigned int limit,
195 				      enum rxrpc_txdata_trace trace)
196 {
197 	int space = rxrpc_tx_window_space(call);
198 
199 	if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
200 		if (call->send_top == call->tx_top)
201 			return;
202 		rxrpc_expose_client_call(call);
203 	}
204 
205 	while (space > 0) {
206 		struct rxrpc_send_data_req req = {
207 			.now	= ktime_get_real(),
208 			.seq	= call->tx_transmitted + 1,
209 			.n	= 0,
210 			.trace	= trace,
211 		};
212 		struct rxrpc_txqueue *tq;
213 		struct rxrpc_txbuf *txb;
214 		rxrpc_seq_t send_top, seq;
215 		int limit = min(space, max(call->peer->pmtud_jumbo, 1));
216 
217 		/* Order send_top before the contents of the new txbufs and
218 		 * txqueue pointers
219 		 */
220 		send_top = smp_load_acquire(&call->send_top);
221 		if (call->tx_top == send_top)
222 			break;
223 
224 		trace_rxrpc_transmit(call, send_top, space);
225 
226 		tq = call->tx_qtail;
227 		seq = call->tx_top;
228 		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);
229 
230 		do {
231 			int ix;
232 
233 			seq++;
234 			ix = seq & RXRPC_TXQ_MASK;
235 			if (!ix) {
236 				tq = tq->next;
237 				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
238 			}
239 			if (!req.tq)
240 				req.tq = tq;
241 			txb = tq->bufs[ix];
242 			req.n++;
243 			if (!txb->jumboable)
244 				break;
245 		} while (req.n < limit && before(seq, send_top));
246 
247 		if (txb->flags & RXRPC_LAST_PACKET) {
248 			rxrpc_close_tx_phase(call);
249 			tq = NULL;
250 		}
251 		call->tx_qtail = tq;
252 		call->tx_top = seq;
253 
254 		space -= req.n;
255 		rxrpc_send_data_packet(call, &req);
256 	}
257 }
258 
259 void rxrpc_transmit_some_data(struct rxrpc_call *call, unsigned int limit,
260 			      enum rxrpc_txdata_trace trace)
261 {
262 	switch (__rxrpc_call_state(call)) {
263 	case RXRPC_CALL_SERVER_ACK_REQUEST:
264 		if (call->tx_bottom == READ_ONCE(call->send_top))
265 			return;
266 		rxrpc_begin_service_reply(call);
267 		fallthrough;
268 
269 	case RXRPC_CALL_SERVER_SEND_REPLY:
270 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
271 		if (!rxrpc_tx_window_space(call))
272 			return;
273 		if (call->tx_bottom == READ_ONCE(call->send_top)) {
274 			rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
275 			return;
276 		}
277 		rxrpc_transmit_fresh_data(call, limit, trace);
278 		break;
279 	default:
280 		return;
281 	}
282 }
283 
284 /*
285  * Ping the other end to fill our RTT cache and to retrieve the rwind
286  * and MTU parameters.
287  */
288 static void rxrpc_send_initial_ping(struct rxrpc_call *call)
289 {
290 	if (call->rtt_count < 3 ||
291 	    ktime_before(ktime_add_ms(call->rtt_last_req, 1000),
292 			 ktime_get_real()))
293 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
294 			       rxrpc_propose_ack_ping_for_params);
295 }
296 
297 /*
298  * Handle retransmission and deferred ACK/abort generation.
299  */
300 bool rxrpc_input_call_event(struct rxrpc_call *call)
301 {
302 	struct sk_buff *skb;
303 	ktime_t now, t;
304 	bool did_receive = false, saw_ack = false;
305 	s32 abort_code;
306 
307 	rxrpc_see_call(call, rxrpc_call_see_input);
308 
309 	//printk("\n--------------------\n");
310 	_enter("{%d,%s,%lx}",
311 	       call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)],
312 	       call->events);
313 
314 	/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
315 	abort_code = smp_load_acquire(&call->send_abort);
316 	if (abort_code) {
317 		rxrpc_abort_call(call, 0, call->send_abort, call->send_abort_err,
318 				 call->send_abort_why);
319 		goto out;
320 	}
321 
322 	do {
323 		skb = __skb_dequeue(&call->rx_queue);
324 		if (skb) {
325 			struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
326 
327 			if (__rxrpc_call_is_complete(call) ||
328 			    skb->mark == RXRPC_SKB_MARK_ERROR) {
329 				rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
330 				goto out;
331 			}
332 
333 			saw_ack |= sp->hdr.type == RXRPC_PACKET_TYPE_ACK;
334 
335 			if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
336 			    sp->hdr.securityIndex != 0 &&
337 			    skb_cloned(skb)) {
338 				/* Unshare the packet so that it can be
339 				 * modified by in-place decryption.
340 				 */
341 				struct sk_buff *nskb = skb_copy(skb, GFP_ATOMIC);
342 
343 				if (nskb) {
344 					rxrpc_new_skb(nskb, rxrpc_skb_new_unshared);
345 					rxrpc_input_call_packet(call, nskb);
346 					rxrpc_free_skb(nskb, rxrpc_skb_put_call_rx);
347 				} else {
348 					/* OOM - Drop the packet. */
349 					rxrpc_see_skb(skb, rxrpc_skb_see_unshare_nomem);
350 				}
351 			} else {
352 				rxrpc_input_call_packet(call, skb);
353 			}
354 			rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
355 			did_receive = true;
356 		}
357 
358 		t = ktime_sub(call->rack_timo_at, ktime_get_real());
359 		if (t <= 0) {
360 			trace_rxrpc_timer_exp(call, t,
361 					      rxrpc_timer_trace_rack_off + call->rack_timer_mode);
362 			call->rack_timo_at = KTIME_MAX;
363 			rxrpc_rack_timer_expired(call, t);
364 		}
365 
366 	} while (!skb_queue_empty(&call->rx_queue));
367 
368 	/* If we see our async-event poke, check for timeout trippage. */
369 	now = ktime_get_real();
370 	t = ktime_sub(call->expect_rx_by, now);
371 	if (t <= 0) {
372 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_expect_rx);
373 		goto expired;
374 	}
375 
376 	t = ktime_sub(call->expect_req_by, now);
377 	if (t <= 0) {
378 		call->expect_req_by = KTIME_MAX;
379 		if (__rxrpc_call_state(call) == RXRPC_CALL_SERVER_RECV_REQUEST) {
380 			trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_idle);
381 			goto expired;
382 		}
383 	}
384 
385 	t = ktime_sub(READ_ONCE(call->expect_term_by), now);
386 	if (t <= 0) {
387 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_hard);
388 		goto expired;
389 	}
390 
391 	t = ktime_sub(call->delay_ack_at, now);
392 	if (t <= 0) {
393 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_delayed_ack);
394 		call->delay_ack_at = KTIME_MAX;
395 		rxrpc_send_ACK(call, RXRPC_ACK_DELAY, 0,
396 			       rxrpc_propose_ack_delayed_ack);
397 	}
398 
399 	t = ktime_sub(call->ping_at, now);
400 	if (t <= 0) {
401 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_ping);
402 		call->ping_at = KTIME_MAX;
403 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
404 			       rxrpc_propose_ack_ping_for_keepalive);
405 	}
406 
407 	now = ktime_get_real();
408 	t = ktime_sub(call->keepalive_at, now);
409 	if (t <= 0) {
410 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_keepalive);
411 		call->keepalive_at = KTIME_MAX;
412 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
413 			       rxrpc_propose_ack_ping_for_keepalive);
414 	}
415 
416 	if (test_and_clear_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events))
417 		rxrpc_send_initial_ping(call);
418 
419 	rxrpc_transmit_some_data(call, UINT_MAX, rxrpc_txdata_new_data);
420 
421 	if (saw_ack)
422 		rxrpc_congestion_degrade(call);
423 
424 	if (did_receive &&
425 	    (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_SEND_REQUEST ||
426 	     __rxrpc_call_state(call) == RXRPC_CALL_SERVER_SEND_REPLY)) {
427 		t = ktime_sub(call->rack_timo_at, ktime_get_real());
428 		trace_rxrpc_rack(call, t);
429 	}
430 
431 	/* Process events */
432 	if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
433 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
434 			       rxrpc_propose_ack_ping_for_lost_ack);
435 
436 	if (call->tx_nr_lost > 0 &&
437 	    __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY &&
438 	    !test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
439 		rxrpc_resend(call);
440 
441 	if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
442 		rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
443 			       rxrpc_propose_ack_rx_idle);
444 
445 	if (call->ackr_nr_unacked > 2) {
446 		if (call->rtt_count < 3)
447 			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
448 				       rxrpc_propose_ack_ping_for_rtt);
449 		else if (ktime_before(ktime_add_ms(call->rtt_last_req, 1000),
450 				      ktime_get_real()))
451 			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
452 				       rxrpc_propose_ack_ping_for_old_rtt);
453 		else
454 			rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
455 				       rxrpc_propose_ack_input_data);
456 	}
457 
458 	/* Make sure the timer is restarted */
459 	if (!__rxrpc_call_is_complete(call)) {
460 		ktime_t next = READ_ONCE(call->expect_term_by), delay;
461 
462 #define set(T) { ktime_t _t = (T); if (ktime_before(_t, next)) next = _t; }
463 
464 		set(call->expect_req_by);
465 		set(call->expect_rx_by);
466 		set(call->delay_ack_at);
467 		set(call->rack_timo_at);
468 		set(call->keepalive_at);
469 		set(call->ping_at);
470 
471 		now = ktime_get_real();
472 		delay = ktime_sub(next, now);
473 		if (delay <= 0) {
474 			rxrpc_poke_call(call, rxrpc_call_poke_timer_now);
475 		} else {
476 			unsigned long nowj = jiffies, delayj, nextj;
477 
478 			delayj = umax(nsecs_to_jiffies(delay), 1);
479 			nextj = nowj + delayj;
480 			if (time_before(nextj, call->timer.expires) ||
481 			    !timer_pending(&call->timer)) {
482 				trace_rxrpc_timer_restart(call, delay, delayj);
483 				timer_reduce(&call->timer, nextj);
484 			}
485 		}
486 	}
487 
488 out:
489 	if (__rxrpc_call_is_complete(call)) {
490 		timer_delete_sync(&call->timer);
491 		if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
492 			rxrpc_disconnect_call(call);
493 		if (call->security)
494 			call->security->free_call_crypto(call);
495 	} else {
496 		if (did_receive &&
497 		    call->peer->ackr_adv_pmtud &&
498 		    call->peer->pmtud_pending)
499 			rxrpc_send_probe_for_pmtud(call);
500 	}
501 	_leave("");
502 	return true;
503 
504 expired:
505 	if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
506 	    (int)call->conn->hi_serial - (int)call->rx_serial > 0) {
507 		trace_rxrpc_call_reset(call);
508 		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ECONNRESET,
509 				 rxrpc_abort_call_reset);
510 	} else {
511 		rxrpc_abort_call(call, 0, RX_CALL_TIMEOUT, -ETIME,
512 				 rxrpc_abort_call_timeout);
513 	}
514 	goto out;
515 }
516