xref: /linux/net/rxrpc/recvmsg.c (revision 805185b7c7a1069e407b6f7b3bc98e44d415f484)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14 
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18 
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25 	struct rxrpc_sock *rx;
26 	struct sock *sk;
27 
28 	_enter("%d", call->debug_id);
29 
30 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
31 		rxrpc_see_call(call, rxrpc_call_see_notify_released);
32 		return;
33 	}
34 
35 	rcu_read_lock();
36 
37 	rx = rcu_dereference(call->socket);
38 	sk = &rx->sk;
39 	if (rx && sk->sk_state < RXRPC_CLOSE) {
40 		if (call->notify_rx) {
41 			spin_lock_irq(&call->notify_lock);
42 			call->notify_rx(sk, call, call->user_call_ID);
43 			spin_unlock_irq(&call->notify_lock);
44 		} else {
45 			spin_lock_irq(&rx->recvmsg_lock);
46 			if (list_empty(&call->recvmsg_link)) {
47 				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
48 				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
49 			}
50 			spin_unlock_irq(&rx->recvmsg_lock);
51 
52 			if (!sock_flag(sk, SOCK_DEAD)) {
53 				_debug("call %ps", sk->sk_data_ready);
54 				sk->sk_data_ready(sk);
55 			}
56 		}
57 	}
58 
59 	rcu_read_unlock();
60 	_leave("");
61 }
62 
63 /*
64  * Pass a call terminating message to userspace.
65  */
66 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
67 {
68 	u32 tmp = 0;
69 	int ret;
70 
71 	switch (call->completion) {
72 	case RXRPC_CALL_SUCCEEDED:
73 		ret = 0;
74 		if (rxrpc_is_service_call(call))
75 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
76 		break;
77 	case RXRPC_CALL_REMOTELY_ABORTED:
78 		tmp = call->abort_code;
79 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
80 		break;
81 	case RXRPC_CALL_LOCALLY_ABORTED:
82 		tmp = call->abort_code;
83 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
84 		break;
85 	case RXRPC_CALL_NETWORK_ERROR:
86 		tmp = -call->error;
87 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
88 		break;
89 	case RXRPC_CALL_LOCAL_ERROR:
90 		tmp = -call->error;
91 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
92 		break;
93 	default:
94 		pr_err("Invalid terminal call state %u\n", call->completion);
95 		BUG();
96 		break;
97 	}
98 
99 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
100 			     call->ackr_window - 1,
101 			     call->rx_pkt_offset, call->rx_pkt_len, ret);
102 	return ret;
103 }
104 
105 /*
106  * Discard a packet we've used up and advance the Rx window by one.
107  */
108 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
109 {
110 	struct rxrpc_skb_priv *sp;
111 	struct sk_buff *skb;
112 	rxrpc_serial_t serial;
113 	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
114 	bool last;
115 	int acked;
116 
117 	_enter("%d", call->debug_id);
118 
119 	skb = skb_dequeue(&call->recvmsg_queue);
120 	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);
121 
122 	sp = rxrpc_skb(skb);
123 	tseq   = sp->hdr.seq;
124 	serial = sp->hdr.serial;
125 	last   = sp->hdr.flags & RXRPC_LAST_PACKET;
126 
127 	/* Barrier against rxrpc_input_data(). */
128 	if (after(tseq, call->rx_consumed))
129 		smp_store_release(&call->rx_consumed, tseq);
130 
131 	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);
132 
133 	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
134 			    serial, call->rx_consumed);
135 
136 	if (last)
137 		set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
138 
139 	/* Check to see if there's an ACK that needs sending. */
140 	acked = atomic_add_return(call->rx_consumed - old_consumed,
141 				  &call->ackr_nr_consumed);
142 	if (acked > 8 &&
143 	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
144 		rxrpc_poke_call(call, rxrpc_call_poke_idle);
145 }
146 
147 /*
148  * Decrypt and verify a DATA packet.  The content of the packet is pulled out
149  * into a flat buffer rather than decrypting in place in the skbuff.  This also
150  * has the advantage of aligning the buffer correctly for the crypto routines.
151  *
152  * We keep track of the sequence number of the packet currently decrypted into
153  * the buffer in ->rx_dec_seq.  If MSG_PEEK is used and steps onto a new
154  * packet, subsequent recvmsg() calls will have to go back and re-decrypt the
155  * current packet.
156  */
157 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
158 {
159 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
160 	int ret;
161 
162 	if (sp->len > call->rx_dec_bsize || !call->rx_dec_buffer) {
163 		/* Make sure we can hold a 1412-byte jumbo subpacket and make
164 		 * sure that the buffer size is aligned to a crypto blocksize.
165 		 */
166 		size_t size = clamp(round_up(sp->len, 32), 2048, 65535);
167 		void *buffer = krealloc(call->rx_dec_buffer, size, GFP_NOFS);
168 
169 		if (!buffer)
170 			return -ENOMEM;
171 		call->rx_dec_buffer = buffer;
172 		call->rx_dec_bsize = size;
173 	}
174 
175 	ret = -EFAULT;
176 	if (skb_copy_bits(skb, sp->offset, call->rx_dec_buffer, sp->len) < 0)
177 		goto err;
178 
179 	call->rx_dec_offset = 0;
180 	call->rx_dec_len = sp->len;
181 	call->rx_dec_seq = sp->hdr.seq;
182 	ret = call->security->verify_packet(call, skb);
183 	if (ret < 0)
184 		goto err;
185 	return 0;
186 
187 err:
188 	kfree(call->rx_dec_buffer);
189 	call->rx_dec_buffer = NULL;
190 	call->rx_dec_bsize = 0;
191 	call->rx_dec_offset = 0;
192 	call->rx_dec_len = 0;
193 	return ret;
194 }
195 
196 /*
197  * Transcribe a call's user ID to a control message.
198  */
199 static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg,
200 				 int flags)
201 {
202 	if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags))
203 		return 0;
204 
205 	if (flags & MSG_CMSG_COMPAT) {
206 		unsigned int id32 = call->user_call_ID;
207 
208 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
209 				sizeof(unsigned int), &id32);
210 	} else {
211 		unsigned long idl = call->user_call_ID;
212 
213 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
214 				sizeof(unsigned long), &idl);
215 	}
216 }
217 
218 /*
219  * Deal with a CHALLENGE packet.
220  */
221 static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg,
222 				   struct sk_buff *challenge, unsigned int flags)
223 {
224 	struct rxrpc_skb_priv *sp = rxrpc_skb(challenge);
225 	struct rxrpc_connection *conn = sp->chall.conn;
226 
227 	return conn->security->challenge_to_recvmsg(conn, challenge, msg);
228 }
229 
230 /*
231  * Process OOB packets.  Called with the socket locked.
232  */
233 static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg,
234 			     unsigned int flags)
235 {
236 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
237 	struct sk_buff *skb;
238 	bool need_response = false;
239 	int ret;
240 
241 	skb = skb_peek(&rx->recvmsg_oobq);
242 	if (!skb)
243 		return -EAGAIN;
244 	rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
245 
246 	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64),
247 		       &skb->skb_mstamp_ns);
248 	if (ret < 0)
249 		return ret;
250 
251 	switch ((enum rxrpc_oob_type)skb->mark) {
252 	case RXRPC_OOB_CHALLENGE:
253 		need_response = true;
254 		ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags);
255 		break;
256 	default:
257 		WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n",
258 			  skb->mark);
259 		ret = -EIO;
260 		break;
261 	}
262 
263 	if (!(flags & MSG_PEEK)) {
264 		skb_unlink(skb, &rx->recvmsg_oobq);
265 		if (need_response)
266 			rxrpc_add_pending_oob(rx, skb);
267 		else
268 			rxrpc_free_skb(skb, rxrpc_skb_put_oob);
269 	}
270 	return ret;
271 }
272 
273 /*
274  * Deliver messages to a call.  This keeps processing packets until the buffer
275  * is filled and we find either more DATA (returns 0) or the end of the DATA
276  * (returns 1).  If more packets are required, it returns -EAGAIN and if the
277  * call has failed it returns -EIO.
 *
 * @sock:    socket the call belongs to; its OOB queue and ->app_ops are checked
 * @call:    call whose recvmsg_queue is walked
 * @msg:     message header used only to record the receive timestamp; may be
 *           NULL (rxrpc_kernel_recv_data() passes NULL)
 * @iter:    destination the decoded packet data is copied into
 * @len:     total number of bytes wanted
 * @flags:   only MSG_PEEK is examined here
 * @_offset: running count of bytes copied so far; advanced by each copy
 *
 * A negative error from rxrpc_verify_data() is passed through, and -EFAULT is
 * returned if copy_to_iter() copies less than was asked of it.
 *
 * With MSG_PEEK, packets are not rotated out of the queue and the call's
 * saved packet cursor (->rx_pkt_offset / ->rx_pkt_len) is left unchanged.
278  */
279 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
280 			      struct msghdr *msg, struct iov_iter *iter,
281 			      size_t len, int flags, size_t *_offset)
282 {
283 	struct rxrpc_skb_priv *sp;
284 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
285 	struct sk_buff *skb;
286 	rxrpc_seq_t seq = 0;
287 	size_t remain;
288 	unsigned int rx_pkt_offset, rx_pkt_len;
289 	int copy, ret = -EAGAIN, ret2;
290 
	/* Work on local copies of the packet cursor; they are only written
	 * back to the call at the "out" label, and only when not peeking.
	 */
291 	rx_pkt_offset = call->rx_pkt_offset;
292 	rx_pkt_len = call->rx_pkt_len;
293 
294 	if (rxrpc_call_has_failed(call)) {
295 		seq = call->ackr_window - 1;
296 		ret = -EIO;
297 		goto done;
298 	}
299 
	/* The last packet has already been consumed by an earlier call. */
300 	if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
301 		seq = call->ackr_window - 1;
302 		ret = 1;
303 		goto done;
304 	}
305 
306 	/* No one else can be removing stuff from the queue, so we shouldn't
307 	 * need the Rx lock to walk it.
308 	 */
309 	skb = skb_peek(&call->recvmsg_queue);
310 	while (skb) {
311 		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
312 		sp = rxrpc_skb(skb);
313 		seq = sp->hdr.seq;
314 
315 		if (!(flags & MSG_PEEK))
316 			trace_rxrpc_receive(call, rxrpc_receive_front,
317 					    sp->hdr.serial, seq);
318 
319 		if (msg)
320 			sock_recv_timestamp(msg, sock->sk, skb);
321 
		/* (Re)decode the packet unless the decode buffer already
		 * holds this sequence number.
		 */
322 		if (call->rx_dec_seq != sp->hdr.seq ||
323 		    !call->rx_dec_buffer) {
324 			ret2 = rxrpc_verify_data(call, skb);
325 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
326 					     call->rx_dec_offset,
327 					     call->rx_dec_len, ret2);
328 			if (ret2 < 0) {
329 				ret = ret2;
330 				goto out;
331 			}
332 		}
333 
		/* An offset of USHRT_MAX means no packet is part-consumed
		 * (it is set below once a packet has been fully transferred),
		 * so start from the beginning of the decoded data.
		 */
