xref: /linux/net/rxrpc/recvmsg.c (revision 6832a9317eee280117cd695fa885b2b7a7a38daf)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14 
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18 
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
rxrpc_notify_socket(struct rxrpc_call * call)23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25 	struct rxrpc_sock *rx;
26 	struct sock *sk;
27 
28 	_enter("%d", call->debug_id);
29 
30 	if (!list_empty(&call->recvmsg_link))
31 		return;
32 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
33 		rxrpc_see_call(call, rxrpc_call_see_notify_released);
34 		return;
35 	}
36 
37 	rcu_read_lock();
38 
39 	rx = rcu_dereference(call->socket);
40 	sk = &rx->sk;
41 	if (rx && sk->sk_state < RXRPC_CLOSE) {
42 		if (call->notify_rx) {
43 			spin_lock_irq(&call->notify_lock);
44 			call->notify_rx(sk, call, call->user_call_ID);
45 			spin_unlock_irq(&call->notify_lock);
46 		} else {
47 			spin_lock_irq(&rx->recvmsg_lock);
48 			if (list_empty(&call->recvmsg_link)) {
49 				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
50 				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
51 			}
52 			spin_unlock_irq(&rx->recvmsg_lock);
53 
54 			if (!sock_flag(sk, SOCK_DEAD)) {
55 				_debug("call %ps", sk->sk_data_ready);
56 				sk->sk_data_ready(sk);
57 			}
58 		}
59 	}
60 
61 	rcu_read_unlock();
62 	_leave("");
63 }
64 
65 /*
66  * Pass a call terminating message to userspace.
67  */
rxrpc_recvmsg_term(struct rxrpc_call * call,struct msghdr * msg)68 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
69 {
70 	u32 tmp = 0;
71 	int ret;
72 
73 	switch (call->completion) {
74 	case RXRPC_CALL_SUCCEEDED:
75 		ret = 0;
76 		if (rxrpc_is_service_call(call))
77 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
78 		break;
79 	case RXRPC_CALL_REMOTELY_ABORTED:
80 		tmp = call->abort_code;
81 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
82 		break;
83 	case RXRPC_CALL_LOCALLY_ABORTED:
84 		tmp = call->abort_code;
85 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
86 		break;
87 	case RXRPC_CALL_NETWORK_ERROR:
88 		tmp = -call->error;
89 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
90 		break;
91 	case RXRPC_CALL_LOCAL_ERROR:
92 		tmp = -call->error;
93 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
94 		break;
95 	default:
96 		pr_err("Invalid terminal call state %u\n", call->completion);
97 		BUG();
98 		break;
99 	}
100 
101 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
102 			     call->ackr_window - 1,
103 			     call->rx_pkt_offset, call->rx_pkt_len, ret);
104 	return ret;
105 }
106 
107 /*
108  * Discard a packet we've used up and advance the Rx window by one.
109  */
rxrpc_rotate_rx_window(struct rxrpc_call * call)110 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
111 {
112 	struct rxrpc_skb_priv *sp;
113 	struct sk_buff *skb;
114 	rxrpc_serial_t serial;
115 	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
116 	bool last;
117 	int acked;
118 
119 	_enter("%d", call->debug_id);
120 
121 	skb = skb_dequeue(&call->recvmsg_queue);
122 	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);
123 
124 	sp = rxrpc_skb(skb);
125 	tseq   = sp->hdr.seq;
126 	serial = sp->hdr.serial;
127 	last   = sp->hdr.flags & RXRPC_LAST_PACKET;
128 
129 	/* Barrier against rxrpc_input_data(). */
130 	if (after(tseq, call->rx_consumed))
131 		smp_store_release(&call->rx_consumed, tseq);
132 
133 	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);
134 
135 	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
136 			    serial, call->rx_consumed);
137 
138 	if (last)
139 		set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
140 
141 	/* Check to see if there's an ACK that needs sending. */
142 	acked = atomic_add_return(call->rx_consumed - old_consumed,
143 				  &call->ackr_nr_consumed);
144 	if (acked > 8 &&
145 	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
146 		rxrpc_poke_call(call, rxrpc_call_poke_idle);
147 }
148 
149 /*
150  * Decrypt and verify a DATA packet.
151  */
rxrpc_verify_data(struct rxrpc_call * call,struct sk_buff * skb)152 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
153 {
154 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
155 
156 	if (sp->flags & RXRPC_RX_VERIFIED)
157 		return 0;
158 	return call->security->verify_packet(call, skb);
159 }
160 
161 /*
162  * Transcribe a call's user ID to a control message.
163  */
rxrpc_recvmsg_user_id(struct rxrpc_call * call,struct msghdr * msg,int flags)164 static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg,
165 				 int flags)
166 {
167 	if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags))
168 		return 0;
169 
170 	if (flags & MSG_CMSG_COMPAT) {
171 		unsigned int id32 = call->user_call_ID;
172 
173 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
174 				sizeof(unsigned int), &id32);
175 	} else {
176 		unsigned long idl = call->user_call_ID;
177 
178 		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
179 				sizeof(unsigned long), &idl);
180 	}
181 }
182 
183 /*
184  * Deal with a CHALLENGE packet.
185  */
rxrpc_recvmsg_challenge(struct socket * sock,struct msghdr * msg,struct sk_buff * challenge,unsigned int flags)186 static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg,
187 				   struct sk_buff *challenge, unsigned int flags)
188 {
189 	struct rxrpc_skb_priv *sp = rxrpc_skb(challenge);
190 	struct rxrpc_connection *conn = sp->chall.conn;
191 
192 	return conn->security->challenge_to_recvmsg(conn, challenge, msg);
193 }
194 
195 /*
196  * Process OOB packets.  Called with the socket locked.
197  */
rxrpc_recvmsg_oob(struct socket * sock,struct msghdr * msg,unsigned int flags)198 static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg,
199 			     unsigned int flags)
200 {
201 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
202 	struct sk_buff *skb;
203 	bool need_response = false;
204 	int ret;
205 
206 	skb = skb_peek(&rx->recvmsg_oobq);
207 	if (!skb)
208 		return -EAGAIN;
209 	rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
210 
211 	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64),
212 		       &skb->skb_mstamp_ns);
213 	if (ret < 0)
214 		return ret;
215 
216 	switch ((enum rxrpc_oob_type)skb->mark) {
217 	case RXRPC_OOB_CHALLENGE:
218 		need_response = true;
219 		ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags);
220 		break;
221 	default:
222 		WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n",
223 			  skb->mark);
224 		ret = -EIO;
225 		break;
226 	}
227 
228 	if (!(flags & MSG_PEEK))
229 		skb_unlink(skb, &rx->recvmsg_oobq);
230 	if (need_response)
231 		rxrpc_add_pending_oob(rx, skb);
232 	else
233 		rxrpc_free_skb(skb, rxrpc_skb_put_oob);
234 	return ret;
235 }
236 
237 /*
238  * Deliver messages to a call.  This keeps processing packets until the buffer
239  * is filled and we find either more DATA (returns 0) or the end of the DATA
240  * (returns 1).  If more packets are required, it returns -EAGAIN and if the
241  * call has failed it returns -EIO.
242  */
rxrpc_recvmsg_data(struct socket * sock,struct rxrpc_call * call,struct msghdr * msg,struct iov_iter * iter,size_t len,int flags,size_t * _offset)243 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
244 			      struct msghdr *msg, struct iov_iter *iter,
245 			      size_t len, int flags, size_t *_offset)
246 {
247 	struct rxrpc_skb_priv *sp;
248 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
249 	struct sk_buff *skb;
250 	rxrpc_seq_t seq = 0;
251 	size_t remain;
252 	unsigned int rx_pkt_offset, rx_pkt_len;
253 	int copy, ret = -EAGAIN, ret2;
254 
255 	rx_pkt_offset = call->rx_pkt_offset;
256 	rx_pkt_len = call->rx_pkt_len;
257 
258 	if (rxrpc_call_has_failed(call)) {
259 		seq = call->ackr_window - 1;
260 		ret = -EIO;
261 		goto done;
262 	}
263 
264 	if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
265 		seq = call->ackr_window - 1;
266 		ret = 1;
267 		goto done;
268 	}
269 
270 	/* No one else can be removing stuff from the queue, so we shouldn't
271 	 * need the Rx lock to walk it.
272 	 */
273 	skb = skb_peek(&call->recvmsg_queue);
274 	while (skb) {
275 		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
276 		sp = rxrpc_skb(skb);
277 		seq = sp->hdr.seq;
278 
279 		if (!(flags & MSG_PEEK))
280 			trace_rxrpc_receive(call, rxrpc_receive_front,
281 					    sp->hdr.serial, seq);
282 
283 		if (msg)
284 			sock_recv_timestamp(msg, sock->sk, skb);
285 
286 		if (rx_pkt_offset == 0) {
287 			ret2 = rxrpc_verify_data(call, skb);
288 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
289 					     sp->offset, sp->len, ret2);
290 			if (ret2 < 0) {
291 				ret = ret2;
292 				goto out;
293 			}
294 			rx_pkt_offset = sp->offset;
295 			rx_pkt_len = sp->len;
296 		} else {
297 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
298 					     rx_pkt_offset, rx_pkt_len, 0);
299 		}
300 
301 		/* We have to handle short, empty and used-up DATA packets. */
302 		remain = len - *_offset;
303 		copy = rx_pkt_len;
304 		if (copy > remain)
305 			copy = remain;
306 		if (copy > 0) {
307 			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
308 						      copy);
309 			if (ret2 < 0) {
310 				ret = ret2;
311 				goto out;
312 			}
313 
314 			/* handle piecemeal consumption of data packets */
315 			rx_pkt_offset += copy;
316 			rx_pkt_len -= copy;
317 			*_offset += copy;
318 		}
319 
320 		if (rx_pkt_len > 0) {
321 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
322 					     rx_pkt_offset, rx_pkt_len, 0);
323 			ASSERTCMP(*_offset, ==, len);
324 			ret = 0;
325 			break;
326 		}
327 
328 		/* The whole packet has been transferred. */
329 		if (sp->hdr.flags & RXRPC_LAST_PACKET)
330 			ret = 1;
331 		rx_pkt_offset = 0;
332 		rx_pkt_len = 0;
333 
334 		skb = skb_peek_next(skb, &call->recvmsg_queue);
335 
336 		if (!(flags & MSG_PEEK))
337 			rxrpc_rotate_rx_window(call);
338 
339 		if (!rx->app_ops &&
340 		    !skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
341 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq,
342 					     rx_pkt_offset, rx_pkt_len, ret);
343 			break;
344 		}
345 	}
346 
347 out:
348 	if (!(flags & MSG_PEEK)) {
349 		call->rx_pkt_offset = rx_pkt_offset;
350 		call->rx_pkt_len = rx_pkt_len;
351 	}
352 
353 done:
354 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
355 			     rx_pkt_offset, rx_pkt_len, ret);
356 	if (ret == -EAGAIN)
357 		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
358 	return ret;
359 }
360 
361 /*
362  * Receive a message from an RxRPC socket
363  * - we need to be careful about two or more threads calling recvmsg
364  *   simultaneously
365  */
rxrpc_recvmsg(struct socket * sock,struct msghdr * msg,size_t len,int flags)366 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
367 		  int flags)
368 {
369 	struct rxrpc_call *call;
370 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
371 	struct list_head *l;
372 	unsigned int call_debug_id = 0;
373 	size_t copied = 0;
374 	long timeo;
375 	int ret;
376 
377 	DEFINE_WAIT(wait);
378 
379 	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
380 
381 	if (flags & (MSG_OOB | MSG_TRUNC))
382 		return -EOPNOTSUPP;
383 
384 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
385 
386 try_again:
387 	lock_sock(&rx->sk);
388 
389 	/* Return immediately if a client socket has no outstanding calls */
390 	if (RB_EMPTY_ROOT(&rx->calls) &&
391 	    list_empty(&rx->recvmsg_q) &&
392 	    skb_queue_empty_lockless(&rx->recvmsg_oobq) &&
393 	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
394 		release_sock(&rx->sk);
395 		return -EAGAIN;
396 	}
397 
398 	if (list_empty(&rx->recvmsg_q)) {
399 		ret = -EWOULDBLOCK;
400 		if (timeo == 0) {
401 			call = NULL;
402 			goto error_no_call;
403 		}
404 
405 		release_sock(&rx->sk);
406 
407 		/* Wait for something to happen */
408 		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
409 					  TASK_INTERRUPTIBLE);
410 		ret = sock_error(&rx->sk);
411 		if (ret)
412 			goto wait_error;
413 
414 		if (list_empty(&rx->recvmsg_q) &&
415 		    skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
416 			if (signal_pending(current))
417 				goto wait_interrupted;
418 			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
419 			timeo = schedule_timeout(timeo);
420 		}
421 		finish_wait(sk_sleep(&rx->sk), &wait);
422 		goto try_again;
423 	}
424 
425 	/* Deal with OOB messages before we consider getting normal data. */
426 	if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
427 		ret = rxrpc_recvmsg_oob(sock, msg, flags);
428 		release_sock(&rx->sk);
429 		if (ret == -EAGAIN)
430 			goto try_again;
431 		goto error_no_call;
432 	}
433 
434 	/* Find the next call and dequeue it if we're not just peeking.  If we
435 	 * do dequeue it, that comes with a ref that we will need to release.
436 	 * We also want to weed out calls that got requeued whilst we were
437 	 * shovelling data out.
438 	 */
439 	spin_lock_irq(&rx->recvmsg_lock);
440 	l = rx->recvmsg_q.next;
441 	call = list_entry(l, struct rxrpc_call, recvmsg_link);
442 
443 	if (!rxrpc_call_is_complete(call) &&
444 	    skb_queue_empty(&call->recvmsg_queue) &&
445 	    skb_queue_empty(&rx->recvmsg_oobq)) {
446 		list_del_init(&call->recvmsg_link);
447 		spin_unlock_irq(&rx->recvmsg_lock);
448 		release_sock(&rx->sk);
449 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
450 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
451 		goto try_again;
452 	}
453 
454 	rxrpc_see_call(call, rxrpc_call_see_recvmsg);
455 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
456 		rxrpc_see_call(call, rxrpc_call_see_already_released);
457 		list_del_init(&call->recvmsg_link);
458 		spin_unlock_irq(&rx->recvmsg_lock);
459 		release_sock(&rx->sk);
460 		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
461 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
462 		goto try_again;
463 	}
464 	if (!(flags & MSG_PEEK))
465 		list_del_init(&call->recvmsg_link);
466 	else
467 		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
468 	spin_unlock_irq(&rx->recvmsg_lock);
469 
470 	call_debug_id = call->debug_id;
471 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
472 
473 	/* We're going to drop the socket lock, so we need to lock the call
474 	 * against interference by sendmsg.
475 	 */
476 	if (!mutex_trylock(&call->user_mutex)) {
477 		ret = -EWOULDBLOCK;
478 		if (flags & MSG_DONTWAIT)
479 			goto error_requeue_call;
480 		ret = -ERESTARTSYS;
481 		if (mutex_lock_interruptible(&call->user_mutex) < 0)
482 			goto error_requeue_call;
483 	}
484 
485 	release_sock(&rx->sk);
486 
487 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
488 		rxrpc_see_call(call, rxrpc_call_see_already_released);
489 		mutex_unlock(&call->user_mutex);
490 		if (!(flags & MSG_PEEK))
491 			rxrpc_put_call(call, rxrpc_call_put_recvmsg);
492 		goto try_again;
493 	}
494 
495 	ret = rxrpc_recvmsg_user_id(call, msg, flags);
496 	if (ret < 0)
497 		goto error_unlock_call;
498 
499 	if (msg->msg_name && call->peer) {
500 		size_t len = sizeof(call->dest_srx);
501 
502 		memcpy(msg->msg_name, &call->dest_srx, len);
503 		msg->msg_namelen = len;
504 	}
505 
506 	ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
507 				 flags, &copied);
508 	if (ret == -EAGAIN)
509 		ret = 0;
510 	if (ret == -EIO)
511 		goto call_failed;
512 	if (ret < 0)
513 		goto error_unlock_call;
514 
515 	if (rxrpc_call_is_complete(call) &&
516 	    skb_queue_empty(&call->recvmsg_queue))
517 		goto call_complete;
518 	if (rxrpc_call_has_failed(call))
519 		goto call_failed;
520 
521 	if (!skb_queue_empty(&call->recvmsg_queue))
522 		rxrpc_notify_socket(call);
523 	goto not_yet_complete;
524 
525 call_failed:
526 	rxrpc_purge_queue(&call->recvmsg_queue);
527 call_complete:
528 	ret = rxrpc_recvmsg_term(call, msg);
529 	if (ret < 0)
530 		goto error_unlock_call;
531 	if (!(flags & MSG_PEEK))
532 		rxrpc_release_call(rx, call);
533 	msg->msg_flags |= MSG_EOR;
534 	ret = 1;
535 
536 not_yet_complete:
537 	if (ret == 0)
538 		msg->msg_flags |= MSG_MORE;
539 	else
540 		msg->msg_flags &= ~MSG_MORE;
541 	ret = copied;
542 
543 error_unlock_call:
544 	mutex_unlock(&call->user_mutex);
545 	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
546 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
547 	return ret;
548 
549 error_requeue_call:
550 	if (!(flags & MSG_PEEK)) {
551 		spin_lock_irq(&rx->recvmsg_lock);
552 		list_add(&call->recvmsg_link, &rx->recvmsg_q);
553 		spin_unlock_irq(&rx->recvmsg_lock);
554 		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
555 	} else {
556 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
557 	}
558 error_no_call:
559 	release_sock(&rx->sk);
560 error_trace:
561 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
562 	return ret;
563 
564 wait_interrupted:
565 	ret = sock_intr_errno(timeo);
566 wait_error:
567 	finish_wait(sk_sleep(&rx->sk), &wait);
568 	call = NULL;
569 	goto error_trace;
570 }
571 
572 /**
573  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
574  * @sock: The socket that the call exists on
575  * @call: The call to send data through
576  * @iter: The buffer to receive into
577  * @_len: The amount of data we want to receive (decreased on return)
578  * @want_more: True if more data is expected to be read
579  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
580  * @_service: Where to store the actual service ID (may be upgraded)
581  *
582  * Allow a kernel service to receive data and pick up information about the
583  * state of a call.  Note that *@_abort should also be initialised to %0.
584  *
585  * Note that we may return %-EAGAIN to drain empty packets at the end
586  * of the data, even if we've already copied over the requested data.
587  *
588  * Return: %0 if got what was asked for and there's more available, %1
589  * if we got what was asked for and we're at the end of the data and
590  * %-EAGAIN if we need more data.
591  */
rxrpc_kernel_recv_data(struct socket * sock,struct rxrpc_call * call,struct iov_iter * iter,size_t * _len,bool want_more,u32 * _abort,u16 * _service)592 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
593 			   struct iov_iter *iter, size_t *_len,
594 			   bool want_more, u32 *_abort, u16 *_service)
595 {
596 	size_t offset = 0;
597 	int ret;
598 
599 	_enter("{%d},%zu,%d", call->debug_id, *_len, want_more);
600 
601 	mutex_lock(&call->user_mutex);
602 
603 	ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
604 	*_len -= offset;
605 	if (ret == -EIO)
606 		goto call_failed;
607 	if (ret < 0)
608 		goto out;
609 
610 	/* We can only reach here with a partially full buffer if we have
611 	 * reached the end of the data.  We must otherwise have a full buffer
612 	 * or have been given -EAGAIN.
613 	 */
614 	if (ret == 1) {
615 		if (iov_iter_count(iter) > 0)
616 			goto short_data;
617 		if (!want_more)
618 			goto read_phase_complete;
619 		ret = 0;
620 		goto out;
621 	}
622 
623 	if (!want_more)
624 		goto excess_data;
625 	goto out;
626 
627 read_phase_complete:
628 	ret = 1;
629 out:
630 	if (_service)
631 		*_service = call->dest_srx.srx_service;
632 	mutex_unlock(&call->user_mutex);
633 	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
634 	return ret;
635 
636 short_data:
637 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
638 			  call->cid, call->call_id, call->rx_consumed,
639 			  0, -EBADMSG);
640 	ret = -EBADMSG;
641 	goto out;
642 excess_data:
643 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
644 			  call->cid, call->call_id, call->rx_consumed,
645 			  0, -EMSGSIZE);
646 	ret = -EMSGSIZE;
647 	goto out;
648 call_failed:
649 	*_abort = call->abort_code;
650 	ret = call->error;
651 	if (call->completion == RXRPC_CALL_SUCCEEDED) {
652 		ret = 1;
653 		if (iov_iter_count(iter) > 0)
654 			ret = -ECONNRESET;
655 	}
656 	goto out;
657 }
658 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
659