// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Post a call for attention by the socket or kernel service. Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
void rxrpc_notify_socket(struct rxrpc_call *call)
{
	struct rxrpc_sock *rx;
	struct sock *sk;

	_enter("%d", call->debug_id);

	if (!list_empty(&call->recvmsg_link))
		return;
	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
		rxrpc_see_call(call, rxrpc_call_see_notify_released);
		return;
	}

	rcu_read_lock();

	rx = rcu_dereference(call->socket);
	sk = &rx->sk;
	if (rx && sk->sk_state < RXRPC_CLOSE) {
		if (call->notify_rx) {
			spin_lock_irq(&call->notify_lock);
			call->notify_rx(sk, call, call->user_call_ID);
			spin_unlock_irq(&call->notify_lock);
		} else {
			spin_lock_irq(&rx->recvmsg_lock);
			if (list_empty(&call->recvmsg_link)) {
				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
			}
			spin_unlock_irq(&rx->recvmsg_lock);

			if (!sock_flag(sk, SOCK_DEAD)) {
				_debug("call %ps", sk->sk_data_ready);
				sk->sk_data_ready(sk);
			}
		}
	}

	rcu_read_unlock();
	_leave("");
}

/*
 * Pass a call terminating message to userspace.
 */
static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
{
	u32 tmp = 0;
	int ret;

	switch (call->completion) {
	case RXRPC_CALL_SUCCEEDED:
		ret = 0;
		if (rxrpc_is_service_call(call))
			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
		break;
	case RXRPC_CALL_REMOTELY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_LOCALLY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_NETWORK_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
		break;
	case RXRPC_CALL_LOCAL_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
		break;
	default:
		pr_err("Invalid terminal call state %u\n", call->completion);
		BUG();
		break;
	}

	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
			     call->ackr_window - 1,
			     call->rx_pkt_offset, call->rx_pkt_len, ret);
	return ret;
}
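
/*
 * A rough userspace sketch (not kernel code, not compiled here) of how the
 * terminal control messages generated above might be picked apart after
 * recvmsg() returns with MSG_EOR set.  It assumes <linux/rxrpc.h> for the
 * RXRPC_* cmsg types and a hypothetical struct msghdr named "msg":
 *
 *	struct cmsghdr *cmsg;
 *	unsigned int code;
 *
 *	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
 *		if (cmsg->cmsg_level != SOL_RXRPC)
 *			continue;
 *		switch (cmsg->cmsg_type) {
 *		case RXRPC_ABORT:	// 32-bit abort code
 *		case RXRPC_NET_ERROR:	// 32-bit negated errno
 *		case RXRPC_LOCAL_ERROR:	// 32-bit negated errno
 *			memcpy(&code, CMSG_DATA(cmsg), sizeof(code));
 *			break;
 *		case RXRPC_ACK:		// service call: reply fully acked
 *			break;
 *		}
 *	}
 */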

/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
	bool last;
	int acked;

	_enter("%d", call->debug_id);

	skb = skb_dequeue(&call->recvmsg_queue);
	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);

	sp = rxrpc_skb(skb);
	tseq = sp->hdr.seq;
	serial = sp->hdr.serial;
	last = sp->hdr.flags & RXRPC_LAST_PACKET;

	/* Barrier against rxrpc_input_data(). */
	if (after(tseq, call->rx_consumed))
		smp_store_release(&call->rx_consumed, tseq);

	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);

	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
			    serial, call->rx_consumed);

	if (last)
		set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);

	/* Check to see if there's an ACK that needs sending. */
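	/* The threshold of 8 below looks like a batching heuristic: the I/O
	 * thread is only poked to consider sending an IDLE ACK once a clutch
	 * of packets has been consumed, rather than on every rotation.
	 */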
	acked = atomic_add_return(call->rx_consumed - old_consumed,
				  &call->ackr_nr_consumed);
	if (acked > 8 &&
	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_poke_call(call, rxrpc_call_poke_idle);
}

/*
 * Decrypt and verify a DATA packet.
 */
static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	if (sp->flags & RXRPC_RX_VERIFIED)
		return 0;
	return call->security->verify_packet(call, skb);
}

/*
 * Transcribe a call's user ID to a control message.
 */
static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg,
				 int flags)
{
	if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags))
		return 0;

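	/* A compat task can only have supplied a 32-bit user call ID via
	 * sendmsg(), so narrowing the stored value back down here should be
	 * lossless - an observation from the sendmsg side of the ABI, not a
	 * separately documented guarantee.
	 */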
	if (flags & MSG_CMSG_COMPAT) {
		unsigned int id32 = call->user_call_ID;

		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				sizeof(unsigned int), &id32);
	} else {
		unsigned long idl = call->user_call_ID;

		return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				sizeof(unsigned long), &idl);
	}
}

/*
 * Deal with a CHALLENGE packet.
 */
static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg,
				   struct sk_buff *challenge, unsigned int flags)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(challenge);
	struct rxrpc_connection *conn = sp->chall.conn;

	return conn->security->challenge_to_recvmsg(conn, challenge, msg);
}

/*
 * Process OOB packets. Called with the socket locked.
 */
static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg,
			     unsigned int flags)
{
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct sk_buff *skb;
	bool need_response = false;
	int ret;

	skb = skb_peek(&rx->recvmsg_oobq);
	if (!skb)
		return -EAGAIN;
	rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);

	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64),
		       &skb->skb_mstamp_ns);
	if (ret < 0)
		return ret;

	switch ((enum rxrpc_oob_type)skb->mark) {
	case RXRPC_OOB_CHALLENGE:
		need_response = true;
		ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags);
		break;
	default:
		WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n",
			  skb->mark);
		ret = -EIO;
		break;
	}

	if (!(flags & MSG_PEEK))
		skb_unlink(skb, &rx->recvmsg_oobq);
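	/* A challenge skb has to survive until a response has been generated
	 * for it, so it's shifted to the pending-OOB list rather than being
	 * freed here.
	 */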
	if (need_response)
		rxrpc_add_pending_oob(rx, skb);
	else
		rxrpc_free_skb(skb, rxrpc_skb_put_oob);
	return ret;
}

/*
 * Deliver messages to a call. This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1). If more packets are required, it returns -EAGAIN and if the
 * call has failed it returns -EIO.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

	if (rxrpc_call_has_failed(call)) {
		seq = call->ackr_window - 1;
		ret = -EIO;
		goto done;
	}

	if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
		seq = call->ackr_window - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			ret2 = rxrpc_verify_data(call, skb);
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     sp->offset, sp->len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
		} else {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);

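		/* Bail out so that recvmsg() can deal with any pending
		 * out-of-band messages first (only relevant when no kernel
		 * service has claimed them via app_ops).
		 */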
		if (!rx->app_ops &&
		    !skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq,
					     rx_pkt_offset, rx_pkt_len, ret);
			break;
		}
	}

out:
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}

done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}

/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	unsigned int call_debug_id = 0;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    skb_queue_empty_lockless(&rx->recvmsg_oobq) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -EAGAIN;
	}

	if (list_empty(&rx->recvmsg_q)) {
		ret = -EWOULDBLOCK;
		if (timeo == 0) {
			call = NULL;
			goto error_no_call;
		}

		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		if (list_empty(&rx->recvmsg_q) &&
		    skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
			if (signal_pending(current))
				goto wait_interrupted;
			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Deal with OOB messages before we consider getting normal data. */
	if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
		ret = rxrpc_recvmsg_oob(sock, msg, flags);
		release_sock(&rx->sk);
		if (ret == -EAGAIN)
			goto try_again;
		goto error_no_call;
	}

	/* Find the next call and dequeue it if we're not just peeking. If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 * We also want to weed out calls that got requeued whilst we were
	 * shovelling data out.
	 */
	spin_lock_irq(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);

	if (!rxrpc_call_is_complete(call) &&
	    skb_queue_empty(&call->recvmsg_queue) &&
	    skb_queue_empty(&rx->recvmsg_oobq)) {
		list_del_init(&call->recvmsg_link);
		spin_unlock_irq(&rx->recvmsg_lock);
		release_sock(&rx->sk);
		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
		goto try_again;
	}

	rxrpc_see_call(call, rxrpc_call_see_recvmsg);
	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
		rxrpc_see_call(call, rxrpc_call_see_already_released);
		list_del_init(&call->recvmsg_link);
		spin_unlock_irq(&rx->recvmsg_lock);
		release_sock(&rx->sk);
		trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
		goto try_again;
	}
	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
	spin_unlock_irq(&rx->recvmsg_lock);

	call_debug_id = call->debug_id;
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);

	/* We're going to drop the socket lock, so we need to lock the call
	 * against interference by sendmsg.
	 */
	if (!mutex_trylock(&call->user_mutex)) {
		ret = -EWOULDBLOCK;
		if (flags & MSG_DONTWAIT)
			goto error_requeue_call;
		ret = -ERESTARTSYS;
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			goto error_requeue_call;
	}

	release_sock(&rx->sk);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
		rxrpc_see_call(call, rxrpc_call_see_already_released);
		mutex_unlock(&call->user_mutex);
		if (!(flags & MSG_PEEK))
			rxrpc_put_call(call, rxrpc_call_put_recvmsg);
		goto try_again;
	}

	ret = rxrpc_recvmsg_user_id(call, msg, flags);
	if (ret < 0)
		goto error_unlock_call;

	if (msg->msg_name && call->peer) {
		size_t len = sizeof(call->dest_srx);

		memcpy(msg->msg_name, &call->dest_srx, len);
		msg->msg_namelen = len;
	}

	ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
				 flags, &copied);
	if (ret == -EAGAIN)
		ret = 0;
	if (ret == -EIO)
		goto call_failed;
	if (ret < 0)
		goto error_unlock_call;

	if (rxrpc_call_is_complete(call) &&
	    skb_queue_empty(&call->recvmsg_queue))
		goto call_complete;
	if (rxrpc_call_has_failed(call))
		goto call_failed;

	if (!skb_queue_empty(&call->recvmsg_queue))
		rxrpc_notify_socket(call);
	goto not_yet_complete;

call_failed:
	rxrpc_purge_queue(&call->recvmsg_queue);
call_complete:
	ret = rxrpc_recvmsg_term(call, msg);
	if (ret < 0)
		goto error_unlock_call;
	if (!(flags & MSG_PEEK))
		rxrpc_release_call(rx, call);
	msg->msg_flags |= MSG_EOR;
	ret = 1;

not_yet_complete:
	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error_unlock_call:
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

error_requeue_call:
	if (!(flags & MSG_PEEK)) {
		spin_lock_irq(&rx->recvmsg_lock);
		list_add(&call->recvmsg_link, &rx->recvmsg_q);
		spin_unlock_irq(&rx->recvmsg_lock);
		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
	} else {
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	}
error_no_call:
	release_sock(&rx->sk);
error_trace:
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	call = NULL;
	goto error_trace;
}
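
/*
 * A minimal userspace sketch (not kernel code) of driving the above: read a
 * call's reply until MSG_EOR signals that the call is over.  The socket
 * setup, sendmsg() phase and control-message parsing are omitted; "fd" and
 * "buf" are assumed to exist:
 *
 *	struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
 *	unsigned char control[128];
 *	struct msghdr msg;
 *	ssize_t n;
 *
 *	do {
 *		memset(&msg, 0, sizeof(msg));
 *		msg.msg_iov = &iov;
 *		msg.msg_iovlen = 1;
 *		msg.msg_control = control;
 *		msg.msg_controllen = sizeof(control);
 *		n = recvmsg(fd, &msg, 0);	// -1 on error
 *	} while (n >= 0 && !(msg.msg_flags & MSG_EOR));
 */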

/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to receive data from
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call. Note that *@_abort should also be initialised to %0 by
 * the caller.
 *
 * Note that we may return %-EAGAIN to drain empty packets at the end
 * of the data, even if we've already copied over the requested data.
 *
 * Return: %0 if we got what was asked for and there's more available, %1
 * if we got what was asked for and we're at the end of the data, and
 * %-EAGAIN if we need more data.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	_enter("{%d},%zu,%d", call->debug_id, *_len, want_more);

	mutex_lock(&call->user_mutex);

	ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
	*_len -= offset;
	if (ret == -EIO)
		goto call_failed;
	if (ret < 0)
		goto out;

	/* We can only reach here with a partially full buffer if we have
	 * reached the end of the data. We must otherwise have a full buffer
	 * or have been given -EAGAIN.
	 */
	if (ret == 1) {
		if (iov_iter_count(iter) > 0)
			goto short_data;
		if (!want_more)
			goto read_phase_complete;
		ret = 0;
		goto out;
	}

	if (!want_more)
		goto excess_data;
	goto out;

read_phase_complete:
	ret = 1;
out:
	if (_service)
		*_service = call->dest_srx.srx_service;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EBADMSG);
	ret = -EBADMSG;
	goto out;
excess_data:
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EMSGSIZE);
	ret = -EMSGSIZE;
	goto out;
call_failed:
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);
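
/*
 * A hedged sketch (not part of this file's API) of how a kernel service
 * might call rxrpc_kernel_recv_data() to pull in a fixed-size chunk of a
 * reply.  The buffer, socket and call are hypothetical, and ITER_DEST is
 * assumed to be the destination-direction flag for iov_iter_kvec():
 *
 *	struct kvec kv = { .iov_base = buf, .iov_len = buflen };
 *	struct iov_iter iter;
 *	size_t len = buflen;
 *	u32 abort_code = 0;
 *	int ret;
 *
 *	iov_iter_kvec(&iter, ITER_DEST, &kv, 1, len);
 *	ret = rxrpc_kernel_recv_data(sock, call, &iter, &len, true,
 *				     &abort_code, NULL);
 *	// ret == 0: got the chunk, more to come; ret == 1: end of data;
 *	// ret == -EAGAIN: call again; -ECONNABORTED: consult abort_code.
 */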