xref: /linux/net/rxrpc/call_event.c (revision aa54b1d27fe0c2b78e664a34fd0fdf7cd1960d71)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/module.h>
11 #include <linux/circ_buf.h>
12 #include <linux/net.h>
13 #include <linux/skbuff.h>
14 #include <linux/slab.h>
15 #include <linux/udp.h>
16 #include <net/sock.h>
17 #include <net/af_rxrpc.h>
18 #include "ar-internal.h"
19 
20 /*
21  * Propose a PING ACK be sent.
22  */
rxrpc_propose_ping(struct rxrpc_call * call,u32 serial,enum rxrpc_propose_ack_trace why)23 void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
24 			enum rxrpc_propose_ack_trace why)
25 {
26 	ktime_t delay = ms_to_ktime(READ_ONCE(rxrpc_idle_ack_delay));
27 	ktime_t now = ktime_get_real();
28 	ktime_t ping_at = ktime_add(now, delay);
29 
30 	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial);
31 	if (ktime_before(ping_at, call->ping_at)) {
32 		call->ping_at = ping_at;
33 		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_ping);
34 	}
35 }
36 
37 /*
38  * Propose a DELAY ACK be sent in the future.
39  */
rxrpc_propose_delay_ACK(struct rxrpc_call * call,rxrpc_serial_t serial,enum rxrpc_propose_ack_trace why)40 void rxrpc_propose_delay_ACK(struct rxrpc_call *call, rxrpc_serial_t serial,
41 			     enum rxrpc_propose_ack_trace why)
42 {
43 	ktime_t now = ktime_get_real(), delay;
44 
45 	trace_rxrpc_propose_ack(call, why, RXRPC_ACK_DELAY, serial);
46 
47 	if (call->srtt_us)
48 		delay = (call->srtt_us >> 3) * NSEC_PER_USEC;
49 	else
50 		delay = ms_to_ktime(READ_ONCE(rxrpc_soft_ack_delay));
51 	ktime_add_ms(delay, call->tx_backoff);
52 
53 	call->delay_ack_at = ktime_add(now, delay);
54 	trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_delayed_ack);
55 }
56 
57 /*
58  * Retransmit one or more packets.
59  */
rxrpc_retransmit_data(struct rxrpc_call * call,struct rxrpc_send_data_req * req)60 static bool rxrpc_retransmit_data(struct rxrpc_call *call,
61 				  struct rxrpc_send_data_req *req)
62 {
63 	struct rxrpc_txqueue *tq = req->tq;
64 	unsigned int ix = req->seq & RXRPC_TXQ_MASK;
65 	struct rxrpc_txbuf *txb = tq->bufs[ix];
66 
67 	_enter("%x,%x,%x,%x", tq->qbase, req->seq, ix, txb->debug_id);
68 
69 	req->retrans = true;
70 	trace_rxrpc_retransmit(call, req, txb);
71 
72 	txb->flags |= RXRPC_TXBUF_RESENT;
73 	rxrpc_send_data_packet(call, req);
74 	rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
75 
76 	req->tq		= NULL;
77 	req->n		= 0;
78 	req->did_send	= true;
79 	req->now	= ktime_get_real();
80 	return true;
81 }
82 
83 /*
84  * Perform retransmission of NAK'd and unack'd packets.
85  */
rxrpc_resend(struct rxrpc_call * call)86 static void rxrpc_resend(struct rxrpc_call *call)
87 {
88 	struct rxrpc_send_data_req req = {
89 		.now	= ktime_get_real(),
90 		.trace	= rxrpc_txdata_retransmit,
91 	};
92 	struct rxrpc_txqueue *tq;
93 
94 	_enter("{%d,%d}", call->tx_bottom, call->tx_top);
95 
96 	trace_rxrpc_resend(call, call->acks_highest_serial);
97 
98 	/* Scan the transmission queue, looking for lost packets. */
99 	for (tq = call->tx_queue; tq; tq = tq->next) {
100 		unsigned long lost = tq->segment_lost;
101 
102 		if (after(tq->qbase, call->tx_transmitted))
103 			break;
104 
105 		_debug("retr %16lx %u c=%08x [%x]",
106 		       tq->segment_acked, tq->nr_reported_acks, call->debug_id, tq->qbase);
107 		_debug("lost %16lx", lost);
108 
109 		trace_rxrpc_resend_lost(call, tq, lost);
110 		while (lost) {
111 			unsigned int ix = __ffs(lost);
112 			struct rxrpc_txbuf *txb = tq->bufs[ix];
113 
114 			__clear_bit(ix, &lost);
115 			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_lost);
116 
117 			req.tq  = tq;
118 			req.seq = tq->qbase + ix;
119 			req.n   = 1;
120 			rxrpc_retransmit_data(call, &req);
121 		}
122 	}
123 
124 	rxrpc_get_rto_backoff(call, req.did_send);
125 	_leave("");
126 }
127 
128 /*
129  * Resend the highest-seq DATA packet so far transmitted for RACK-TLP [RFC8985 7.3].
130  */
rxrpc_resend_tlp(struct rxrpc_call * call)131 void rxrpc_resend_tlp(struct rxrpc_call *call)
132 {
133 	struct rxrpc_send_data_req req = {
134 		.now		= ktime_get_real(),
135 		.seq		= call->tx_transmitted,
136 		.n		= 1,
137 		.tlp_probe	= true,
138 		.trace		= rxrpc_txdata_tlp_retransmit,
139 	};
140 
141 	/* There's a chance it'll be on the tail segment of the queue. */
142 	req.tq = READ_ONCE(call->tx_qtail);
143 	if (req.tq &&
144 	    before(call->tx_transmitted, req.tq->qbase + RXRPC_NR_TXQUEUE)) {
145 		rxrpc_retransmit_data(call, &req);
146 		return;
147 	}
148 
149 	for (req.tq = call->tx_queue; req.tq; req.tq = req.tq->next) {
150 		if (after_eq(call->tx_transmitted, req.tq->qbase) &&
151 		    before(call->tx_transmitted, req.tq->qbase + RXRPC_NR_TXQUEUE)) {
152 			rxrpc_retransmit_data(call, &req);
153 			return;
154 		}
155 	}
156 }
157 
158 /*
159  * Start transmitting the reply to a service.  This cancels the need to ACK the
160  * request if we haven't yet done so.
161  */
rxrpc_begin_service_reply(struct rxrpc_call * call)162 static void rxrpc_begin_service_reply(struct rxrpc_call *call)
163 {
164 	rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SEND_REPLY);
165 	if (call->ackr_reason == RXRPC_ACK_DELAY)
166 		call->ackr_reason = 0;
167 	call->delay_ack_at = KTIME_MAX;
168 	trace_rxrpc_timer_can(call, rxrpc_timer_trace_delayed_ack);
169 }
170 
171 /*
172  * Close the transmission phase.  After this point there is no more data to be
173  * transmitted in the call.
174  */
rxrpc_close_tx_phase(struct rxrpc_call * call)175 static void rxrpc_close_tx_phase(struct rxrpc_call *call)
176 {
177 	_debug("________awaiting reply/ACK__________");
178 
179 	switch (__rxrpc_call_state(call)) {
180 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
181 		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
182 		break;
183 	case RXRPC_CALL_SERVER_SEND_REPLY:
184 		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_AWAIT_ACK);
185 		break;
186 	default:
187 		break;
188 	}
189 }
190 
191 /*
192  * Transmit some as-yet untransmitted data, to a maximum of the supplied limit.
193  */
rxrpc_transmit_fresh_data(struct rxrpc_call * call,unsigned int limit,enum rxrpc_txdata_trace trace)194 static void rxrpc_transmit_fresh_data(struct rxrpc_call *call, unsigned int limit,
195 				      enum rxrpc_txdata_trace trace)
196 {
197 	int space = rxrpc_tx_window_space(call);
198 
199 	if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
200 		if (call->send_top == call->tx_top)
201 			return;
202 		rxrpc_expose_client_call(call);
203 	}
204 
205 	while (space > 0) {
206 		struct rxrpc_send_data_req req = {
207 			.now	= ktime_get_real(),
208 			.seq	= call->tx_transmitted + 1,
209 			.n	= 0,
210 			.trace	= trace,
211 		};
212 		struct rxrpc_txqueue *tq;
213 		struct rxrpc_txbuf *txb;
214 		rxrpc_seq_t send_top, seq;
215 		int limit = min(space, max(call->peer->pmtud_jumbo, 1));
216 
217 		/* Order send_top before the contents of the new txbufs and
218 		 * txqueue pointers
219 		 */
220 		send_top = smp_load_acquire(&call->send_top);
221 		if (call->tx_top == send_top)
222 			break;
223 
224 		trace_rxrpc_transmit(call, send_top, space);
225 
226 		tq = call->tx_qtail;
227 		seq = call->tx_top;
228 		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);
229 
230 		do {
231 			int ix;
232 
233 			seq++;
234 			ix = seq & RXRPC_TXQ_MASK;
235 			if (!ix) {
236 				tq = tq->next;
237 				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
238 			}
239 			if (!req.tq)
240 				req.tq = tq;
241 			txb = tq->bufs[ix];
242 			req.n++;
243 			if (!txb->jumboable)
244 				break;
245 		} while (req.n < limit && before(seq, send_top));
246 
247 		if (txb->flags & RXRPC_LAST_PACKET) {
248 			rxrpc_close_tx_phase(call);
249 			tq = NULL;
250 		}
251 		call->tx_qtail = tq;
252 		call->tx_top = seq;
253 
254 		space -= req.n;
255 		rxrpc_send_data_packet(call, &req);
256 	}
257 }
258 
rxrpc_transmit_some_data(struct rxrpc_call * call,unsigned int limit,enum rxrpc_txdata_trace trace)259 void rxrpc_transmit_some_data(struct rxrpc_call *call, unsigned int limit,
260 			      enum rxrpc_txdata_trace trace)
261 {
262 	switch (__rxrpc_call_state(call)) {
263 	case RXRPC_CALL_SERVER_ACK_REQUEST:
264 		if (call->tx_bottom == READ_ONCE(call->send_top))
265 			return;
266 		rxrpc_begin_service_reply(call);
267 		fallthrough;
268 
269 	case RXRPC_CALL_SERVER_SEND_REPLY:
270 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
271 		if (!rxrpc_tx_window_space(call))
272 			return;
273 		if (call->tx_bottom == READ_ONCE(call->send_top)) {
274 			rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
275 			return;
276 		}
277 		rxrpc_transmit_fresh_data(call, limit, trace);
278 		break;
279 	default:
280 		return;
281 	}
282 }
283 
284 /*
285  * Ping the other end to fill our RTT cache and to retrieve the rwind
286  * and MTU parameters.
287  */
rxrpc_send_initial_ping(struct rxrpc_call * call)288 static void rxrpc_send_initial_ping(struct rxrpc_call *call)
289 {
290 	if (call->rtt_count < 3 ||
291 	    ktime_before(ktime_add_ms(call->rtt_last_req, 1000),
292 			 ktime_get_real()))
293 		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
294 			       rxrpc_propose_ack_ping_for_params);
295 }
296 
/*
 * Handle retransmission and deferred ACK/abort generation.
 *
 * This is the per-call event processor: it drains the call's Rx queue,
 * services any timers that have expired, transmits fresh data, generates due
 * ACKs and retransmissions, and finally re-arms the call timer for the
 * earliest pending deadline.  Returns true (unconditionally at present).
 */
