xref: /linux/net/rxrpc/call_event.c (revision 9b6ce594808580b2a19e6e1aa459ef56c0153ac1)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/module.h>
11 #include <linux/circ_buf.h>
12 #include <linux/net.h>
13 #include <linux/skbuff.h>
14 #include <linux/slab.h>
15 #include <linux/udp.h>
16 #include <net/sock.h>
17 #include <net/af_rxrpc.h>
18 #include "ar-internal.h"
19 
20 /*
21  * Propose a PING ACK be sent.
22  */
23 void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
24 			enum rxrpc_propose_ack_trace why)
25 {
26 	ktime_t delay = ms_to_ktime(READ_ONCE(rxrpc_idle_ack_delay));
27 	ktime_t now = ktime_get_real();
28 	ktime_t ping_at = ktime_add(now, delay);
29 
30 	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial);
31 	if (ktime_before(ping_at, call->ping_at)) {
32 		call->ping_at = ping_at;
33 		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_ping);
34 	}
35 }
36 
37 /*
38  * Propose a DELAY ACK be sent in the future.
39  */
40 void rxrpc_propose_delay_ACK(struct rxrpc_call *call, rxrpc_serial_t serial,
41 			     enum rxrpc_propose_ack_trace why)
42 {
43 	ktime_t now = ktime_get_real(), delay;
44 
45 	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_DELAY, serial);
46 
47 	if (call->srtt_us)
48 		delay = (call->srtt_us >> 3) * NSEC_PER_USEC;
49 	else
50 		delay = ms_to_ktime(READ_ONCE(rxrpc_soft_ack_delay));
51 	ktime_add_ms(delay, call->tx_backoff);
52 
53 	call->delay_ack_at = ktime_add(now, delay);
54 	trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_delayed_ack);
55 }
56 
57 /*
58  * Retransmit one or more packets.
59  */
60 static bool rxrpc_retransmit_data(struct rxrpc_call *call,
61 				  struct rxrpc_send_data_req *req)
62 {
63 	struct rxrpc_txqueue *tq = req->tq;
64 	unsigned int ix = req->seq & RXRPC_TXQ_MASK;
65 	struct rxrpc_txbuf *txb = tq->bufs[ix];
66 
67 	_enter("%x,%x,%x,%x", tq->qbase, req->seq, ix, txb->debug_id);
68 
69 	req->retrans = true;
70 	trace_rxrpc_retransmit(call, req, txb);
71 
72 	txb->flags |= RXRPC_TXBUF_RESENT;
73 	rxrpc_send_data_packet(call, req);
74 	rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
75 
76 	req->tq		= NULL;
77 	req->n		= 0;
78 	req->did_send	= true;
79 	req->now	= ktime_get_real();
80 	return true;
81 }
82 
83 /*
84  * Perform retransmission of NAK'd and unack'd packets.
85  */
86 static void rxrpc_resend(struct rxrpc_call *call)
87 {
88 	struct rxrpc_send_data_req req = {
89 		.now	= ktime_get_real(),
90 		.trace	= rxrpc_txdata_retransmit,
91 	};
92 	struct rxrpc_txqueue *tq;
93 
94 	_enter("{%d,%d}", call->tx_bottom, call->tx_top);
95 
96 	trace_rxrpc_resend(call, call->acks_highest_serial);
97 
98 	/* Scan the transmission queue, looking for lost packets. */
99 	for (tq = call->tx_queue; tq; tq = tq->next) {
100 		unsigned long lost = tq->segment_lost;
101 
102 		if (after(tq->qbase, call->tx_transmitted))
103 			break;
104 
105 		_debug("retr %16lx %u c=%08x [%x]",
106 		       tq->segment_acked, tq->nr_reported_acks, call->debug_id, tq->qbase);
107 		_debug("lost %16lx", lost);
108 
109 		trace_rxrpc_resend_lost(call, tq, lost);
110 		while (lost) {
111 			unsigned int ix = __ffs(lost);
112 			struct rxrpc_txbuf *txb = tq->bufs[ix];
113 
114 			__clear_bit(ix, &lost);
115 			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_lost);
116 
117 			req.tq  = tq;
118 			req.seq = tq->qbase + ix;
119 			req.n   = 1;
120 			rxrpc_retransmit_data(call, &req);
121 		}
122 	}
123 
124 	rxrpc_get_rto_backoff(call, req.did_send);
125 	_leave("");
126 }
127 
128 /*
129  * Resend the highest-seq DATA packet so far transmitted for RACK-TLP [RFC8985 7.3].
130  */
131 void rxrpc_resend_tlp(struct rxrpc_call *call)
132 {
133 	struct rxrpc_send_data_req req = {
134 		.now		= ktime_get_real(),
135 		.seq		= call->tx_transmitted,
136 		.n		= 1,
137 		.tlp_probe	= true,
138 		.trace		= rxrpc_txdata_tlp_retransmit,
139 	};
140 
141 	/* There's a chance it'll be on the tail segment of the queue. */
142 	req.tq = READ_ONCE(call->tx_qtail);
143 	if (req.tq &&
144 	    before(call->tx_transmitted, req.tq->qbase + RXRPC_NR_TXQUEUE)) {
145 		rxrpc_retransmit_data(call, &req);
146 		return;
147 	}
148 
149 	for (req.tq = call->tx_queue; req.tq; req.tq = req.tq->next) {
150 		if (after_eq(call->tx_transmitted, req.tq->qbase) &&
151 		    before(call->tx_transmitted, req.tq->qbase + RXRPC_NR_TXQUEUE)) {
152 			rxrpc_retransmit_data(call, &req);
153 			return;
154 		}
155 	}
156 }
157 
158 /*
159  * Start transmitting the reply to a service.  This cancels the need to ACK the
160  * request if we haven't yet done so.
161  */
162 static void rxrpc_begin_service_reply(struct rxrpc_call *call)
163 {
164 	rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SEND_REPLY);
165 	if (call->ackr_reason == RXRPC_ACK_DELAY)
166 		call->ackr_reason = 0;
167 	call->delay_ack_at = KTIME_MAX;
168 	trace_rxrpc_timer_can(call, rxrpc_timer_trace_delayed_ack);
169 }
170 
171 /*
172  * Close the transmission phase.  After this point there is no more data to be
173  * transmitted in the call.
174  */
175 static void rxrpc_close_tx_phase(struct rxrpc_call *call)
176 {
177 	_debug("________awaiting reply/ACK__________");
178 
179 	switch (__rxrpc_call_state(call)) {
180 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
181 		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_ACK);
182 		break;
183 	case RXRPC_CALL_SERVER_SEND_REPLY:
184 		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_AWAIT_ACK);
185 		break;
186 	default:
187 		break;
188 	}
189 }
190 
191 /*
192  * Transmit some as-yet untransmitted data, to a maximum of the supplied limit.
193  */
194 static void rxrpc_transmit_fresh_data(struct rxrpc_call *call, unsigned int limit,
195 				      enum rxrpc_txdata_trace trace)
196 {
197 	int space = rxrpc_tx_window_space(call);
198 
199 	if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
200 		if (call->send_top == call->tx_top)
201 			return;
202 		rxrpc_expose_client_call(call);
203 	}
204 
205 	while (space > 0) {
206 		struct rxrpc_send_data_req req = {
207 			.now	= ktime_get_real(),
208 			.seq	= call->tx_transmitted + 1,
209 			.n	= 0,
210 			.trace	= trace,
211 		};
212 		struct rxrpc_txqueue *tq;
213 		struct rxrpc_txbuf *txb;
214 		rxrpc_seq_t send_top, seq;
215 		int limit = min(space, max(call->peer->pmtud_jumbo, 1));
216 
217 		/* Order send_top before the contents of the new txbufs and
218 		 * txqueue pointers
219 		 */
220 		send_top = smp_load_acquire(&call->send_top);
221 		if (call->tx_top == send_top)
222 			break;
223 
224 		trace_rxrpc_transmit(call, send_top, space);
225 
226 		tq = call->tx_qtail;
227 		seq = call->tx_top;
228 		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);
229 
230 		do {
231 			int ix;
232 
233 			seq++;
234 			ix = seq & RXRPC_TXQ_MASK;
235 			if (!ix) {
236 				tq = tq->next;
237 				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
238 			}
239 			if (!req.tq)
240 				req.tq = tq;
241 			txb = tq->bufs[ix];
242 			req.n++;
243 			if (!txb->jumboable)
244 				break;
245 		} while (req.n < limit && before(seq, send_top));
246 
247 		if (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_PRE_SEND)
248 			rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_SEND_REQUEST);
249 		if (txb->flags & RXRPC_LAST_PACKET) {
250 			rxrpc_close_tx_phase(call);
251 			tq = NULL;
252 		}
253 		call->tx_qtail = tq;
254 		call->tx_top = seq;
255 
256 		space -= req.n;
257 		rxrpc_send_data_packet(call, &req);
258 	}
259 }
260 
261 void rxrpc_transmit_some_data(struct rxrpc_call *call, unsigned int limit,
262 			      enum rxrpc_txdata_trace trace)
263 {
264 	switch (__rxrpc_call_state(call)) {
265 	case RXRPC_CALL_SERVER_ACK_REQUEST:
266 		if (call->tx_bottom == READ_ONCE(call->send_top))
267 			return;
268 		rxrpc_begin_service_reply(call);
269 		fallthrough;
270 
271 	case RXRPC_CALL_SERVER_SEND_REPLY:
272 	case RXRPC_CALL_CLIENT_PRE_SEND:
273 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
274 		if (!rxrpc_tx_window_space(call))
275 			return;
276 		if (call->tx_bottom == READ_ONCE(call->send_top)) {
277 			rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
278 			return;
279 		}
280 		rxrpc_transmit_fresh_data(call, limit, trace);
281 		break;
282 	default:
283 		return;
284 	}
285 }
286 
287 /*
288  * Ping the other end to fill our RTT cache and to retrieve the rwind
289  * and MTU parameters.
290  */
291 static void rxrpc_send_initial_ping(struct rxrpc_call *call)
292 {
293 	if (call->rtt_count < 3 ||
294 	    ktime_before(ktime_add_ms(call->rtt_last_req, 1000),
295 			 ktime_get_real()))
296 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
297 			       rxrpc_propose_ack_ping_for_params);
298 }
299 
300 /*
301  * Handle retransmission and deferred ACK/abort generation.
302  */
303 bool rxrpc_input_call_event(struct rxrpc_call *call)
304 {
305 	struct sk_buff *skb;
306 	ktime_t now, t;
307 	bool did_receive = false, saw_ack = false;
308 	s32 abort_code;
309 
310 	rxrpc_see_call(call, rxrpc_call_see_input);
311 
312 	//printk("\n--------------------\n");
313 	_enter("{%d,%s,%lx}",
314 	       call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)],
315 	       call->events);
316 
317 	/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
318 	abort_code = smp_load_acquire(&call->send_abort);
319 	if (abort_code) {
320 		rxrpc_abort_call(call, 0, call->send_abort, call->send_abort_err,
321 				 call->send_abort_why);
322 		goto out;
323 	}
324 
325 	do {
326 		skb = __skb_dequeue(&call->rx_queue);
327 		if (skb) {
328 			struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
329 
330 			if (__rxrpc_call_is_complete(call) ||
331 			    skb->mark == RXRPC_SKB_MARK_ERROR) {
332 				rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
333 				goto out;
334 			}
335 
336 			saw_ack |= sp->hdr.type == RXRPC_PACKET_TYPE_ACK;
337 
338 			rxrpc_input_call_packet(call, skb);
339 			rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
340 			did_receive = true;
341 		}
342 
343 		t = ktime_sub(call->rack_timo_at, ktime_get_real());
344 		if (t <= 0) {
345 			trace_rxrpc_timer_exp(call, t,
346 					      rxrpc_timer_trace_rack_off + call->rack_timer_mode);
347 			call->rack_timo_at = KTIME_MAX;
348 			rxrpc_rack_timer_expired(call, t);
349 		}
350 
351 	} while (!skb_queue_empty(&call->rx_queue));
352 
353 	/* If we see our async-event poke, check for timeout trippage. */
354 	now = ktime_get_real();
355 	t = ktime_sub(call->expect_rx_by, now);
356 	if (t <= 0) {
357 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_expect_rx);
358 		goto expired;
359 	}
360 
361 	t = ktime_sub(call->expect_req_by, now);
362 	if (t <= 0) {
363 		call->expect_req_by = KTIME_MAX;
364 		if (__rxrpc_call_state(call) == RXRPC_CALL_SERVER_RECV_REQUEST) {
365 			trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_idle);
366 			goto expired;
367 		}
368 	}
369 
370 	t = ktime_sub(READ_ONCE(call->expect_term_by), now);
371 	if (t <= 0) {
372 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_hard);
373 		goto expired;
374 	}
375 
376 	t = ktime_sub(call->delay_ack_at, now);
377 	if (t <= 0) {
378 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_delayed_ack);
379 		call->delay_ack_at = KTIME_MAX;
380 		rxrpc_send_ACK(call, RXRPC_ACK_DELAY, 0,
381 			       rxrpc_propose_ack_delayed_ack);
382 	}
383 
384 	t = ktime_sub(call->ping_at, now);
385 	if (t <= 0) {
386 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_ping);
387 		call->ping_at = KTIME_MAX;
388 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
389 			       rxrpc_propose_ack_ping_for_keepalive);
390 	}
391 
392 	now = ktime_get_real();
393 	t = ktime_sub(call->keepalive_at, now);
394 	if (t <= 0) {
395 		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_keepalive);
396 		call->keepalive_at = KTIME_MAX;
397 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
398 			       rxrpc_propose_ack_ping_for_keepalive);
399 	}
400 
401 	if (test_and_clear_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events))
402 		rxrpc_send_initial_ping(call);
403 
404 	rxrpc_transmit_some_data(call, UINT_MAX, rxrpc_txdata_new_data);
405 
406 	if (saw_ack)
407 		rxrpc_congestion_degrade(call);
408 
409 	if (did_receive &&
410 	    (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_SEND_REQUEST ||
411 	     __rxrpc_call_state(call) == RXRPC_CALL_SERVER_SEND_REPLY)) {
412 		t = ktime_sub(call->rack_timo_at, ktime_get_real());
413 		trace_rxrpc_rack(call, t);
414 	}
415 
416 	/* Process events */
417 	if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
418 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
419 			       rxrpc_propose_ack_ping_for_lost_ack);
420 
421 	if (call->tx_nr_lost > 0 &&
422 	    __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY &&
423 	    !test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
424 		rxrpc_resend(call);
425 
426 	if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
427 		rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
428 			       rxrpc_propose_ack_rx_idle);
429 
430 	if (call->ackr_nr_unacked > 2) {
431 		if (call->rtt_count < 3)
432 			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
433 				       rxrpc_propose_ack_ping_for_rtt);
434 		else if (ktime_before(ktime_add_ms(call->rtt_last_req, 1000),
435 				      ktime_get_real()))
436 			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
437 				       rxrpc_propose_ack_ping_for_old_rtt);
438 		else
439 			rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
440 				       rxrpc_propose_ack_input_data);
441 	}
442 
443 	/* Make sure the timer is restarted */
444 	if (!__rxrpc_call_is_complete(call)) {
445 		ktime_t next = READ_ONCE(call->expect_term_by), delay;
446 
447 #define set(T) { ktime_t _t = (T); if (ktime_before(_t, next)) next = _t; }
448 
449 		set(call->expect_req_by);
450 		set(call->expect_rx_by);
451 		set(call->delay_ack_at);
452 		set(call->rack_timo_at);
453 		set(call->keepalive_at);
454 		set(call->ping_at);
455 
456 		now = ktime_get_real();
457 		delay = ktime_sub(next, now);
458 		if (delay <= 0) {
459 			rxrpc_poke_call(call, rxrpc_call_poke_timer_now);
460 		} else {
461 			unsigned long nowj = jiffies, delayj, nextj;
462 
463 			delayj = umax(nsecs_to_jiffies(delay), 1);
464 			nextj = nowj + delayj;
465 			if (time_before(nextj, call->timer.expires) ||
466 			    !timer_pending(&call->timer)) {
467 				trace_rxrpc_timer_restart(call, delay, delayj);
468 				timer_reduce(&call->timer, nextj);
469 			}
470 		}
471 	}
472 
473 out:
474 	if (__rxrpc_call_is_complete(call)) {
475 		timer_delete_sync(&call->timer);
476 		if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
477 			rxrpc_disconnect_call(call);
478 		if (call->security)
479 			call->security->free_call_crypto(call);
480 	} else {
481 		if (did_receive &&
482 		    call->peer->ackr_adv_pmtud &&
483 		    call->peer->pmtud_pending)
484 			rxrpc_send_probe_for_pmtud(call);
485 	}
486 	_leave("");
487 	return true;
488 
489 expired:
490 	if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
491 	    (int)call->conn->hi_serial - (int)call->rx_serial > 0) {
492 		trace_rxrpc_call_reset(call);
493 		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ECONNRESET,
494 				 rxrpc_abort_call_reset);
495 	} else {
496 		rxrpc_abort_call(call, 0, RX_CALL_TIMEOUT, -ETIME,
497 				 rxrpc_abort_call_timeout);
498 	}
499 	goto out;
500 }
501