334 		if (rx_pkt_offset == USHRT_MAX) {
335 			rx_pkt_offset = call->rx_dec_offset;
336 			rx_pkt_len = call->rx_dec_len;
337 		} else {
338 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
339 					     rx_pkt_offset, rx_pkt_len, 0);
340 		}
341 
342 		/* We have to handle short, empty and used-up DATA packets. */
343 		remain = len - *_offset;
344 		copy = rx_pkt_len;
345 		if (copy > remain)
346 			copy = remain;
347 		if (copy > 0) {
348 			ret2 = copy_to_iter(call->rx_dec_buffer + rx_pkt_offset,
349 					    copy, iter);
350 			if (ret2 != copy) {
351 				ret = -EFAULT;
352 				goto out;
353 			}
354 
355 			/* handle piecemeal consumption of data packets */
356 			rx_pkt_offset += copy;
357 			rx_pkt_len -= copy;
358 			*_offset += copy;
359 		}
360 
		/* Data is left over in this packet, so the caller's buffer
		 * must be full: stop here and report that more is available.
		 */
361 		if (rx_pkt_len > 0) {
362 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
363 					     rx_pkt_offset, rx_pkt_len, 0);
364 			ASSERTCMP(*_offset, ==, len);
365 			ret = 0;
366 			break;
367 		}
368 
369 		/* The whole packet has been transferred. */
370 		if (sp->hdr.flags & RXRPC_LAST_PACKET)
371 			ret = 1;
372 		rx_pkt_offset = USHRT_MAX;
373 		rx_pkt_len = 0;
374 
		/* Pick up the next packet before rotating, since rotation
		 * dequeues and frees the packet at the head of the queue.
		 */