bool rxrpc_input_call_event(struct rxrpc_call *call)
{
	struct sk_buff *skb;
	ktime_t now, t;
	bool did_receive = false, saw_ack = false;
	s32 abort_code;

	rxrpc_see_call(call, rxrpc_call_see_input);

	//printk("\n--------------------\n");
	_enter("{%d,%s,%lx}",
	       call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)],
	       call->events);

	/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
	abort_code = smp_load_acquire(&call->send_abort);
	if (abort_code) {
		rxrpc_abort_call(call, 0, call->send_abort, call->send_abort_err,
				 call->send_abort_why);
		goto out;
	}

	/* Drain the Rx queue, checking the RACK-TLP timer on each pass so
	 * that its expiry is serviced promptly even whilst packets are still
	 * being processed.
	 */
	do {
		skb = __skb_dequeue(&call->rx_queue);
		if (skb) {
			struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

			/* Stop processing on completion or on a queued
			 * error-report skb.
			 */
			if (__rxrpc_call_is_complete(call) ||
			    skb->mark == RXRPC_SKB_MARK_ERROR) {
				rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
				goto out;
			}

			saw_ack |= sp->hdr.type == RXRPC_PACKET_TYPE_ACK;

			if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
			    sp->hdr.securityIndex != 0 &&
			    (skb_cloned(skb) ||
			     skb_has_frag_list(skb) ||
			     skb_has_shared_frag(skb))) {
				/* Unshare the packet so that it can be
				 * modified by in-place decryption.
				 */
				struct sk_buff *nskb = skb_copy(skb, GFP_ATOMIC);

				if (nskb) {
					rxrpc_new_skb(nskb, rxrpc_skb_new_unshared);
					rxrpc_input_call_packet(call, nskb);
					rxrpc_free_skb(nskb, rxrpc_skb_put_call_rx);
				} else {
					/* OOM - Drop the packet. */
					rxrpc_see_skb(skb, rxrpc_skb_see_unshare_nomem);
				}
			} else {
				rxrpc_input_call_packet(call, skb);
			}
			rxrpc_free_skb(skb, rxrpc_skb_put_call_rx);
			did_receive = true;
		}

		/* Service the RACK-TLP timer if it has tripped. */
		t = ktime_sub(call->rack_timo_at, ktime_get_real());
		if (t <= 0) {
			trace_rxrpc_timer_exp(call, t,
					      rxrpc_timer_trace_rack_off + call->rack_timer_mode);
			call->rack_timo_at = KTIME_MAX;
			rxrpc_rack_timer_expired(call, t);
		}

	} while (!skb_queue_empty(&call->rx_queue));

	/* If we see our async-event poke, check for timeout trippage. */
	now = ktime_get_real();
	t = ktime_sub(call->expect_rx_by, now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_expect_rx);
		goto expired;
	}

	/* Idle expiry only counts if we're still waiting for the request. */
	t = ktime_sub(call->expect_req_by, now);
	if (t <= 0) {
		call->expect_req_by = KTIME_MAX;
		if (__rxrpc_call_state(call) == RXRPC_CALL_SERVER_RECV_REQUEST) {
			trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_idle);
			goto expired;
		}
	}

	/* Hard timeout on the overall call lifetime. */
	t = ktime_sub(READ_ONCE(call->expect_term_by), now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_hard);
		goto expired;
	}

	/* Emit a delayed ACK if its timer has tripped. */
	t = ktime_sub(call->delay_ack_at, now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_delayed_ack);
		call->delay_ack_at = KTIME_MAX;
		rxrpc_send_ACK(call, RXRPC_ACK_DELAY, 0,
			       rxrpc_propose_ack_delayed_ack);
	}

	/* Emit a scheduled keepalive PING if it's due. */
	t = ktime_sub(call->ping_at, now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_ping);
		call->ping_at = KTIME_MAX;
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_keepalive);
	}

	now = ktime_get_real();
	t = ktime_sub(call->keepalive_at, now);
	if (t <= 0) {
		trace_rxrpc_timer_exp(call, t, rxrpc_timer_trace_keepalive);
		call->keepalive_at = KTIME_MAX;
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_keepalive);
	}

	if (test_and_clear_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events))
		rxrpc_send_initial_ping(call);

	/* Push out any newly-queued data. */
	rxrpc_transmit_some_data(call, UINT_MAX, rxrpc_txdata_new_data);

	if (saw_ack)
		rxrpc_congestion_degrade(call);

	if (did_receive &&
	    (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_SEND_REQUEST ||
	     __rxrpc_call_state(call) == RXRPC_CALL_SERVER_SEND_REPLY)) {
		t = ktime_sub(call->rack_timo_at, ktime_get_real());
		trace_rxrpc_rack(call, t);
	}

	/* Process events */
	if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events))
		rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
			       rxrpc_propose_ack_ping_for_lost_ack);

	/* Retransmit lost packets unless we're just waiting for the reply or
	 * everything has already been hard-ACK'd.
	 */
	if (call->tx_nr_lost > 0 &&
	    __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY &&
	    !test_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags))
		rxrpc_resend(call);

	if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
			       rxrpc_propose_ack_rx_idle);

	/* With several packets still unacked, piggyback an RTT probe or an
	 * IDLE ACK on the traffic.
	 */
	if (call->ackr_nr_unacked > 2) {
		if (call->rtt_count < 3)
			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
				       rxrpc_propose_ack_ping_for_rtt);
		else if (ktime_before(ktime_add_ms(call->rtt_last_req, 1000),
				      ktime_get_real()))
			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
				       rxrpc_propose_ack_ping_for_old_rtt);
		else
			rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
				       rxrpc_propose_ack_input_data);
	}

	/* Make sure the timer is restarted */
	if (!__rxrpc_call_is_complete(call)) {
		ktime_t next = READ_ONCE(call->expect_term_by), delay;

		/* Pick the earliest of all the pending deadlines. */
#define set(T) { ktime_t _t = (T); if (ktime_before(_t, next)) next = _t; }

		set(call->expect_req_by);
		set(call->expect_rx_by);
		set(call->delay_ack_at);
		set(call->rack_timo_at);
		set(call->keepalive_at);
		set(call->ping_at);

		now = ktime_get_real();
		delay = ktime_sub(next, now);
		if (delay <= 0) {
			/* Already due - get repoked rather than sleeping. */
			rxrpc_poke_call(call, rxrpc_call_poke_timer_now);
		} else {
			unsigned long nowj = jiffies, delayj, nextj;

			delayj = umax(nsecs_to_jiffies(delay), 1);
			nextj = nowj + delayj;
			/* timer_reduce() only ever brings the expiry forward,
			 * so only touch the timer if that's what we'd do.
			 */
			if (time_before(nextj, call->timer.expires) ||
			    !timer_pending(&call->timer)) {
				trace_rxrpc_timer_restart(call, delay, delayj);
				timer_reduce(&call->timer, nextj);
			}
		}
	}

out:
	if (__rxrpc_call_is_complete(call)) {
		/* Call finished: stop the timer and release the connection
		 * binding and crypto state.
		 */
		timer_delete_sync(&call->timer);
		if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
			rxrpc_disconnect_call(call);
		if (call->security)
			call->security->free_call_crypto(call);
	} else {
		if (did_receive &&
		    call->peer->ackr_adv_pmtud &&
		    call->peer->pmtud_pending)
			rxrpc_send_probe_for_pmtud(call);
	}
	_leave("");
	return true;

expired:
	/* A timeout tripped.  If the connection has seen later serials than
	 * this call has, the peer is alive but has forgotten the call, so
	 * treat it as a reset rather than a plain timeout.
	 */
	if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
	    (int)call->conn->hi_serial - (int)call->rx_serial > 0) {
		trace_rxrpc_call_reset(call);
		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ECONNRESET,
				 rxrpc_abort_call_reset);
	} else {
		rxrpc_abort_call(call, 0, RX_CALL_TIMEOUT, -ETIME,
				 rxrpc_abort_call_timeout);
	}
	goto out;
}
518