xref: /linux/net/rxrpc/io_thread.c (revision e65e175b07bef5974045cc42238de99057669ca7)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC packet reception
3  *
4  * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include "ar-internal.h"
11 
12 static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
13 				      struct sockaddr_rxrpc *peer_srx,
14 				      struct sk_buff *skb);
15 
16 /*
17  * handle data received on the local endpoint
18  * - may be called in interrupt context
19  *
20  * [!] Note that as this is called from the encap_rcv hook, the socket is not
21  * held locked by the caller and nothing prevents sk_user_data on the UDP from
22  * being cleared in the middle of processing this function.
23  *
24  * Called with the RCU read lock held from the IP layer via UDP.
25  */
26 int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
27 {
28 	struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
29 
30 	if (unlikely(!local)) {
31 		kfree_skb(skb);
32 		return 0;
33 	}
34 	if (skb->tstamp == 0)
35 		skb->tstamp = ktime_get_real();
36 
37 	skb->mark = RXRPC_SKB_MARK_PACKET;
38 	rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
39 	skb_queue_tail(&local->rx_queue, skb);
40 	rxrpc_wake_up_io_thread(local);
41 	return 0;
42 }
43 
44 /*
45  * Handle an error received on the local endpoint.
46  */
47 void rxrpc_error_report(struct sock *sk)
48 {
49 	struct rxrpc_local *local;
50 	struct sk_buff *skb;
51 
52 	rcu_read_lock();
53 	local = rcu_dereference_sk_user_data(sk);
54 	if (unlikely(!local)) {
55 		rcu_read_unlock();
56 		return;
57 	}
58 
59 	while ((skb = skb_dequeue(&sk->sk_error_queue))) {
60 		skb->mark = RXRPC_SKB_MARK_ERROR;
61 		rxrpc_new_skb(skb, rxrpc_skb_new_error_report);
62 		skb_queue_tail(&local->rx_queue, skb);
63 	}
64 
65 	rxrpc_wake_up_io_thread(local);
66 	rcu_read_unlock();
67 }
68 
69 /*
70  * Process event packets targeted at a local endpoint.
71  */
72 static void rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
73 {
74 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
75 	char v;
76 
77 	_enter("");
78 
79 	rxrpc_see_skb(skb, rxrpc_skb_see_version);
80 	if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
81 		if (v == 0)
82 			rxrpc_send_version_request(local, &sp->hdr, skb);
83 	}
84 }
85 
86 /*
87  * Extract the wire header from a packet and translate the byte order.
88  */
89 static noinline
90 int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
91 {
92 	struct rxrpc_wire_header whdr;
93 
94 	/* dig out the RxRPC connection details */
95 	if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) {
96 		trace_rxrpc_rx_eproto(NULL, sp->hdr.serial,
97 				      tracepoint_string("bad_hdr"));
98 		return -EBADMSG;
99 	}
100 
101 	memset(sp, 0, sizeof(*sp));
102 	sp->hdr.epoch		= ntohl(whdr.epoch);
103 	sp->hdr.cid		= ntohl(whdr.cid);
104 	sp->hdr.callNumber	= ntohl(whdr.callNumber);
105 	sp->hdr.seq		= ntohl(whdr.seq);
106 	sp->hdr.serial		= ntohl(whdr.serial);
107 	sp->hdr.flags		= whdr.flags;
108 	sp->hdr.type		= whdr.type;
109 	sp->hdr.userStatus	= whdr.userStatus;
110 	sp->hdr.securityIndex	= whdr.securityIndex;
111 	sp->hdr._rsvd		= ntohs(whdr._rsvd);
112 	sp->hdr.serviceId	= ntohs(whdr.serviceId);
113 	return 0;
114 }
115 
116 /*
117  * Extract the abort code from an ABORT packet and stash it in skb->priority.
118  */
119 static bool rxrpc_extract_abort(struct sk_buff *skb)
120 {
121 	__be32 wtmp;
122 
123 	if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
124 			  &wtmp, sizeof(wtmp)) < 0)
125 		return false;
126 	skb->priority = ntohl(wtmp);
127 	return true;
128 }
129 
130 /*
131  * Process packets received on the local endpoint
132  */
133 static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
134 {
135 	struct rxrpc_connection *conn;
136 	struct sockaddr_rxrpc peer_srx;
137 	struct rxrpc_skb_priv *sp;
138 	struct rxrpc_peer *peer = NULL;
139 	struct sk_buff *skb = *_skb;
140 	int ret = 0;
141 
142 	skb_pull(skb, sizeof(struct udphdr));
143 
144 	sp = rxrpc_skb(skb);
145 
146 	/* dig out the RxRPC connection details */
147 	if (rxrpc_extract_header(sp, skb) < 0)
148 		goto bad_message;
149 
150 	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
151 		static int lose;
152 		if ((lose++ & 7) == 7) {
153 			trace_rxrpc_rx_lose(sp);
154 			return 0;
155 		}
156 	}
157 
158 	trace_rxrpc_rx_packet(sp);
159 
160 	switch (sp->hdr.type) {
161 	case RXRPC_PACKET_TYPE_VERSION:
162 		if (rxrpc_to_client(sp))
163 			return 0;
164 		rxrpc_input_version(local, skb);
165 		return 0;
166 
167 	case RXRPC_PACKET_TYPE_BUSY:
168 		if (rxrpc_to_server(sp))
169 			return 0;
170 		fallthrough;
171 	case RXRPC_PACKET_TYPE_ACK:
172 	case RXRPC_PACKET_TYPE_ACKALL:
173 		if (sp->hdr.callNumber == 0)
174 			goto bad_message;
175 		break;
176 	case RXRPC_PACKET_TYPE_ABORT:
177 		if (!rxrpc_extract_abort(skb))
178 			return 0; /* Just discard if malformed */
179 		break;
180 
181 	case RXRPC_PACKET_TYPE_DATA:
182 		if (sp->hdr.callNumber == 0 ||
183 		    sp->hdr.seq == 0)
184 			goto bad_message;
185 
186 		/* Unshare the packet so that it can be modified for in-place
187 		 * decryption.
188 		 */
189 		if (sp->hdr.securityIndex != 0) {
190 			skb = skb_unshare(skb, GFP_ATOMIC);
191 			if (!skb) {
192 				rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare_nomem);
193 				*_skb = NULL;
194 				return 0;
195 			}
196 
197 			if (skb != *_skb) {
198 				rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare);
199 				*_skb = skb;
200 				rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
201 				sp = rxrpc_skb(skb);
202 			}
203 		}
204 		break;
205 
206 	case RXRPC_PACKET_TYPE_CHALLENGE:
207 		if (rxrpc_to_server(sp))
208 			return 0;
209 		break;
210 	case RXRPC_PACKET_TYPE_RESPONSE:
211 		if (rxrpc_to_client(sp))
212 			return 0;
213 		break;
214 
215 		/* Packet types 9-11 should just be ignored. */
216 	case RXRPC_PACKET_TYPE_PARAMS:
217 	case RXRPC_PACKET_TYPE_10:
218 	case RXRPC_PACKET_TYPE_11:
219 		return 0;
220 
221 	default:
222 		goto bad_message;
223 	}
224 
225 	if (sp->hdr.serviceId == 0)
226 		goto bad_message;
227 
228 	if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0))
229 		return true; /* Unsupported address type - discard. */
230 
231 	if (peer_srx.transport.family != local->srx.transport.family &&
232 	    (peer_srx.transport.family == AF_INET &&
233 	     local->srx.transport.family != AF_INET6)) {
234 		pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
235 				    peer_srx.transport.family,
236 				    local->srx.transport.family);
237 		return true; /* Wrong address type - discard. */
238 	}
239 
240 	if (rxrpc_to_client(sp)) {
241 		rcu_read_lock();
242 		conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb);
243 		conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
244 		rcu_read_unlock();
245 		if (!conn) {
246 			trace_rxrpc_abort(0, "NCC", sp->hdr.cid,
247 					  sp->hdr.callNumber, sp->hdr.seq,
248 					  RXKADINCONSISTENCY, EBADMSG);
249 			goto protocol_error;
250 		}
251 
252 		ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
253 		rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
254 		return ret;
255 	}
256 
257 	/* We need to look up service connections by the full protocol
258 	 * parameter set.  We look up the peer first as an intermediate step
259 	 * and then the connection from the peer's tree.
260 	 */
261 	rcu_read_lock();
262 
263 	peer = rxrpc_lookup_peer_rcu(local, &peer_srx);
264 	if (!peer) {
265 		rcu_read_unlock();
266 		return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb);
267 	}
268 
269 	conn = rxrpc_find_service_conn_rcu(peer, skb);
270 	conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
271 	if (conn) {
272 		rcu_read_unlock();
273 		ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
274 		rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
275 		return ret;
276 	}
277 
278 	peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input);
279 	rcu_read_unlock();
280 
281 	ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb);
282 	rxrpc_put_peer(peer, rxrpc_peer_put_input);
283 	if (ret < 0)
284 		goto reject_packet;
285 	return 0;
286 
287 bad_message:
288 	trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
289 			  RX_PROTOCOL_ERROR, EBADMSG);
290 protocol_error:
291 	skb->priority = RX_PROTOCOL_ERROR;
292 	skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
293 reject_packet:
294 	rxrpc_reject_packet(local, skb);
295 	return 0;
296 }
297 
298 /*
299  * Deal with a packet that's associated with an extant connection.
300  */
301 static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
302 				      struct sockaddr_rxrpc *peer_srx,
303 				      struct sk_buff *skb)
304 {
305 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
306 	struct rxrpc_channel *chan;
307 	struct rxrpc_call *call = NULL;
308 	unsigned int channel;
309 
310 	if (sp->hdr.securityIndex != conn->security_ix)
311 		goto wrong_security;
312 
313 	if (sp->hdr.serviceId != conn->service_id) {
314 		int old_id;
315 
316 		if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
317 			goto reupgrade;
318 		old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
319 				 sp->hdr.serviceId);
320 
321 		if (old_id != conn->orig_service_id &&
322 		    old_id != sp->hdr.serviceId)
323 			goto reupgrade;
324 	}
325 
326 	if (after(sp->hdr.serial, conn->hi_serial))
327 		conn->hi_serial = sp->hdr.serial;
328 
329 	/* It's a connection-level packet if the call number is 0. */
330 	if (sp->hdr.callNumber == 0)
331 		return rxrpc_input_conn_packet(conn, skb);
332 
333 	/* Call-bound packets are routed by connection channel. */
334 	channel = sp->hdr.cid & RXRPC_CHANNELMASK;
335 	chan = &conn->channels[channel];
336 
337 	/* Ignore really old calls */
338 	if (sp->hdr.callNumber < chan->last_call)
339 		return 0;
340 
341 	if (sp->hdr.callNumber == chan->last_call) {
342 		if (chan->call ||
343 		    sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
344 			return 0;
345 
346 		/* For the previous service call, if completed successfully, we
347 		 * discard all further packets.
348 		 */
349 		if (rxrpc_conn_is_service(conn) &&
350 		    chan->last_type == RXRPC_PACKET_TYPE_ACK)
351 			return 0;
352 
353 		/* But otherwise we need to retransmit the final packet from
354 		 * data cached in the connection record.
355 		 */
356 		if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
357 			trace_rxrpc_rx_data(chan->call_debug_id,
358 					    sp->hdr.seq,
359 					    sp->hdr.serial,
360 					    sp->hdr.flags);
361 		rxrpc_input_conn_packet(conn, skb);
362 		return 0;
363 	}
364 
365 	rcu_read_lock();
366 	call = rxrpc_try_get_call(rcu_dereference(chan->call),
367 				  rxrpc_call_get_input);
368 	rcu_read_unlock();
369 
370 	if (sp->hdr.callNumber > chan->call_id) {
371 		if (rxrpc_to_client(sp)) {
372 			rxrpc_put_call(call, rxrpc_call_put_input);
373 			goto reject_packet;
374 		}
375 
376 		if (call) {
377 			rxrpc_implicit_end_call(call, skb);
378 			rxrpc_put_call(call, rxrpc_call_put_input);
379 			call = NULL;
380 		}
381 	}
382 
383 	if (!call) {
384 		if (rxrpc_to_client(sp))
385 			goto bad_message;
386 		if (rxrpc_new_incoming_call(conn->local, conn->peer, conn,
387 					    peer_srx, skb) == 0)
388 			return 0;
389 		goto reject_packet;
390 	}
391 
392 	rxrpc_input_call_event(call, skb);
393 	rxrpc_put_call(call, rxrpc_call_put_input);
394 	return 0;
395 
396 wrong_security:
397 	trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
398 			  RXKADINCONSISTENCY, EBADMSG);
399 	skb->priority = RXKADINCONSISTENCY;
400 	goto post_abort;
401 
402 reupgrade:
403 	trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
404 			  RX_PROTOCOL_ERROR, EBADMSG);
405 	goto protocol_error;
406 
407 bad_message:
408 	trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
409 			  RX_PROTOCOL_ERROR, EBADMSG);
410 protocol_error:
411 	skb->priority = RX_PROTOCOL_ERROR;
412 post_abort:
413 	skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
414 reject_packet:
415 	rxrpc_reject_packet(conn->local, skb);
416 	return 0;
417 }
418 
419 /*
420  * I/O and event handling thread.
421  */
422 int rxrpc_io_thread(void *data)
423 {
424 	struct sk_buff_head rx_queue;
425 	struct rxrpc_local *local = data;
426 	struct rxrpc_call *call;
427 	struct sk_buff *skb;
428 	bool should_stop;
429 
430 	complete(&local->io_thread_ready);
431 
432 	skb_queue_head_init(&rx_queue);
433 
434 	set_user_nice(current, MIN_NICE);
435 
436 	for (;;) {
437 		rxrpc_inc_stat(local->rxnet, stat_io_loop);
438 
439 		/* Deal with calls that want immediate attention. */
440 		if ((call = list_first_entry_or_null(&local->call_attend_q,
441 						     struct rxrpc_call,
442 						     attend_link))) {
443 			spin_lock_bh(&local->lock);
444 			list_del_init(&call->attend_link);
445 			spin_unlock_bh(&local->lock);
446 
447 			trace_rxrpc_call_poked(call);
448 			rxrpc_input_call_event(call, NULL);
449 			rxrpc_put_call(call, rxrpc_call_put_poke);
450 			continue;
451 		}
452 
453 		/* Process received packets and errors. */
454 		if ((skb = __skb_dequeue(&rx_queue))) {
455 			switch (skb->mark) {
456 			case RXRPC_SKB_MARK_PACKET:
457 				skb->priority = 0;
458 				rxrpc_input_packet(local, &skb);
459 				trace_rxrpc_rx_done(skb->mark, skb->priority);
460 				rxrpc_free_skb(skb, rxrpc_skb_put_input);
461 				break;
462 			case RXRPC_SKB_MARK_ERROR:
463 				rxrpc_input_error(local, skb);
464 				rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
465 				break;
466 			default:
467 				WARN_ON_ONCE(1);
468 				rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
469 				break;
470 			}
471 			continue;
472 		}
473 
474 		if (!skb_queue_empty(&local->rx_queue)) {
475 			spin_lock_irq(&local->rx_queue.lock);
476 			skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
477 			spin_unlock_irq(&local->rx_queue.lock);
478 			continue;
479 		}
480 
481 		set_current_state(TASK_INTERRUPTIBLE);
482 		should_stop = kthread_should_stop();
483 		if (!skb_queue_empty(&local->rx_queue) ||
484 		    !list_empty(&local->call_attend_q)) {
485 			__set_current_state(TASK_RUNNING);
486 			continue;
487 		}
488 
489 		if (should_stop)
490 			break;
491 		schedule();
492 	}
493 
494 	__set_current_state(TASK_RUNNING);
495 	rxrpc_see_local(local, rxrpc_local_stop);
496 	rxrpc_destroy_local(local);
497 	local->io_thread = NULL;
498 	rxrpc_see_local(local, rxrpc_local_stopped);
499 	return 0;
500 }
501