375 		skb = skb_peek_next(skb, &call->recvmsg_queue);
376 
377 		if (!(flags & MSG_PEEK))
378 			rxrpc_rotate_rx_window(call);
379 
		/* Stop early if the socket has no ->app_ops and an OOB packet
		 * is waiting.  NOTE(review): presumably so that recvmsg() can
		 * deliver the OOB message on its next pass - confirm.
		 */
380 		if (!rx->app_ops &&
381 		    !skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
382 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq,
383 					     rx_pkt_offset, rx_pkt_len, ret);
384 			break;
385 		}
386 	}
387 
388 out:
389 	if (!(flags & MSG_PEEK)) {
390 		call->rx_pkt_offset = rx_pkt_offset;
391 		call->rx_pkt_len = rx_pkt_len;
392 	}
393 
394 done:
395 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
396 			     rx_pkt_offset, rx_pkt_len, ret);
	/* Running out of queued data marks the call's Rx side as idle. */
397 	if (ret == -EAGAIN)
398 		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
399 	return ret;
400 }
401 
402 /*
403  * Receive a message from an RxRPC socket
404  * - we need to be careful about two or more threads calling recvmsg
405  *   simultaneously
 *
 * @sock:  socket to receive from
 * @msg:   message header; receives control messages, the peer address (if
 *         msg_name is set), the data itself and the MSG_EOR / MSG_MORE flags
 * @len:   maximum number of data bytes to copy
 * @flags: MSG_OOB and MSG_TRUNC are rejected; MSG_PEEK, MSG_DONTWAIT and
 *         MSG_CMSG_COMPAT are honoured
 *
 * OOB messages queued on the socket are dealt with before call data.  For
 * call data, the call at the head of the socket's recvmsg_q is serviced.
 * MSG_EOR is set in msg_flags when the call's terminal message has been
 * delivered; MSG_MORE is set when the data routine reported 0 or -EAGAIN.
 *
 * Returns the number of data bytes copied on the normal call-data path, the
 * result of rxrpc_recvmsg_oob() on the OOB path, or a negative error:
 * -EOPNOTSUPP for unsupported flags, -EAGAIN if the socket has no calls,
 * nothing queued and is not listening, -EWOULDBLOCK if nothing is queued and
 * the timeout is zero or if the call mutex is contended with MSG_DONTWAIT,
 * -ERESTARTSYS if interrupted whilst taking the call mutex, the socket error
 * or sock_intr_errno() value picked up whilst waiting, or an error passed up
 * from one of the helpers.
406  */
407 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
408 		  int flags)
409 {
410 	struct rxrpc_call *call;
411 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
412 	struct list_head *l;
413 	unsigned int call_debug_id = 0;
414 	size_t copied = 0;
415 	long timeo;
416 	int ret;
417 
418 	DEFINE_WAIT(wait);
419 
420 	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
421 
422 	if (flags & (MSG_OOB | MSG_TRUNC))
423 		return -EOPNOTSUPP;
424 
425 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
426 
	/* Every retry restarts from here with the socket lock retaken. */
427 try_again:
428 	lock_sock(&rx->sk);
429 
430 	/* Return immediately if a client socket has no outstanding calls */
431 	if (RB_EMPTY_ROOT(&rx->calls) &&
432 	    list_empty(&rx->recvmsg_q) &&
433 	    skb_queue_empty_lockless(&rx->recvmsg_oobq) &&
434 	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
435 		release_sock(&rx->sk);
436 		return -EAGAIN;
437 	}
438 
	/* Nothing is queued: fail at once if the timeout is zero, otherwise
	 * drop the socket lock, sleep until woken, and start over.
	 */
439 	if (list_empty(&rx->recvmsg_q) &&
440 	    skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
441 		ret = -EWOULDBLOCK;
442 		if (timeo == 0) {
443 			call = NULL;
444 			goto error_no_call;
445 		}
446 
447 		release_sock(&rx->sk);
448 
449 		/* Wait for something to happen */
450 		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
451 					  TASK_INTERRUPTIBLE);
452 		ret = sock_error(&rx->sk);
453 		if (ret)
454 			goto wait_error;
455 
		/* Recheck the queues now that we are on the wait queue so a
		 * notification that raced with us is not slept through.
		 */
456 		if (list_empty(&rx->recvmsg_q) &&
457 		    skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
458 			if (signal_pending(current))
459 				goto wait_interrupted;
460 			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
461 			timeo = schedule_timeout(timeo);
462 		}
463 		finish_wait(sk_sleep(&rx->sk), &wait);
464 		goto try_again;
465 	}
466 
467 	/* Deal with OOB messages before we consider getting normal data. */
468 	if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
469 		ret = rxrpc_recvmsg_oob(sock, msg, flags);
470 		release_sock(&rx->sk);
471 		if (ret == -EAGAIN)
472 			goto try_again;
		/* The socket lock is already released, so skip the unlock
		 * done at error_no_call.
		 */
