xref: /linux/net/rxrpc/recvmsg.c (revision 5801cff7d5d7b4e9d877dfb627b23eb63167f02c)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14 
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18 
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25 	struct rxrpc_sock *rx;
26 	struct sock *sk;
27 
28 	_enter("%d", call->debug_id);
29 
30 	if (!list_empty(&call->recvmsg_link))
31 		return;
32 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
33 		rxrpc_see_call(call, rxrpc_call_see_notify_released);
34 		return;
35 	}
36 
37 	rcu_read_lock();
38 
39 	rx = rcu_dereference(call->socket);
40 	sk = &rx->sk;
41 	if (rx && sk->sk_state < RXRPC_CLOSE) {
42 		if (call->notify_rx) {
43 			spin_lock_irq(&call->notify_lock);
44 			call->notify_rx(sk, call, call->user_call_ID);
45 			spin_unlock_irq(&call->notify_lock);
46 		} else {
47 			spin_lock_irq(&rx->recvmsg_lock);
48 			if (list_empty(&call->recvmsg_link)) {
49 				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
50 				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
51 			}
52 			spin_unlock_irq(&rx->recvmsg_lock);
53 
54 			if (!sock_flag(sk, SOCK_DEAD)) {
55 				_debug("call %ps", sk->sk_data_ready);
56 				sk->sk_data_ready(sk);
57 			}
58 		}
59 	}
60 
61 	rcu_read_unlock();
62 	_leave("");
63 }
64 
65 /*
66  * Pass a call terminating message to userspace.
67  */
68 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
69 {
70 	u32 tmp = 0;
71 	int ret;
72 
73 	switch (call->completion) {
74 	case RXRPC_CALL_SUCCEEDED:
75 		ret = 0;
76 		if (rxrpc_is_service_call(call))
77 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
78 		break;
79 	case RXRPC_CALL_REMOTELY_ABORTED:
80 		tmp = call->abort_code;
81 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
82 		break;
83 	case RXRPC_CALL_LOCALLY_ABORTED:
84 		tmp = call->abort_code;
85 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
86 		break;
87 	case RXRPC_CALL_NETWORK_ERROR:
88 		tmp = -call->error;
89 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
90 		break;
91 	case RXRPC_CALL_LOCAL_ERROR:
92 		tmp = -call->error;
93 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
94 		break;
95 	default:
96 		pr_err("Invalid terminal call state %u\n", call->completion);
97 		BUG();
98 		break;
99 	}
100 
101 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
102 			     call->ackr_window - 1,
103 			     call->rx_pkt_offset, call->rx_pkt_len, ret);
104 	return ret;
105 }
106 
107 /*
108  * Discard a packet we've used up and advance the Rx window by one.
109  */
110 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
111 {
112 	struct rxrpc_skb_priv *sp;
113 	struct sk_buff *skb;
114 	rxrpc_serial_t serial;
115 	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
116 	bool last;
117 	int acked;
118 
119 	_enter("%d", call->debug_id);
120 
121 	skb = skb_dequeue(&call->recvmsg_queue);
122 	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);
123 
124 	sp = rxrpc_skb(skb);
125 	tseq   = sp->hdr.seq;
126 	serial = sp->hdr.serial;
127 	last   = sp->hdr.flags & RXRPC_LAST_PACKET;
128 
129 	/* Barrier against rxrpc_input_data(). */
130 	if (after(tseq, call->rx_consumed))
131 		smp_store_release(&call->rx_consumed, tseq);
132 
133 	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);
134 
135 	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
136 			    serial, call->rx_consumed);
137 
138 	if (last)
139 		set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
140 
141 	/* Check to see if there's an ACK that needs sending. */
142 	acked = atomic_add_return(call->rx_consumed - old_consumed,
143 				  &call->ackr_nr_consumed);
144 	if (acked > 8 &&
145 	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
146 		rxrpc_poke_call(call, rxrpc_call_poke_idle);
147 }
148 
149 /*
150  * Decrypt and verify a DATA packet.  The content of the packet is pulled out
151  * into a flat buffer rather than decrypting in place in the skbuff.  This also
152  * has the advantage of aligning the buffer correctly for the crypto routines.
153  *
154  * We keep track of the sequence number of the packet currently decrypted into
155  * the buffer in ->rx_dec_seq.  If MSG_PEEK is used and steps onto a new
156  * packet, subsequent recvmsg() calls will have to go back and re-decrypt the
157  * current packet.
158  */
159 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
160 {
161 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
162 	int ret;
163 
164 	if (sp->len > call->rx_dec_bsize || !call->rx_dec_buffer) {
165 		/* Make sure we can hold a 1412-byte jumbo subpacket and make
166 		 * sure that the buffer size is aligned to a crypto blocksize.
167 		 */
168 		size_t size = clamp(round_up(sp->len, 32), 2048, 65535);
169 		void *buffer = krealloc(call->rx_dec_buffer, size, GFP_NOFS);
170 
171 		if (!buffer)
172 			return -ENOMEM;
173 		call->rx_dec_buffer = buffer;
174 		call->rx_dec_bsize = size;
175 	}
176 
177 	ret = -EFAULT;
178 	if (skb_copy_bits(skb, sp->offset, call->rx_dec_buffer, sp->len) < 0)
179 		goto err;
180 
181 	call->rx_dec_offset = 0;
182 	call->rx_dec_len = sp->len;
183 	call->rx_dec_seq = sp->hdr.seq;
184 	ret = call->security->verify_packet(call, skb);
185 	if (ret < 0)
186 		goto err;
187 	return 0;
188 
189 err:
190 	kfree(call->rx_dec_buffer);
191 	call->rx_dec_buffer = NULL;
192 	call->rx_dec_bsize = 0;
193 	call->rx_dec_offset = 0;
194 	call->rx_dec_len = 0;
195 	return ret;
196 }
197 
198 /*
199  * Transcribe a call's user ID to a control message.
200  */
201 static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg,
202 				 int flags)
203 {
204 	if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags))
205 		return 0;
206 
207 	if (flags & MSG_CMSG_COMPAT) {
208 		unsigned int id32 = call->user_call_ID;
209 
210 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
211 				sizeof(unsigned int), &id32);
212 	} else {
213 		unsigned long idl = call->user_call_ID;
214 
215 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
216 				sizeof(unsigned long), &idl);
217 	}
218 }
219 
220 /*
221  * Deal with a CHALLENGE packet.
222  */
223 static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg,
224 				   struct sk_buff *challenge, unsigned int flags)
225 {
226 	struct rxrpc_skb_priv *sp = rxrpc_skb(challenge);
227 	struct rxrpc_connection *conn = sp->chall.conn;
228 
229 	return conn->security->challenge_to_recvmsg(conn, challenge, msg);
230 }
231 
232 /*
233  * Process OOB packets.  Called with the socket locked.
234  */
235 static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg,
236 			     unsigned int flags)
237 {
238 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
239 	struct sk_buff *skb;
240 	bool need_response = false;
241 	int ret;
242 
243 	skb = skb_peek(&rx->recvmsg_oobq);
244 	if (!skb)
245 		return -EAGAIN;
246 	rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
247 
248 	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64),
249 		       &skb->skb_mstamp_ns);
250 	if (ret < 0)
251 		return ret;
252 
253 	switch ((enum rxrpc_oob_type)skb->mark) {
254 	case RXRPC_OOB_CHALLENGE:
255 		need_response = true;
256 		ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags);
257 		break;
258 	default:
259 		WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n",
260 			  skb->mark);
261 		ret = -EIO;
262 		break;
263 	}
264 
265 	if (!(flags & MSG_PEEK)) {
266 		skb_unlink(skb, &rx->recvmsg_oobq);
267 		if (need_response)
268 			rxrpc_add_pending_oob(rx, skb);
269 		else
270 			rxrpc_free_skb(skb, rxrpc_skb_put_oob);
271 	}
272 	return ret;
273 }
274 
275 /*
276  * Deliver messages to a call.  This keeps processing packets until the buffer
277  * is filled and we find either more DATA (returns 0) or the end of the DATA
278  * (returns 1).  If more packets are required, it returns -EAGAIN and if the
279  * call has failed it returns -EIO.
280  */
281 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
282 			      struct msghdr *msg, struct iov_iter *iter,
283 			      size_t len, int flags, size_t *_offset)
284 {
285 	struct rxrpc_skb_priv *sp;
286 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
287 	struct sk_buff *skb;
288 	rxrpc_seq_t seq = 0;
289 	size_t remain;
290 	unsigned int rx_pkt_offset, rx_pkt_len;
291 	int copy, ret = -EAGAIN, ret2;
292 
293 	rx_pkt_offset = call->rx_pkt_offset;
294 	rx_pkt_len = call->rx_pkt_len;
295 
296 	if (rxrpc_call_has_failed(call)) {
297 		seq = call->ackr_window - 1;
298 		ret = -EIO;
299 		goto done;
300 	}
301 
302 	if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
303 		seq = call->ackr_window - 1;
304 		ret = 1;
305 		goto done;
306 	}
307 
308 	/* No one else can be removing stuff from the queue, so we shouldn't
309 	 * need the Rx lock to walk it.
310 	 */
311 	skb = skb_peek(&call->recvmsg_queue);
312 	while (skb) {
313 		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
314 		sp = rxrpc_skb(skb);
315 		seq = sp->hdr.seq;
316 
317 		if (!(flags & MSG_PEEK))
318 			trace_rxrpc_receive(call, rxrpc_receive_front,
319 					    sp->hdr.serial, seq);
320 
321 		if (msg)
322 			sock_recv_timestamp(msg, sock->sk, skb);
323 
324 		if (call->rx_dec_seq != sp->hdr.seq ||
325 		    !call->rx_dec_buffer) {
326 			ret2 = rxrpc_verify_data(call, skb);
327 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
328 					     call->rx_dec_offset,
329 					     call->rx_dec_len, ret2);
330 			if (ret2 < 0) {
331 				ret = ret2;
332 				goto out;
333 			}
334 		}
335 
336 		if (rx_pkt_offset == USHRT_MAX) {
337 			rx_pkt_offset = call->rx_dec_offset;
338 			rx_pkt_len = call->rx_dec_len;
339 		} else {
340 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
341 					     rx_pkt_offset, rx_pkt_len, 0);
342 		}
343 
344 		/* We have to handle short, empty and used-up DATA packets. */
345 		remain = len - *_offset;
346 		copy = rx_pkt_len;
347 		if (copy > remain)
348 			copy = remain;
349 		if (copy > 0) {
350 			ret2 = copy_to_iter(call->rx_dec_buffer + rx_pkt_offset,
351 					    copy, iter);
352 			if (ret2 != copy) {
353 				ret = -EFAULT;
354 				goto out;
355 			}
356 
357 			/* handle piecemeal consumption of data packets */
358 			rx_pkt_offset += copy;
359 			rx_pkt_len -= copy;
360 			*_offset += copy;
361 		}
362 
363 		if (rx_pkt_len > 0) {
364 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
365 					     rx_pkt_offset, rx_pkt_len, 0);
366 			ASSERTCMP(*_offset, ==, len);
367 			ret = 0;
368 			break;
369 		}
370 
371 		/* The whole packet has been transferred. */
372 		if (sp->hdr.flags & RXRPC_LAST_PACKET)
373 			ret = 1;
374 		rx_pkt_offset = USHRT_MAX;
375 		rx_pkt_len = 0;
376 
377 		skb = skb_peek_next(skb, &call->recvmsg_queue);
378 
379 		if (!(flags & MSG_PEEK))
380 			rxrpc_rotate_rx_window(call);
381 
382 		if (!rx->app_ops &&
383 		    !skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
384 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq,
385 					     rx_pkt_offset, rx_pkt_len, ret);
386 			break;
387 		}
388 	}
389 
390 out:
391 	if (!(flags & MSG_PEEK)) {
392 		call->rx_pkt_offset = rx_pkt_offset;
393 		call->rx_pkt_len = rx_pkt_len;
394 	}
395 
396 done:
397 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
398 			     rx_pkt_offset, rx_pkt_len, ret);
399 	if (ret == -EAGAIN)
400 		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
401 	return ret;
402 }
403 
404 /*
405  * Receive a message from an RxRPC socket
406  * - we need to be careful about two or more threads calling recvmsg
407  *   simultaneously
408  */
409 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
410 		  int flags)
411 {
412 	struct rxrpc_call *call;
413 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
414 	struct list_head *l;
415 	unsigned int call_debug_id = 0;
416 	size_t copied = 0;
417 	long timeo;
418 	int ret;
419 
420 	DEFINE_WAIT(wait);
421 
422 	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
423 
424 	if (flags & (MSG_OOB | MSG_TRUNC))
425 		return -EOPNOTSUPP;
426 
427 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
428 
429 try_again:
430 	lock_sock(&rx->sk);
431 
432 	/* Return immediately if a client socket has no outstanding calls */
433 	if (RB_EMPTY_ROOT(&rx->calls) &&
434 	    list_empty(&rx->recvmsg_q) &&
435 	    skb_queue_empty_lockless(&rx->recvmsg_oobq) &&
436 	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
437 		release_sock(&rx->sk);
438 		return -EAGAIN;
439 	}
440 
441 	if (list_empty(&rx->recvmsg_q)) {
442 		ret = -EWOULDBLOCK;
443 		if (timeo == 0) {
444 			call = NULL;
445 			goto error_no_call;
446 		}
447 
448 		release_sock(&rx->sk);
449 
450 		/* Wait for something to happen */
451 		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
452 					  TASK_INTERRUPTIBLE);
453 		ret = sock_error(&rx->sk);
454 		if (ret)
455 			goto wait_error;
456 
457 		if (list_empty(&rx->recvmsg_q) &&
458 		    skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
459 			if (signal_pending(current))
460 				goto wait_interrupted;
461 			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
462 			timeo = schedule_timeout(timeo);
463 		}
464 		finish_wait(sk_sleep(&rx->sk), &wait);
465 		goto try_again;
466 	}
467 
468 	/* Deal with OOB messages before we consider getting normal data. */
469 	if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
470 		ret = rxrpc_recvmsg_oob(sock, msg, flags);
471 		release_sock(&rx->sk);
472 		if (ret == -EAGAIN)
473 			goto try_again;
474 		goto error_no_call;
475 	}
476 
477 	/* Find the next call and dequeue it if we're not just peeking.  If we
478 	 * do dequeue it, that comes with a ref that we will need to release.
479 	 * We also want to weed out calls that got requeued whilst we were
480 	 * shovelling data out.
481 	 */
482 	spin_lock_irq(&rx->recvmsg_lock);
483 	l = rx->recvmsg_q.next;
484 	call = list_entry(l, struct rxrpc_call, recvmsg_link);
485 
486 	if (!rxrpc_call_is_complete(call) &&
487 	    skb_queue_empty(&call->recvmsg_queue) &&
488 	    skb_queue_empty(&rx->recvmsg_oobq)) {
489 		list_del_init(&call->recvmsg_link);
490 		spin_unlock_irq(&rx->recvmsg_lock);
491 		release_sock(&rx->sk);
492 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
493 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
494 		goto try_again;
495 	}
496 
497 	rxrpc_see_call(call, rxrpc_call_see_recvmsg);
498 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
499 		rxrpc_see_call(call, rxrpc_call_see_already_released);
500 		list_del_init(&call->recvmsg_link);
501 		spin_unlock_irq(&rx->recvmsg_lock);
502 		release_sock(&rx->sk);
503 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
504 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
505 		goto try_again;
506 	}
507 	if (!(flags & MSG_PEEK))
508 		list_del_init(&call->recvmsg_link);
509 	else
510 		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
511 	spin_unlock_irq(&rx->recvmsg_lock);
512 
513 	call_debug_id = call->debug_id;
514 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
515 
516 	/* We're going to drop the socket lock, so we need to lock the call
517 	 * against interference by sendmsg.
518 	 */
519 	if (!mutex_trylock(&call->user_mutex)) {
520 		ret = -EWOULDBLOCK;
521 		if (flags & MSG_DONTWAIT)
522 			goto error_requeue_call;
523 		ret = -ERESTARTSYS;
524 		if (mutex_lock_interruptible(&call->user_mutex) < 0)
525 			goto error_requeue_call;
526 	}
527 
528 	release_sock(&rx->sk);
529 
530 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
531 		rxrpc_see_call(call, rxrpc_call_see_already_released);
532 		mutex_unlock(&call->user_mutex);
533 		if (!(flags & MSG_PEEK))
534 			rxrpc_put_call(call, rxrpc_call_put_recvmsg);
535 		goto try_again;
536 	}
537 
538 	ret = rxrpc_recvmsg_user_id(call, msg, flags);
539 	if (ret < 0)
540 		goto error_unlock_call;
541 
542 	if (msg->msg_name && call->peer) {
543 		size_t len = sizeof(call->dest_srx);
544 
545 		memcpy(msg->msg_name, &call->dest_srx, len);
546 		msg->msg_namelen = len;
547 	}
548 
549 	ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
550 				 flags, &copied);
551 	if (ret == -EAGAIN)
552 		ret = 0;
553 	if (ret == -EIO)
554 		goto call_failed;
555 	if (ret < 0)
556 		goto error_unlock_call;
557 
558 	if (rxrpc_call_is_complete(call) &&
559 	    skb_queue_empty(&call->recvmsg_queue))
560 		goto call_complete;
561 	if (rxrpc_call_has_failed(call))
562 		goto call_failed;
563 
564 	if (!(flags & MSG_PEEK) &&
565 	    !skb_queue_empty(&call->recvmsg_queue))
566 		rxrpc_notify_socket(call);
567 	goto not_yet_complete;
568 
569 call_failed:
570 	rxrpc_purge_queue(&call->recvmsg_queue);
571 call_complete:
572 	ret = rxrpc_recvmsg_term(call, msg);
573 	if (ret < 0)
574 		goto error_unlock_call;
575 	if (!(flags & MSG_PEEK))
576 		rxrpc_release_call(rx, call);
577 	msg->msg_flags |= MSG_EOR;
578 	ret = 1;
579 
580 not_yet_complete:
581 	if (ret == 0)
582 		msg->msg_flags |= MSG_MORE;
583 	else
584 		msg->msg_flags &= ~MSG_MORE;
585 	ret = copied;
586 
587 error_unlock_call:
588 	mutex_unlock(&call->user_mutex);
589 	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
590 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
591 	return ret;
592 
593 error_requeue_call:
594 	if (!(flags & MSG_PEEK)) {
595 		spin_lock_irq(&rx->recvmsg_lock);
596 		if (list_empty(&call->recvmsg_link)) {
597 			list_add(&call->recvmsg_link, &rx->recvmsg_q);
598 			rxrpc_see_call(call, rxrpc_call_see_recvmsg_requeue);
599 			spin_unlock_irq(&rx->recvmsg_lock);
600 		} else if (list_is_first(&call->recvmsg_link, &rx->recvmsg_q)) {
601 			spin_unlock_irq(&rx->recvmsg_lock);
602 			rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_first);
603 		} else {
604 			list_move(&call->recvmsg_link, &rx->recvmsg_q);
605 			spin_unlock_irq(&rx->recvmsg_lock);
606 			rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_move);
607 		}
608 		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
609 	} else {
610 		rxrpc_put_call(call, rxrpc_call_put_recvmsg_peek_nowait);
611 	}
612 error_no_call:
613 	release_sock(&rx->sk);
614 error_trace:
615 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
616 	return ret;
617 
618 wait_interrupted:
619 	ret = sock_intr_errno(timeo);
620 wait_error:
621 	finish_wait(sk_sleep(&rx->sk), &wait);
622 	call = NULL;
623 	goto error_trace;
624 }
625 
626 /**
627  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
628  * @sock: The socket that the call exists on
629  * @call: The call to send data through
630  * @iter: The buffer to receive into
631  * @_len: The amount of data we want to receive (decreased on return)
632  * @want_more: True if more data is expected to be read
633  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
634  * @_service: Where to store the actual service ID (may be upgraded)
635  *
636  * Allow a kernel service to receive data and pick up information about the
637  * state of a call.  Note that *@_abort should also be initialised to %0.
638  *
639  * Note that we may return %-EAGAIN to drain empty packets at the end
640  * of the data, even if we've already copied over the requested data.
641  *
642  * Return: %0 if got what was asked for and there's more available, %1
643  * if we got what was asked for and we're at the end of the data and
644  * %-EAGAIN if we need more data.
645  */
646 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
647 			   struct iov_iter *iter, size_t *_len,
648 			   bool want_more, u32 *_abort, u16 *_service)
649 {
650 	size_t offset = 0;
651 	int ret;
652 
653 	_enter("{%d},%zu,%d", call->debug_id, *_len, want_more);
654 
655 	mutex_lock(&call->user_mutex);
656 
657 	ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
658 	*_len -= offset;
659 	if (ret == -EIO)
660 		goto call_failed;
661 	if (ret < 0)
662 		goto out;
663 
664 	/* We can only reach here with a partially full buffer if we have
665 	 * reached the end of the data.  We must otherwise have a full buffer
666 	 * or have been given -EAGAIN.
667 	 */
668 	if (ret == 1) {
669 		if (iov_iter_count(iter) > 0)
670 			goto short_data;
671 		if (!want_more)
672 			goto read_phase_complete;
673 		ret = 0;
674 		goto out;
675 	}
676 
677 	if (!want_more)
678 		goto excess_data;
679 	goto out;
680 
681 read_phase_complete:
682 	ret = 1;
683 out:
684 	if (_service)
685 		*_service = call->dest_srx.srx_service;
686 	mutex_unlock(&call->user_mutex);
687 	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
688 	return ret;
689 
690 short_data:
691 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
692 			  call->cid, call->call_id, call->rx_consumed,
693 			  0, -EBADMSG);
694 	ret = -EBADMSG;
695 	goto out;
696 excess_data:
697 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
698 			  call->cid, call->call_id, call->rx_consumed,
699 			  0, -EMSGSIZE);
700 	ret = -EMSGSIZE;
701 	goto out;
702 call_failed:
703 	*_abort = call->abort_code;
704 	ret = call->error;
705 	if (call->completion == RXRPC_CALL_SUCCEEDED) {
706 		ret = 1;
707 		if (iov_iter_count(iter) > 0)
708 			ret = -ECONNRESET;
709 	}
710 	goto out;
711 }
712 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
713