xref: /linux/net/rxrpc/call_event.c (revision 816b02e63a759c4458edee142b721ab09c918b3d)
// SPDX-License-Identifier: GPL-2.0-or-later
/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/slab.h>
#include <linux/udp.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Propose a PING ACK be sent.
 */
void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
			enum rxrpc_propose_ack_trace why)
{
	ktime_t delay = ms_to_ktime(READ_ONCE(rxrpc_idle_ack_delay));
	ktime_t now = ktime_get_real();
	ktime_t ping_at = ktime_add(now, delay);

	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial);
	if (ktime_before(ping_at, call->ping_at)) {
		call->ping_at = ping_at;
		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_ping);
	}
}

/*
 * Propose a DELAY ACK be sent in the future.
 */
void rxrpc_propose_delay_ACK(struct rxrpc_call *call, rxrpc_serial_t serial,
			     enum rxrpc_propose_ack_trace why)
{
	ktime_t now = ktime_get_real(), delay;

	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_DELAY, serial);

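	/* call->srtt_us follows the TCP convention of storing 8 times the
	 * smoothed RTT, so shifting down by 3 delays the ACK by roughly one
	 * RTT.
	 */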
	if (call->srtt_us)
		delay = (call->srtt_us >> 3) * NSEC_PER_USEC;
	else
		delay = ms_to_ktime(READ_ONCE(rxrpc_soft_ack_delay));
	delay = ktime_add_ms(delay, call->tx_backoff);

	call->delay_ack_at = ktime_add(now, delay);
	trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_delayed_ack);
}

/*
 * Retransmit one or more packets.
 */
static bool rxrpc_retransmit_data(struct rxrpc_call *call,
				  struct rxrpc_send_data_req *req)
{
	struct rxrpc_txqueue *tq = req->tq;
	unsigned int ix = req->seq & RXRPC_TXQ_MASK;
	struct rxrpc_txbuf *txb = tq->bufs[ix];

	_enter("%x,%x,%x,%x", tq->qbase, req->seq, ix, txb->debug_id);

	req->retrans = true;
	trace_rxrpc_retransmit(call, req, txb);

	txb->flags |= RXRPC_TXBUF_RESENT;
	rxrpc_send_data_packet(call, req);
	rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);

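	/* Clear the request so that the caller can reload it to send the next
	 * packet.
	 */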
	req->tq		= NULL;
	req->n		= 0;
	req->did_send	= true;
	req->now	= ktime_get_real();
	return true;
}

/*
 * Perform retransmission of NAK'd and unack'd packets.
 */
static void rxrpc_resend(struct rxrpc_call *call)
{
	struct rxrpc_send_data_req req = {
		.now	= ktime_get_real(),
		.trace	= rxrpc_txdata_retransmit,
	};
	struct rxrpc_txqueue *tq;

	_enter("{%d,%d}", call->tx_bottom, call->tx_top);

	trace_rxrpc_resend(call, call->acks_highest_serial);

	/* Scan the transmission queue, looking for lost packets. */
	for (tq = call->tx_queue; tq; tq = tq->next) {
		unsigned long lost = tq->segment_lost;

		if (after(tq->qbase, call->tx_transmitted))
			break;

		_debug("retr %16lx %u c=%08x [%x]",
		       tq->segment_acked, tq->nr_reported_acks, call->debug_id, tq->qbase);
		_debug("lost %16lx", lost);

		trace_rxrpc_resend_lost(call, tq, lost);
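		/* Resend each segment marked in the lost bitmask, lowest
		 * sequence number first.
		 */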
		while (lost) {
			unsigned int ix = __ffs(lost);
			struct rxrpc_txbuf *txb = tq->bufs[ix];

			__clear_bit(ix, &lost);
			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_lost);

			req.tq  = tq;
			req.seq = tq->qbase + ix;
			req.n   = 1;
			rxrpc_retransmit_data(call, &req);
		}
	}

	rxrpc_get_rto_backoff(call, req.did_send);
	_leave("");
}

/*
 * Resend the highest-seq DATA packet so far transmitted for RACK-TLP [RFC8985 7.3].
 */
void rxrpc_resend_tlp(struct rxrpc_call *call)
{
	struct rxrpc_send_data_req req = {
		.now		= ktime_get_real(),
		.seq		= call->tx_transmitted,
		.n		= 1,
		.tlp_probe	= true,
		.trace		= rxrpc_txdata_tlp_retransmit,
	};

	/* There's a chance it'll be on the tail segment of the queue. */
	req.tq = READ_ONCE(call->tx_qtail);
	if (req.tq &&
	    before(call->tx_transmitted, req.tq->qbase + RXRPC_NR_TXQUEUE)) {
		rxrpc_retransmit_data(call, &req);
		return;
	}

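	/* Otherwise walk the queue from the head to find the segment covering
	 * the highest transmitted seq.
	 */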
	for (req.tq = call->tx_queue; req.tq; req.tq = req.tq->next) {
		if (after_eq(call->tx_transmitted, req.tq->qbase) &&
		    before(call->tx_transmitted, req.tq->qbase + RXRPC_NR_TXQUEUE)) {
			rxrpc_retransmit_data(call, &req);
			return;
		}
	}
}

/*
 * Start transmitting the reply to a service.  This cancels the need to ACK the
 * request if we haven't yet done so.
 */
static void rxrpc_begin_service_reply(struct rxrpc_call *call)
{
	rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SEND_REPLY);
	if (call->ackr_reason == RXRPC_ACK_DELAY)
		call->ackr_reason = 0;
	call->delay_ack_at = KTIME_MAX;
	trace_rxrpc_timer_can(call, rxrpc_timer_trace_delayed_ack);
}

/*
 * Close the transmission phase.  After this point there is no more data to be
 * transmitted in the call.
 */
static void rxrpc_close_tx_phase(struct rxrpc_call *call)
{
	_debug("________awaiting reply/ACK__________");

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
		break;
	case RXRPC_CALL_SERVER_SEND_REPLY:
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_AWAIT_ACK);
		break;
	default:
		break;
	}
}

/*
 * Transmit some as-yet untransmitted data, to a maximum of the supplied limit.
 */
static void rxrpc_transmit_fresh_data(struct rxrpc_call *call, unsigned int limit,
				      enum rxrpc_txdata_trace trace)
{
	int space = rxrpc_tx_window_space(call);

	if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
		if (call->send_top == call->tx_top)
			return;
		rxrpc_expose_client_call(call);
	}

	while (space > 0) {
		struct rxrpc_send_data_req req = {
			.now	= ktime_get_real(),
			.seq	= call->tx_transmitted + 1,
			.n	= 0,
			.trace	= trace,
		};
		struct rxrpc_txqueue *tq;
		struct rxrpc_txbuf *txb;
		rxrpc_seq_t send_top, seq;
		int limit = min(space, max(call->peer->pmtud_jumbo, 1));

		/* Order send_top before the contents of the new txbufs and
		 * txqueue pointers
		 */
		send_top = smp_load_acquire(&call->send_top);
		if (call->tx_top == send_top)
			break;

		trace_rxrpc_transmit(call, send_top, space);

		tq = call->tx_qtail;
		seq = call->tx_top;
		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);

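		/* Batch up consecutive jumboable txbufs for a single
		 * transmission, stopping once a non-jumboable buffer has been
		 * included, the jumbo limit is reached or we run out of
		 * queued data.
		 */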
		do {
			int ix;

			seq++;
			ix = seq & RXRPC_TXQ_MASK;
			if (!ix) {
				tq = tq->next;
				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
			}
			if (!req.tq)
				req.tq = tq;
			txb = tq->bufs[ix];
			req.n++;
			if (!txb->jumboable)
				break;
		} while (req.n < limit && before(seq, send_top));

		if (txb->flags & RXRPC_LAST_PACKET) {
			rxrpc_close_tx_phase(call);
			tq = NULL;
		}
		call->tx_qtail = tq;
		call->tx_top = seq;

		space -= req.n;
		rxrpc_send_data_packet(call, &req);
	}
}

void rxrpc_transmit_some_data(struct rxrpc_call *call, unsigned int limit,
			      enum rxrpc_txdata_trace trace)
{
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		if (call->tx_bottom == READ_ONCE(call->send_top))
			return;
		rxrpc_begin_service_reply(call);
		fallthrough;

	case RXRPC_CALL_SERVER_SEND_REPLY:
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
		if (!rxrpc_tx_window_space(call))
			return;
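		/* The Tx window is open, but nothing is queued to go out;
		 * count this as an underflow.
		 */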
		if (call->tx_bottom == READ_ONCE(call->send_top)) {
			rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
			return;
		}
		rxrpc_transmit_fresh_data(call, limit, trace);
		break;
	default:
		return;
	}
}

/*
 * Ping the other end to fill our RTT cache and to retrieve the rwind
 * and MTU parameters.
 */
static void rxrpc_send_initial_ping(struct rxrpc_call *call)
{
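	/* Skip the ping if we already have at least three RTT samples and the
	 * last probe was sent less than a second ago.
	 */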
	if (call->rtt_count < 3 ||
	    ktime_before(ktime_add_ms(call->rtt_last_req, 1000),
			 ktime_get_real()))
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_params);
}

/*
 * Handle retransmission and deferred ACK/abort generation.
 */
bool rxrpc_input_call_event(struct rxrpc_call *call)
{
	struct sk_buff *skb;
	ktime_t now, t;
	bool did_receive = false, saw_ack = false;
	s32 abort_code;

	rxrpc_see_call(call, rxrpc_call_see_input);

	_enter("{%d,%s,%lx}",
	       call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)],
	       call->events);

	/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
	abort_code = smp_load_acquire(&call->send_abort);
	if (abort_code) {
		rxrpc_abort_call(call, 0, call->send_abort, call->send_abort_err,
				 call->send_abort_why);
		goto out;
	}

	do {
		skb = __skb_dequeue(&call->rx_queue);
		if (skb) {
			struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

			if (__rxrpc_call_is_complete(call) ||
			    skb->mark == RXRPC_SKB_MARK_ERROR) {
				rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
				goto out;
			}

			saw_ack |= sp->hdr.type == RXRPC_PACKET_TYPE_ACK;

			rxrpc_input_call_packet(call, skb);
			rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
			did_receive = true;
		}

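		/* Fire the RACK-TLP timer if it expired whilst we were
		 * processing input.
		 */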
		t = ktime_sub(call->rack_timo_at, ktime_get_real());
		if (t <= 0) {
			trace_rxrpc_timer_exp(call, t,
					      rxrpc_timer_trace_rack_off + call->rack_timer_mode);
			call->rack_timo_at = KTIME_MAX;
			rxrpc_rack_timer_expired(call, t);
		}

	} while (!skb_queue_empty(&call->rx_queue));

	/* If we see our async-event poke, check for timeout trippage. */
	now = ktime_get_real();
	t = ktime_sub(call->expect_rx_by, now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_expect_rx);
		goto expired;
	}

	t = ktime_sub(call->expect_req_by, now);
	if (t <= 0) {
		call->expect_req_by = KTIME_MAX;
		if (__rxrpc_call_state(call) == RXRPC_CALL_SERVER_RECV_REQUEST) {
			trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_idle);
			goto expired;
		}
	}

	t = ktime_sub(READ_ONCE(call->expect_term_by), now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_hard);
		goto expired;
	}

	t = ktime_sub(call->delay_ack_at, now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_delayed_ack);
		call->delay_ack_at = KTIME_MAX;
		rxrpc_send_ACK(call, RXRPC_ACK_DELAY, 0,
			       rxrpc_propose_ack_delayed_ack);
	}

	t = ktime_sub(call->ping_at, now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_ping);
		call->ping_at = KTIME_MAX;
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_keepalive);
	}

	now = ktime_get_real();
	t = ktime_sub(call->keepalive_at, now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_keepalive);
		call->keepalive_at = KTIME_MAX;
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_keepalive);
	}

	if (test_and_clear_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events))
		rxrpc_send_initial_ping(call);

	rxrpc_transmit_some_data(call, UINT_MAX, rxrpc_txdata_new_data);

	if (saw_ack)
		rxrpc_congestion_degrade(call);

	if (did_receive &&
	    (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_SEND_REQUEST ||
	     __rxrpc_call_state(call) == RXRPC_CALL_SERVER_SEND_REPLY)) {
		t = ktime_sub(call->rack_timo_at, ktime_get_real());
		trace_rxrpc_rack(call, t);
	}

	/* Process events */
	if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_lost_ack);

	if (call->tx_nr_lost > 0 &&
	    __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY &&
	    !test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
		rxrpc_resend(call);

	if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
			       rxrpc_propose_ack_rx_idle);

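	/* If we owe ACKs for more than two received packets, use the
	 * opportunity as an RTT probe: PING whilst the RTT cache is cold or
	 * stale, otherwise just send an IDLE ACK.
	 */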
	if (call->ackr_nr_unacked > 2) {
		if (call->rtt_count < 3)
			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
				       rxrpc_propose_ack_ping_for_rtt);
		else if (ktime_before(ktime_add_ms(call->rtt_last_req, 1000),
				      ktime_get_real()))
			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
				       rxrpc_propose_ack_ping_for_old_rtt);
		else
			rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
				       rxrpc_propose_ack_input_data);
	}

	/* Make sure the timer is restarted */
	if (!__rxrpc_call_is_complete(call)) {
		ktime_t next = READ_ONCE(call->expect_term_by), delay;

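		/* Reduce next to the earliest of the pending expiry times. */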
#define set(T) { ktime_t _t = (T); if (ktime_before(_t, next)) next = _t; }

		set(call->expect_req_by);
		set(call->expect_rx_by);
		set(call->delay_ack_at);
		set(call->rack_timo_at);
		set(call->keepalive_at);
		set(call->ping_at);

		now = ktime_get_real();
		delay = ktime_sub(next, now);
		if (delay <= 0) {
			rxrpc_poke_call(call, rxrpc_call_poke_timer_now);
		} else {
			unsigned long nowj = jiffies, delayj, nextj;

			delayj = umax(nsecs_to_jiffies(delay), 1);
			nextj = nowj + delayj;
			if (time_before(nextj, call->timer.expires) ||
			    !timer_pending(&call->timer)) {
				trace_rxrpc_timer_restart(call, delay, delayj);
				timer_reduce(&call->timer, nextj);
			}
		}
	}

out:
	if (__rxrpc_call_is_complete(call)) {
		del_timer_sync(&call->timer);
		if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
			rxrpc_disconnect_call(call);
		if (call->security)
			call->security->free_call_crypto(call);
	} else {
		if (did_receive &&
		    call->peer->ackr_adv_pmtud &&
		    call->peer->pmtud_pending)
			rxrpc_send_probe_for_pmtud(call);
	}
	_leave("");
	return true;

expired:
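	/* If the peer has been heard from on the connection more recently
	 * than on this call, assume the call got reset rather than merely
	 * timing out.
	 */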
	if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
	    (int)call->conn->hi_serial - (int)call->rx_serial > 0) {
		trace_rxrpc_call_reset(call);
		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ECONNRESET,
				 rxrpc_abort_call_reset);
	} else {
		rxrpc_abort_call(call, 0, RX_CALL_TIMEOUT, -ETIME,
				 rxrpc_abort_call_timeout);
	}
	goto out;
}