473 		goto error_trace;
474 	}
475 
476 	/* Find the next call and dequeue it if we're not just peeking.  If we
477 	 * do dequeue it, that comes with a ref that we will need to release.
478 	 * We also want to weed out calls that got requeued whilst we were
479 	 * shovelling data out.
480 	 */
481 	spin_lock_irq(&rx->recvmsg_lock);
	/* NOTE(review): this takes the first entry without testing for an
	 * empty list; it appears to rely on the checks above having found
	 * recvmsg_q non-empty - confirm nothing can empty it in between.
	 */
482 	l = rx->recvmsg_q.next;
483 	call = list_entry(l, struct rxrpc_call, recvmsg_link);
484 
	/* The call at the head has nothing to report: unqueue it, drop the
	 * ref that came with the queue entry and look again.
	 */
485 	if (!rxrpc_call_is_complete(call) &&
486 	    skb_queue_empty(&call->recvmsg_queue) &&
487 	    skb_queue_empty(&rx->recvmsg_oobq)) {
488 		list_del_init(&call->recvmsg_link);
489 		spin_unlock_irq(&rx->recvmsg_lock);
490 		release_sock(&rx->sk);
491 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
492 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
493 		goto try_again;
494 	}
495 
	/* A call that has already been released is discarded the same way. */
