xref: /linux/net/rxrpc/recvmsg.c (revision 985d4a55e64e43bd86eeb896b81ceba453301989)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14 
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18 
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25 	struct rxrpc_sock *rx;
26 	struct sock *sk;
27 
28 	_enter("%d", call->debug_id);
29 
30 	if (!list_empty(&call->recvmsg_link))
31 		return;
32 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
33 		rxrpc_see_call(call, rxrpc_call_see_notify_released);
34 		return;
35 	}
36 
37 	rcu_read_lock();
38 
39 	rx = rcu_dereference(call->socket);
40 	sk = &rx->sk;
41 	if (rx && sk->sk_state < RXRPC_CLOSE) {
42 		if (call->notify_rx) {
43 			spin_lock_irq(&call->notify_lock);
44 			call->notify_rx(sk, call, call->user_call_ID);
45 			spin_unlock_irq(&call->notify_lock);
46 		} else {
47 			spin_lock_irq(&rx->recvmsg_lock);
48 			if (list_empty(&call->recvmsg_link)) {
49 				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
50 				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
51 			}
52 			spin_unlock_irq(&rx->recvmsg_lock);
53 
54 			if (!sock_flag(sk, SOCK_DEAD)) {
55 				_debug("call %ps", sk->sk_data_ready);
56 				sk->sk_data_ready(sk);
57 			}
58 		}
59 	}
60 
61 	rcu_read_unlock();
62 	_leave("");
63 }
64 
65 /*
66  * Pass a call terminating message to userspace.
67  */
68 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
69 {
70 	u32 tmp = 0;
71 	int ret;
72 
73 	switch (call->completion) {
74 	case RXRPC_CALL_SUCCEEDED:
75 		ret = 0;
76 		if (rxrpc_is_service_call(call))
77 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
78 		break;
79 	case RXRPC_CALL_REMOTELY_ABORTED:
80 		tmp = call->abort_code;
81 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
82 		break;
83 	case RXRPC_CALL_LOCALLY_ABORTED:
84 		tmp = call->abort_code;
85 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
86 		break;
87 	case RXRPC_CALL_NETWORK_ERROR:
88 		tmp = -call->error;
89 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
90 		break;
91 	case RXRPC_CALL_LOCAL_ERROR:
92 		tmp = -call->error;
93 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
94 		break;
95 	default:
96 		pr_err("Invalid terminal call state %u\n", call->completion);
97 		BUG();
98 		break;
99 	}
100 
101 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
102 			     call->ackr_window - 1,
103 			     call->rx_pkt_offset, call->rx_pkt_len, ret);
104 	return ret;
105 }
106 
107 /*
108  * Discard a packet we've used up and advance the Rx window by one.
109  */
110 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
111 {
112 	struct rxrpc_skb_priv *sp;
113 	struct sk_buff *skb;
114 	rxrpc_serial_t serial;
115 	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
116 	bool last;
117 	int acked;
118 
119 	_enter("%d", call->debug_id);
120 
121 	skb = skb_dequeue(&call->recvmsg_queue);
122 	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);
123 
124 	sp = rxrpc_skb(skb);
125 	tseq   = sp->hdr.seq;
126 	serial = sp->hdr.serial;
127 	last   = sp->hdr.flags & RXRPC_LAST_PACKET;
128 
129 	/* Barrier against rxrpc_input_data(). */
130 	if (after(tseq, call->rx_consumed))
131 		smp_store_release(&call->rx_consumed, tseq);
132 
133 	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);
134 
135 	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
136 			    serial, call->rx_consumed);
137 
138 	if (last)
139 		set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
140 
141 	/* Check to see if there's an ACK that needs sending. */
142 	acked = atomic_add_return(call->rx_consumed - old_consumed,
143 				  &call->ackr_nr_consumed);
144 	if (acked > 8 &&
145 	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
146 		rxrpc_poke_call(call, rxrpc_call_poke_idle);
147 }
148 
149 /*
150  * Decrypt and verify a DATA packet.  The content of the packet is pulled out
151  * into a flat buffer rather than decrypting in place in the skbuff.  This also
152  * has the advantage of aligning the buffer correctly for the crypto routines.
153  *
154  * We keep track of the sequence number of the packet currently decrypted into
155  * the buffer in ->rx_dec_seq.  If MSG_PEEK is used and steps onto a new
156  * packet, subsequent recvmsg() calls will have to go back and re-decrypt the
157  * current packet.
158  */
159 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
160 {
161 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
162 	int ret;
163 
164 	if (sp->len > call->rx_dec_bsize) {
165 		/* Make sure we can hold a 1412-byte jumbo subpacket and make
166 		 * sure that the buffer size is aligned to a crypto blocksize.
167 		 */
168 		size_t size = clamp(round_up(sp->len, 32), 2048, 65535);
169 		void *buffer = krealloc(call->rx_dec_buffer, size, GFP_NOFS);
170 
171 		if (!buffer)
172 			return -ENOMEM;
173 		call->rx_dec_buffer = buffer;
174 		call->rx_dec_bsize = size;
175 	}
176 
177 	ret = -EFAULT;
178 	if (skb_copy_bits(skb, sp->offset, call->rx_dec_buffer, sp->len) < 0)
179 		goto err;
180 
181 	call->rx_dec_offset = 0;
182 	call->rx_dec_len = sp->len;
183 	call->rx_dec_seq = sp->hdr.seq;
184 	ret = call->security->verify_packet(call, skb);
185 	if (ret < 0)
186 		goto err;
187 	return 0;
188 
189 err:
190 	kfree(call->rx_dec_buffer);
191 	call->rx_dec_buffer = NULL;
192 	call->rx_dec_bsize = 0;
193 	call->rx_dec_offset = 0;
194 	call->rx_dec_len = 0;
195 	return ret;
196 }
197 
198 /*
199  * Transcribe a call's user ID to a control message.
200  */
201 static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg,
202 				 int flags)
203 {
204 	if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags))
205 		return 0;
206 
207 	if (flags & MSG_CMSG_COMPAT) {
208 		unsigned int id32 = call->user_call_ID;
209 
210 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
211 				sizeof(unsigned int), &id32);
212 	} else {
213 		unsigned long idl = call->user_call_ID;
214 
215 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
216 				sizeof(unsigned long), &idl);
217 	}
218 }
219 
220 /*
221  * Deal with a CHALLENGE packet.
222  */
223 static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg,
224 				   struct sk_buff *challenge, unsigned int flags)
225 {
226 	struct rxrpc_skb_priv *sp = rxrpc_skb(challenge);
227 	struct rxrpc_connection *conn = sp->chall.conn;
228 
229 	return conn->security->challenge_to_recvmsg(conn, challenge, msg);
230 }
231 
232 /*
233  * Process OOB packets.  Called with the socket locked.
234  */
235 static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg,
236 			     unsigned int flags)
237 {
238 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
239 	struct sk_buff *skb;
240 	bool need_response = false;
241 	int ret;
242 
243 	skb = skb_peek(&rx->recvmsg_oobq);
244 	if (!skb)
245 		return -EAGAIN;
246 	rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
247 
248 	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64),
249 		       &skb->skb_mstamp_ns);
250 	if (ret < 0)
251 		return ret;
252 
253 	switch ((enum rxrpc_oob_type)skb->mark) {
254 	case RXRPC_OOB_CHALLENGE:
255 		need_response = true;
256 		ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags);
257 		break;
258 	default:
259 		WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n",
260 			  skb->mark);
261 		ret = -EIO;
262 		break;
263 	}
264 
265 	if (!(flags & MSG_PEEK))
266 		skb_unlink(skb, &rx->recvmsg_oobq);
267 	if (need_response)
268 		rxrpc_add_pending_oob(rx, skb);
269 	else
270 		rxrpc_free_skb(skb, rxrpc_skb_put_oob);
271 	return ret;
272 }
273 
274 /*
275  * Deliver messages to a call.  This keeps processing packets until the buffer
276  * is filled and we find either more DATA (returns 0) or the end of the DATA
277  * (returns 1).  If more packets are required, it returns -EAGAIN and if the
278  * call has failed it returns -EIO.
279  */
280 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
281 			      struct msghdr *msg, struct iov_iter *iter,
282 			      size_t len, int flags, size_t *_offset)
283 {
284 	struct rxrpc_skb_priv *sp;
285 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
286 	struct sk_buff *skb;
287 	rxrpc_seq_t seq = 0;
288 	size_t remain;
289 	unsigned int rx_pkt_offset, rx_pkt_len;
290 	int copy, ret = -EAGAIN, ret2;
291 
292 	rx_pkt_offset = call->rx_pkt_offset;
293 	rx_pkt_len = call->rx_pkt_len;
294 
295 	if (rxrpc_call_has_failed(call)) {
296 		seq = call->ackr_window - 1;
297 		ret = -EIO;
298 		goto done;
299 	}
300 
301 	if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
302 		seq = call->ackr_window - 1;
303 		ret = 1;
304 		goto done;
305 	}
306 
307 	/* No one else can be removing stuff from the queue, so we shouldn't
308 	 * need the Rx lock to walk it.
309 	 */
310 	skb = skb_peek(&call->recvmsg_queue);
311 	while (skb) {
312 		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
313 		sp = rxrpc_skb(skb);
314 		seq = sp->hdr.seq;
315 
316 		if (!(flags & MSG_PEEK))
317 			trace_rxrpc_receive(call, rxrpc_receive_front,
318 					    sp->hdr.serial, seq);
319 
320 		if (msg)
321 			sock_recv_timestamp(msg, sock->sk, skb);
322 
323 		if (call->rx_dec_seq != sp->hdr.seq ||
324 		    !call->rx_dec_buffer) {
325 			ret2 = rxrpc_verify_data(call, skb);
326 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
327 					     call->rx_dec_offset,
328 					     call->rx_dec_len, ret2);
329 			if (ret2 < 0) {
330 				ret = ret2;
331 				goto out;
332 			}
333 		}
334 
335 		if (rx_pkt_offset == USHRT_MAX) {
336 			rx_pkt_offset = call->rx_dec_offset;
337 			rx_pkt_len = call->rx_dec_len;
338 		} else {
339 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
340 					     rx_pkt_offset, rx_pkt_len, 0);
341 		}
342 
343 		/* We have to handle short, empty and used-up DATA packets. */
344 		remain = len - *_offset;
345 		copy = rx_pkt_len;
346 		if (copy > remain)
347 			copy = remain;
348 		if (copy > 0) {
349 			ret2 = copy_to_iter(call->rx_dec_buffer + rx_pkt_offset,
350 					    copy, iter);
351 			if (ret2 != copy) {
352 				ret = -EFAULT;
353 				goto out;
354 			}
355 
356 			/* handle piecemeal consumption of data packets */
357 			rx_pkt_offset += copy;
358 			rx_pkt_len -= copy;
359 			*_offset += copy;
360 		}
361 
362 		if (rx_pkt_len > 0) {
363 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
364 					     rx_pkt_offset, rx_pkt_len, 0);
365 			ASSERTCMP(*_offset, ==, len);
366 			ret = 0;
367 			break;
368 		}
369 
370 		/* The whole packet has been transferred. */
371 		if (sp->hdr.flags & RXRPC_LAST_PACKET)
372 			ret = 1;
373 		rx_pkt_offset = USHRT_MAX;
374 		rx_pkt_len = 0;
375 
376 		skb = skb_peek_next(skb, &call->recvmsg_queue);
377 
378 		if (!(flags & MSG_PEEK))
379 			rxrpc_rotate_rx_window(call);
380 
381 		if (!rx->app_ops &&
382 		    !skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
383 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq,
384 					     rx_pkt_offset, rx_pkt_len, ret);
385 			break;
386 		}
387 	}
388 
389 out:
390 	if (!(flags & MSG_PEEK)) {
391 		call->rx_pkt_offset = rx_pkt_offset;
392 		call->rx_pkt_len = rx_pkt_len;
393 	}
394 
395 done:
396 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
397 			     rx_pkt_offset, rx_pkt_len, ret);
398 	if (ret == -EAGAIN)
399 		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
400 	return ret;
401 }
402 
403 /*
404  * Receive a message from an RxRPC socket
405  * - we need to be careful about two or more threads calling recvmsg
406  *   simultaneously
407  */
408 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
409 		  int flags)
410 {
411 	struct rxrpc_call *call;
412 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
413 	struct list_head *l;
414 	unsigned int call_debug_id = 0;
415 	size_t copied = 0;
416 	long timeo;
417 	int ret;
418 
419 	DEFINE_WAIT(wait);
420 
421 	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
422 
423 	if (flags & (MSG_OOB | MSG_TRUNC))
424 		return -EOPNOTSUPP;
425 
426 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
427 
428 try_again:
429 	lock_sock(&rx->sk);
430 
431 	/* Return immediately if a client socket has no outstanding calls */
432 	if (RB_EMPTY_ROOT(&rx->calls) &&
433 	    list_empty(&rx->recvmsg_q) &&
434 	    skb_queue_empty_lockless(&rx->recvmsg_oobq) &&
435 	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
436 		release_sock(&rx->sk);
437 		return -EAGAIN;
438 	}
439 
440 	if (list_empty(&rx->recvmsg_q)) {
441 		ret = -EWOULDBLOCK;
442 		if (timeo == 0) {
443 			call = NULL;
444 			goto error_no_call;
445 		}
446 
447 		release_sock(&rx->sk);
448 
449 		/* Wait for something to happen */
450 		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
451 					  TASK_INTERRUPTIBLE);
452 		ret = sock_error(&rx->sk);
453 		if (ret)
454 			goto wait_error;
455 
456 		if (list_empty(&rx->recvmsg_q) &&
457 		    skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
458 			if (signal_pending(current))
459 				goto wait_interrupted;
460 			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
461 			timeo = schedule_timeout(timeo);
462 		}
463 		finish_wait(sk_sleep(&rx->sk), &wait);
464 		goto try_again;
465 	}
466 
467 	/* Deal with OOB messages before we consider getting normal data. */
468 	if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
469 		ret = rxrpc_recvmsg_oob(sock, msg, flags);
470 		release_sock(&rx->sk);
471 		if (ret == -EAGAIN)
472 			goto try_again;
473 		goto error_no_call;
474 	}
475 
476 	/* Find the next call and dequeue it if we're not just peeking.  If we
477 	 * do dequeue it, that comes with a ref that we will need to release.
478 	 * We also want to weed out calls that got requeued whilst we were
479 	 * shovelling data out.
480 	 */
481 	spin_lock_irq(&rx->recvmsg_lock);
482 	l = rx->recvmsg_q.next;
483 	call = list_entry(l, struct rxrpc_call, recvmsg_link);
484 
485 	if (!rxrpc_call_is_complete(call) &&
486 	    skb_queue_empty(&call->recvmsg_queue) &&
487 	    skb_queue_empty(&rx->recvmsg_oobq)) {
488 		list_del_init(&call->recvmsg_link);
489 		spin_unlock_irq(&rx->recvmsg_lock);
490 		release_sock(&rx->sk);
491 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
492 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
493 		goto try_again;
494 	}
495 
496 	rxrpc_see_call(call, rxrpc_call_see_recvmsg);
497 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
498 		rxrpc_see_call(call, rxrpc_call_see_already_released);
499 		list_del_init(&call->recvmsg_link);
500 		spin_unlock_irq(&rx->recvmsg_lock);
501 		release_sock(&rx->sk);
502 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
503 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
504 		goto try_again;
505 	}
506 	if (!(flags & MSG_PEEK))
507 		list_del_init(&call->recvmsg_link);
508 	else
509 		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
510 	spin_unlock_irq(&rx->recvmsg_lock);
511 
512 	call_debug_id = call->debug_id;
513 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
514 
515 	/* We're going to drop the socket lock, so we need to lock the call
516 	 * against interference by sendmsg.
517 	 */
518 	if (!mutex_trylock(&call->user_mutex)) {
519 		ret = -EWOULDBLOCK;
520 		if (flags & MSG_DONTWAIT)
521 			goto error_requeue_call;
522 		ret = -ERESTARTSYS;
523 		if (mutex_lock_interruptible(&call->user_mutex) < 0)
524 			goto error_requeue_call;
525 	}
526 
527 	release_sock(&rx->sk);
528 
529 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
530 		rxrpc_see_call(call, rxrpc_call_see_already_released);
531 		mutex_unlock(&call->user_mutex);
532 		if (!(flags & MSG_PEEK))
533 			rxrpc_put_call(call, rxrpc_call_put_recvmsg);
534 		goto try_again;
535 	}
536 
537 	ret = rxrpc_recvmsg_user_id(call, msg, flags);
538 	if (ret < 0)
539 		goto error_unlock_call;
540 
541 	if (msg->msg_name && call->peer) {
542 		size_t len = sizeof(call->dest_srx);
543 
544 		memcpy(msg->msg_name, &call->dest_srx, len);
545 		msg->msg_namelen = len;
546 	}
547 
548 	ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
549 				 flags, &copied);
550 	if (ret == -EAGAIN)
551 		ret = 0;
552 	if (ret == -EIO)
553 		goto call_failed;
554 	if (ret < 0)
555 		goto error_unlock_call;
556 
557 	if (rxrpc_call_is_complete(call) &&
558 	    skb_queue_empty(&call->recvmsg_queue))
559 		goto call_complete;
560 	if (rxrpc_call_has_failed(call))
561 		goto call_failed;
562 
563 	if (!(flags & MSG_PEEK) &&
564 	    !skb_queue_empty(&call->recvmsg_queue))
565 		rxrpc_notify_socket(call);
566 	goto not_yet_complete;
567 
568 call_failed:
569 	rxrpc_purge_queue(&call->recvmsg_queue);
570 call_complete:
571 	ret = rxrpc_recvmsg_term(call, msg);
572 	if (ret < 0)
573 		goto error_unlock_call;
574 	if (!(flags & MSG_PEEK))
575 		rxrpc_release_call(rx, call);
576 	msg->msg_flags |= MSG_EOR;
577 	ret = 1;
578 
579 not_yet_complete:
580 	if (ret == 0)
581 		msg->msg_flags |= MSG_MORE;
582 	else
583 		msg->msg_flags &= ~MSG_MORE;
584 	ret = copied;
585 
586 error_unlock_call:
587 	mutex_unlock(&call->user_mutex);
588 	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
589 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
590 	return ret;
591 
592 error_requeue_call:
593 	if (!(flags & MSG_PEEK)) {
594 		spin_lock_irq(&rx->recvmsg_lock);
595 		if (list_empty(&call->recvmsg_link)) {
596 			list_add(&call->recvmsg_link, &rx->recvmsg_q);
597 			rxrpc_see_call(call, rxrpc_call_see_recvmsg_requeue);
598 			spin_unlock_irq(&rx->recvmsg_lock);
599 		} else if (list_is_first(&call->recvmsg_link, &rx->recvmsg_q)) {
600 			spin_unlock_irq(&rx->recvmsg_lock);
601 			rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_first);
602 		} else {
603 			list_move(&call->recvmsg_link, &rx->recvmsg_q);
604 			spin_unlock_irq(&rx->recvmsg_lock);
605 			rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_move);
606 		}
607 		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
608 	} else {
609 		rxrpc_put_call(call, rxrpc_call_put_recvmsg_peek_nowait);
610 	}
611 error_no_call:
612 	release_sock(&rx->sk);
613 error_trace:
614 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
615 	return ret;
616 
617 wait_interrupted:
618 	ret = sock_intr_errno(timeo);
619 wait_error:
620 	finish_wait(sk_sleep(&rx->sk), &wait);
621 	call = NULL;
622 	goto error_trace;
623 }
624 
625 /**
626  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
627  * @sock: The socket that the call exists on
628  * @call: The call to send data through
629  * @iter: The buffer to receive into
630  * @_len: The amount of data we want to receive (decreased on return)
631  * @want_more: True if more data is expected to be read
632  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
633  * @_service: Where to store the actual service ID (may be upgraded)
634  *
635  * Allow a kernel service to receive data and pick up information about the
636  * state of a call.  Note that *@_abort should also be initialised to %0.
637  *
638  * Note that we may return %-EAGAIN to drain empty packets at the end
639  * of the data, even if we've already copied over the requested data.
640  *
641  * Return: %0 if got what was asked for and there's more available, %1
642  * if we got what was asked for and we're at the end of the data and
643  * %-EAGAIN if we need more data.
644  */
645 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
646 			   struct iov_iter *iter, size_t *_len,
647 			   bool want_more, u32 *_abort, u16 *_service)
648 {
649 	size_t offset = 0;
650 	int ret;
651 
652 	_enter("{%d},%zu,%d", call->debug_id, *_len, want_more);
653 
654 	mutex_lock(&call->user_mutex);
655 
656 	ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
657 	*_len -= offset;
658 	if (ret == -EIO)
659 		goto call_failed;
660 	if (ret < 0)
661 		goto out;
662 
663 	/* We can only reach here with a partially full buffer if we have
664 	 * reached the end of the data.  We must otherwise have a full buffer
665 	 * or have been given -EAGAIN.
666 	 */
667 	if (ret == 1) {
668 		if (iov_iter_count(iter) > 0)
669 			goto short_data;
670 		if (!want_more)
671 			goto read_phase_complete;
672 		ret = 0;
673 		goto out;
674 	}
675 
676 	if (!want_more)
677 		goto excess_data;
678 	goto out;
679 
680 read_phase_complete:
681 	ret = 1;
682 out:
683 	if (_service)
684 		*_service = call->dest_srx.srx_service;
685 	mutex_unlock(&call->user_mutex);
686 	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
687 	return ret;
688 
689 short_data:
690 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
691 			  call->cid, call->call_id, call->rx_consumed,
692 			  0, -EBADMSG);
693 	ret = -EBADMSG;
694 	goto out;
695 excess_data:
696 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
697 			  call->cid, call->call_id, call->rx_consumed,
698 			  0, -EMSGSIZE);
699 	ret = -EMSGSIZE;
700 	goto out;
701 call_failed:
702 	*_abort = call->abort_code;
703 	ret = call->error;
704 	if (call->completion == RXRPC_CALL_SUCCEEDED) {
705 		ret = 1;
706 		if (iov_iter_count(iter) > 0)
707 			ret = -ECONNRESET;
708 	}
709 	goto out;
710 }
711 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
712