496 	rxrpc_see_call(call, rxrpc_call_see_recvmsg);
497 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
498 		rxrpc_see_call(call, rxrpc_call_see_already_released);
499 		list_del_init(&call->recvmsg_link);
500 		spin_unlock_irq(&rx->recvmsg_lock);
501 		release_sock(&rx->sk);
502 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
503 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
504 		goto try_again;
505 	}
	/* Dequeuing hands us the queue entry's ref; when peeking, the call
	 * stays queued, so take a separate ref of our own instead.
	 */
506 	if (!(flags & MSG_PEEK))
507 		list_del_init(&call->recvmsg_link);
508 	else
509 		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
510 	spin_unlock_irq(&rx->recvmsg_lock);
511 
512 	call_debug_id = call->debug_id;
513 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
514 
515 	/* We're going to drop the socket lock, so we need to lock the call
516 	 * against interference by sendmsg.
517 	 */
518 	if (!mutex_trylock(&call->user_mutex)) {
519 		ret = -EWOULDBLOCK;
520 		if (flags & MSG_DONTWAIT)
521 			goto error_requeue_call;
522 		ret = -ERESTARTSYS;
523 		if (mutex_lock_interruptible(&call->user_mutex) < 0)
524 			goto error_requeue_call;
525 	}
526 
527 	release_sock(&rx->sk);
528 
	/* Recheck for release now that the call mutex is held. */
529 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
530 		rxrpc_see_call(call, rxrpc_call_see_already_released);
531 		mutex_unlock(&call->user_mutex);
532 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
533 		goto try_again;
534 	}
535 
536 	ret = rxrpc_recvmsg_user_id(call, msg, flags);
537 	if (ret < 0)
538 		goto error_unlock_call;
539 
	/* Report the call's destination address if the caller asked for an
	 * address and the call has a peer.
	 */
540 	if (msg->msg_name && call->peer) {
541 		size_t len = sizeof(call->dest_srx);
542 
543 		memcpy(msg->msg_name, &call->dest_srx, len);
544 		msg->msg_namelen = len;
545 	}
546 
547 	ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
548 				 flags, &copied);
	/* -EAGAIN only means no more data is queued right now, which is not
	 * an error for recvmsg(); -EIO means the call has failed.
	 */
549 	if (ret == -EAGAIN)
550 		ret = 0;
551 	if (ret == -EIO)
552 		goto call_failed;
553 	if (ret < 0)
554 		goto error_unlock_call;
555 
556 	if (rxrpc_call_is_complete(call) &&
557 	    skb_queue_empty(&call->recvmsg_queue))
558 		goto call_complete;
559 	if (rxrpc_call_has_failed(call))
560 		goto call_failed;
561 
	/* Data is still queued on a call we dequeued, so repost the call for
	 * attention.
	 */
562 	if (!(flags & MSG_PEEK) &&
563 	    !skb_queue_empty(&call->recvmsg_queue))
564 		rxrpc_notify_socket(call);
565 	goto not_yet_complete;
566 
	/* A failed call has any remaining queued packets discarded, then
	 * falls through to deliver its terminal message.
	 */
567 call_failed:
568 	rxrpc_purge_queue(&call->recvmsg_queue);
569 call_complete:
570 	ret = rxrpc_recvmsg_term(call, msg);
571 	if (ret < 0)
572 		goto error_unlock_call;
	/* Unless peeking, the call is released once its terminal message has
	 * been delivered.
	 */
573 	if (!(flags & MSG_PEEK))
574 		rxrpc_release_call(rx, call);
575 	msg->msg_flags |= MSG_EOR;
576 	ret = 1;
577 
	/* ret is 0 here if the data routine returned 0 or -EAGAIN, and 1 if
	 * it hit the end of the data or the terminal message was delivered.
	 * The syscall result is the byte count.
	 */
578 not_yet_complete:
579 	if (ret == 0)
580 		msg->msg_flags |= MSG_MORE;
581 	else
582 		msg->msg_flags &= ~MSG_MORE;
583 	ret = copied;
584 
	/* Also the normal exit: drop the call mutex and our ref on the call. */
585 error_unlock_call:
586 	mutex_unlock(&call->user_mutex);
587 	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
588 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
589 	return ret;
590 
	/* The call mutex could not be taken; the socket lock is still held.
	 * When not peeking, the call goes back to the head of recvmsg_q: if
	 * it is off the list our ref goes back with it, otherwise it has
	 * been queued again in the meantime and our ref is dropped (after
	 * moving the call to the head if it is not already first).  When
	 * peeking, the call was never dequeued, so only the extra ref is
	 * dropped.
	 * NOTE(review): two of the puts below are tagged with "see" trace
	 * values rather than "put" ones - confirm that is intended.
	 */
591 error_requeue_call:
592 	if (!(flags & MSG_PEEK)) {
593 		spin_lock_irq(&rx->recvmsg_lock);
594 		if (list_empty(&call->recvmsg_link)) {
595 			list_add(&call->recvmsg_link, &rx->recvmsg_q);
596 			rxrpc_see_call(call, rxrpc_call_see_recvmsg_requeue);
597 			spin_unlock_irq(&rx->recvmsg_lock);
598 		} else if (list_is_first(&call->recvmsg_link, &rx->recvmsg_q)) {
599 			spin_unlock_irq(&rx->recvmsg_lock);
600 			rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_first);
601 		} else {
602 			list_move(&call->recvmsg_link, &rx->recvmsg_q);
603 			spin_unlock_irq(&rx->recvmsg_lock);
604 			rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_move);
605 		}
606 		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
607 	} else {
608 		rxrpc_put_call(call, rxrpc_call_put_recvmsg_peek_nowait);
609 	}
	/* Entered with the socket lock held and no call ref left to drop. */
610 error_no_call:
611 	release_sock(&rx->sk);
	/* Entered with the socket lock already released. */
612 error_trace:
613 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
614 	return ret;
615 
	/* Both labels below are reached from the wait loop after the socket
	 * lock has been dropped but before finish_wait() has been called.
	 */
616 wait_interrupted:
617 	ret = sock_intr_errno(timeo);
618 wait_error:
619 	finish_wait(sk_sleep(&rx->sk), &wait);
620 	call = NULL;
621 	goto error_trace;
622 }
623 
624 /**
625  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
626  * @sock: The socket that the call exists on
627  * @call: The call to send data through
628  * @iter: The buffer to receive into
629  * @_len: The amount of data we want to receive (decreased on return)
630  * @want_more: True if more data is expected to be read
631  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
632  * @_service: Where to store the actual service ID (may be upgraded)
633  *
634  * Allow a kernel service to receive data and pick up information about the
635  * state of a call.  Note that *@_abort should also be initialised to %0.
636  *
637  * Note that we may return %-EAGAIN to drain empty packets at the end
638  * of the data, even if we've already copied over the requested data.
639  *
640  * Return: %0 if got what was asked for and there's more available, %1
641  * if we got what was asked for and we're at the end of the data and
642  * %-EAGAIN if we need more data.
643  */
644 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
645 			   struct iov_iter *iter, size_t *_len,
646 			   bool want_more, u32 *_abort, u16 *_service)
647 {
648 	size_t offset = 0;
649 	int ret;
650 
651 	_enter("{%d},%zu,%d", call->debug_id, *_len, want_more);
652 
653 	mutex_lock(&call->user_mutex);
654 
655 	ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
656 	*_len -= offset;
657 	if (ret == -EIO)
658 		goto call_failed;
659 	if (ret < 0)
660 		goto out;
661 
662 	/* We can only reach here with a partially full buffer if we have
663 	 * reached the end of the data.  We must otherwise have a full buffer
664 	 * or have been given -EAGAIN.
665 	 */
666 	if (ret == 1) {
667 		if (iov_iter_count(iter) > 0)
668 			goto short_data;
669 		if (!want_more)
670 			goto read_phase_complete;
671 		ret = 0;
672 		goto out;
673 	}
674 
675 	if (!want_more)
676 		goto excess_data;
677 	goto out;
678 
679 read_phase_complete:
680 	ret = 1;
681 out:
682 	if (_service)
683 		*_service = call->dest_srx.srx_service;
684 	mutex_unlock(&call->user_mutex);
685 	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
686 	return ret;
687 
688 short_data:
689 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
690 			  call->cid, call->call_id, call->rx_consumed,
691 			  0, -EBADMSG);
692 	ret = -EBADMSG;
693 	goto out;
694 excess_data:
695 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
696 			  call->cid, call->call_id, call->rx_consumed,
697 			  0, -EMSGSIZE);
698 	ret = -EMSGSIZE;
699 	goto out;
700 call_failed:
701 	*_abort = call->abort_code;
702 	ret = call->error;
703 	if (call->completion == RXRPC_CALL_SUCCEEDED) {
704 		ret = 1;
705 		if (iov_iter_count(iter) > 0)
706 			ret = -ECONNRESET;
707 	}
708 	goto out;
709 }
710 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
711