1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC recvmsg() implementation 3 * 4 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/net.h> 11 #include <linux/skbuff.h> 12 #include <linux/export.h> 13 #include <linux/sched/signal.h> 14 15 #include <net/sock.h> 16 #include <net/af_rxrpc.h> 17 #include "ar-internal.h" 18 19 /* 20 * Post a call for attention by the socket or kernel service. Further 21 * notifications are suppressed by putting recvmsg_link on a dummy queue. 22 */ 23 void rxrpc_notify_socket(struct rxrpc_call *call) 24 { 25 struct rxrpc_sock *rx; 26 struct sock *sk; 27 28 _enter("%d", call->debug_id); 29 30 if (!list_empty(&call->recvmsg_link)) 31 return; 32 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 33 rxrpc_see_call(call, rxrpc_call_see_notify_released); 34 return; 35 } 36 37 rcu_read_lock(); 38 39 rx = rcu_dereference(call->socket); 40 sk = &rx->sk; 41 if (rx && sk->sk_state < RXRPC_CLOSE) { 42 if (call->notify_rx) { 43 spin_lock_irq(&call->notify_lock); 44 call->notify_rx(sk, call, call->user_call_ID); 45 spin_unlock_irq(&call->notify_lock); 46 } else { 47 spin_lock_irq(&rx->recvmsg_lock); 48 if (list_empty(&call->recvmsg_link)) { 49 rxrpc_get_call(call, rxrpc_call_get_notify_socket); 50 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q); 51 } 52 spin_unlock_irq(&rx->recvmsg_lock); 53 54 if (!sock_flag(sk, SOCK_DEAD)) { 55 _debug("call %ps", sk->sk_data_ready); 56 sk->sk_data_ready(sk); 57 } 58 } 59 } 60 61 rcu_read_unlock(); 62 _leave(""); 63 } 64 65 /* 66 * Pass a call terminating message to userspace. 67 */ 68 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg) 69 { 70 u32 tmp = 0; 71 int ret; 72 73 switch (call->completion) { 74 case RXRPC_CALL_SUCCEEDED: 75 ret = 0; 76 if (rxrpc_is_service_call(call)) 77 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp); 78 break; 79 case RXRPC_CALL_REMOTELY_ABORTED: 80 tmp = call->abort_code; 81 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 82 break; 83 case RXRPC_CALL_LOCALLY_ABORTED: 84 tmp = call->abort_code; 85 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 86 break; 87 case RXRPC_CALL_NETWORK_ERROR: 88 tmp = -call->error; 89 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp); 90 break; 91 case RXRPC_CALL_LOCAL_ERROR: 92 tmp = -call->error; 93 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp); 94 break; 95 default: 96 pr_err("Invalid terminal call state %u\n", call->completion); 97 BUG(); 98 break; 99 } 100 101 trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, 102 call->ackr_window - 1, 103 call->rx_pkt_offset, call->rx_pkt_len, ret); 104 return ret; 105 } 106 107 /* 108 * Discard a packet we've used up and advance the Rx window by one. 109 */ 110 static void rxrpc_rotate_rx_window(struct rxrpc_call *call) 111 { 112 struct rxrpc_skb_priv *sp; 113 struct sk_buff *skb; 114 rxrpc_serial_t serial; 115 rxrpc_seq_t old_consumed = call->rx_consumed, tseq; 116 bool last; 117 int acked; 118 119 _enter("%d", call->debug_id); 120 121 skb = skb_dequeue(&call->recvmsg_queue); 122 rxrpc_see_skb(skb, rxrpc_skb_see_rotate); 123 124 sp = rxrpc_skb(skb); 125 tseq = sp->hdr.seq; 126 serial = sp->hdr.serial; 127 last = sp->hdr.flags & RXRPC_LAST_PACKET; 128 129 /* Barrier against rxrpc_input_data(). */ 130 if (after(tseq, call->rx_consumed)) 131 smp_store_release(&call->rx_consumed, tseq); 132 133 rxrpc_free_skb(skb, rxrpc_skb_put_rotate); 134 135 trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate, 136 serial, call->rx_consumed); 137 138 if (last) 139 set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags); 140 141 /* Check to see if there's an ACK that needs sending. */ 142 acked = atomic_add_return(call->rx_consumed - old_consumed, 143 &call->ackr_nr_consumed); 144 if (acked > 8 && 145 !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags)) 146 rxrpc_poke_call(call, rxrpc_call_poke_idle); 147 } 148 149 /* 150 * Decrypt and verify a DATA packet. The content of the packet is pulled out 151 * into a flat buffer rather than decrypting in place in the skbuff. This also 152 * has the advantage of aligning the buffer correctly for the crypto routines. 153 * 154 * We keep track of the sequence number of the packet currently decrypted into 155 * the buffer in ->rx_dec_seq. If MSG_PEEK is used and steps onto a new 156 * packet, subsequent recvmsg() calls will have to go back and re-decrypt the 157 * current packet. 158 */ 159 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb) 160 { 161 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 162 int ret; 163 164 if (sp->len > call->rx_dec_bsize || !call->rx_dec_buffer) { 165 /* Make sure we can hold a 1412-byte jumbo subpacket and make 166 * sure that the buffer size is aligned to a crypto blocksize. 167 */ 168 size_t size = clamp(round_up(sp->len, 32), 2048, 65535); 169 void *buffer = krealloc(call->rx_dec_buffer, size, GFP_NOFS); 170 171 if (!buffer) 172 return -ENOMEM; 173 call->rx_dec_buffer = buffer; 174 call->rx_dec_bsize = size; 175 } 176 177 ret = -EFAULT; 178 if (skb_copy_bits(skb, sp->offset, call->rx_dec_buffer, sp->len) < 0) 179 goto err; 180 181 call->rx_dec_offset = 0; 182 call->rx_dec_len = sp->len; 183 call->rx_dec_seq = sp->hdr.seq; 184 ret = call->security->verify_packet(call, skb); 185 if (ret < 0) 186 goto err; 187 return 0; 188 189 err: 190 kfree(call->rx_dec_buffer); 191 call->rx_dec_buffer = NULL; 192 call->rx_dec_bsize = 0; 193 call->rx_dec_offset = 0; 194 call->rx_dec_len = 0; 195 return ret; 196 } 197 198 /* 199 * Transcribe a call's user ID to a control message. 200 */ 201 static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg, 202 int flags) 203 { 204 if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) 205 return 0; 206 207 if (flags & MSG_CMSG_COMPAT) { 208 unsigned int id32 = call->user_call_ID; 209 210 return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 211 sizeof(unsigned int), &id32); 212 } else { 213 unsigned long idl = call->user_call_ID; 214 215 return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 216 sizeof(unsigned long), &idl); 217 } 218 } 219 220 /* 221 * Deal with a CHALLENGE packet. 222 */ 223 static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg, 224 struct sk_buff *challenge, unsigned int flags) 225 { 226 struct rxrpc_skb_priv *sp = rxrpc_skb(challenge); 227 struct rxrpc_connection *conn = sp->chall.conn; 228 229 return conn->security->challenge_to_recvmsg(conn, challenge, msg); 230 } 231 232 /* 233 * Process OOB packets. Called with the socket locked. 234 */ 235 static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg, 236 unsigned int flags) 237 { 238 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 239 struct sk_buff *skb; 240 bool need_response = false; 241 int ret; 242 243 skb = skb_peek(&rx->recvmsg_oobq); 244 if (!skb) 245 return -EAGAIN; 246 rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg); 247 248 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64), 249 &skb->skb_mstamp_ns); 250 if (ret < 0) 251 return ret; 252 253 switch ((enum rxrpc_oob_type)skb->mark) { 254 case RXRPC_OOB_CHALLENGE: 255 need_response = true; 256 ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags); 257 break; 258 default: 259 WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n", 260 skb->mark); 261 ret = -EIO; 262 break; 263 } 264 265 if (!(flags & MSG_PEEK)) { 266 skb_unlink(skb, &rx->recvmsg_oobq); 267 if (need_response) 268 rxrpc_add_pending_oob(rx, skb); 269 else 270 rxrpc_free_skb(skb, rxrpc_skb_put_oob); 271 } 272 return ret; 273 } 274 275 /* 276 * Deliver messages to a call. This keeps processing packets until the buffer 277 * is filled and we find either more DATA (returns 0) or the end of the DATA 278 * (returns 1). If more packets are required, it returns -EAGAIN and if the 279 * call has failed it returns -EIO. 280 */ 281 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, 282 struct msghdr *msg, struct iov_iter *iter, 283 size_t len, int flags, size_t *_offset) 284 { 285 struct rxrpc_skb_priv *sp; 286 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 287 struct sk_buff *skb; 288 rxrpc_seq_t seq = 0; 289 size_t remain; 290 unsigned int rx_pkt_offset, rx_pkt_len; 291 int copy, ret = -EAGAIN, ret2; 292 293 rx_pkt_offset = call->rx_pkt_offset; 294 rx_pkt_len = call->rx_pkt_len; 295 296 if (rxrpc_call_has_failed(call)) { 297 seq = call->ackr_window - 1; 298 ret = -EIO; 299 goto done; 300 } 301 302 if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) { 303 seq = call->ackr_window - 1; 304 ret = 1; 305 goto done; 306 } 307 308 /* No one else can be removing stuff from the queue, so we shouldn't 309 * need the Rx lock to walk it. 310 */ 311 skb = skb_peek(&call->recvmsg_queue); 312 while (skb) { 313 rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg); 314 sp = rxrpc_skb(skb); 315 seq = sp->hdr.seq; 316 317 if (!(flags & MSG_PEEK)) 318 trace_rxrpc_receive(call, rxrpc_receive_front, 319 sp->hdr.serial, seq); 320 321 if (msg) 322 sock_recv_timestamp(msg, sock->sk, skb); 323 324 if (call->rx_dec_seq != sp->hdr.seq || 325 !call->rx_dec_buffer) { 326 ret2 = rxrpc_verify_data(call, skb); 327 trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq, 328 call->rx_dec_offset, 329 call->rx_dec_len, ret2); 330 if (ret2 < 0) { 331 ret = ret2; 332 goto out; 333 } 334 } 335 336 if (rx_pkt_offset == USHRT_MAX) { 337 rx_pkt_offset = call->rx_dec_offset; 338 rx_pkt_len = call->rx_dec_len; 339 } else { 340 trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq, 341 rx_pkt_offset, rx_pkt_len, 0); 342 } 343 344 /* We have to handle short, empty and used-up DATA packets. */ 345 remain = len - *_offset; 346 copy = rx_pkt_len; 347 if (copy > remain) 348 copy = remain; 349 if (copy > 0) { 350 ret2 = copy_to_iter(call->rx_dec_buffer + rx_pkt_offset, 351 copy, iter); 352 if (ret2 != copy) { 353 ret = -EFAULT; 354 goto out; 355 } 356 357 /* handle piecemeal consumption of data packets */ 358 rx_pkt_offset += copy; 359 rx_pkt_len -= copy; 360 *_offset += copy; 361 } 362 363 if (rx_pkt_len > 0) { 364 trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq, 365 rx_pkt_offset, rx_pkt_len, 0); 366 ASSERTCMP(*_offset, ==, len); 367 ret = 0; 368 break; 369 } 370 371 /* The whole packet has been transferred. */ 372 if (sp->hdr.flags & RXRPC_LAST_PACKET) 373 ret = 1; 374 rx_pkt_offset = USHRT_MAX; 375 rx_pkt_len = 0; 376 377 skb = skb_peek_next(skb, &call->recvmsg_queue); 378 379 if (!(flags & MSG_PEEK)) 380 rxrpc_rotate_rx_window(call); 381 382 if (!rx->app_ops && 383 !skb_queue_empty_lockless(&rx->recvmsg_oobq)) { 384 trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq, 385 rx_pkt_offset, rx_pkt_len, ret); 386 break; 387 } 388 } 389 390 out: 391 if (!(flags & MSG_PEEK)) { 392 call->rx_pkt_offset = rx_pkt_offset; 393 call->rx_pkt_len = rx_pkt_len; 394 } 395 396 done: 397 trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq, 398 rx_pkt_offset, rx_pkt_len, ret); 399 if (ret == -EAGAIN) 400 set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags); 401 return ret; 402 } 403 404 /* 405 * Receive a message from an RxRPC socket 406 * - we need to be careful about two or more threads calling recvmsg 407 * simultaneously 408 */ 409 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, 410 int flags) 411 { 412 struct rxrpc_call *call; 413 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 414 struct list_head *l; 415 unsigned int call_debug_id = 0; 416 size_t copied = 0; 417 long timeo; 418 int ret; 419 420 DEFINE_WAIT(wait); 421 422 trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0); 423 424 if (flags & (MSG_OOB | MSG_TRUNC)) 425 return -EOPNOTSUPP; 426 427 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 428 429 try_again: 430 lock_sock(&rx->sk); 431 432 /* Return immediately if a client socket has no outstanding calls */ 433 if (RB_EMPTY_ROOT(&rx->calls) && 434 list_empty(&rx->recvmsg_q) && 435 skb_queue_empty_lockless(&rx->recvmsg_oobq) && 436 rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 437 release_sock(&rx->sk); 438 return -EAGAIN; 439 } 440 441 if (list_empty(&rx->recvmsg_q)) { 442 ret = -EWOULDBLOCK; 443 if (timeo == 0) { 444 call = NULL; 445 goto error_no_call; 446 } 447 448 release_sock(&rx->sk); 449 450 /* Wait for something to happen */ 451 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 452 TASK_INTERRUPTIBLE); 453 ret = sock_error(&rx->sk); 454 if (ret) 455 goto wait_error; 456 457 if (list_empty(&rx->recvmsg_q) && 458 skb_queue_empty_lockless(&rx->recvmsg_oobq)) { 459 if (signal_pending(current)) 460 goto wait_interrupted; 461 trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0); 462 timeo = schedule_timeout(timeo); 463 } 464 finish_wait(sk_sleep(&rx->sk), &wait); 465 goto try_again; 466 } 467 468 /* Deal with OOB messages before we consider getting normal data. */ 469 if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) { 470 ret = rxrpc_recvmsg_oob(sock, msg, flags); 471 release_sock(&rx->sk); 472 if (ret == -EAGAIN) 473 goto try_again; 474 goto error_no_call; 475 } 476 477 /* Find the next call and dequeue it if we're not just peeking. If we 478 * do dequeue it, that comes with a ref that we will need to release. 479 * We also want to weed out calls that got requeued whilst we were 480 * shovelling data out. 481 */ 482 spin_lock_irq(&rx->recvmsg_lock); 483 l = rx->recvmsg_q.next; 484 call = list_entry(l, struct rxrpc_call, recvmsg_link); 485 486 if (!rxrpc_call_is_complete(call) && 487 skb_queue_empty(&call->recvmsg_queue) && 488 skb_queue_empty(&rx->recvmsg_oobq)) { 489 list_del_init(&call->recvmsg_link); 490 spin_unlock_irq(&rx->recvmsg_lock); 491 release_sock(&rx->sk); 492 trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0); 493 rxrpc_put_call(call, rxrpc_call_put_recvmsg); 494 goto try_again; 495 } 496 497 rxrpc_see_call(call, rxrpc_call_see_recvmsg); 498 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 499 rxrpc_see_call(call, rxrpc_call_see_already_released); 500 list_del_init(&call->recvmsg_link); 501 spin_unlock_irq(&rx->recvmsg_lock); 502 release_sock(&rx->sk); 503 trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0); 504 rxrpc_put_call(call, rxrpc_call_put_recvmsg); 505 goto try_again; 506 } 507 if (!(flags & MSG_PEEK)) 508 list_del_init(&call->recvmsg_link); 509 else 510 rxrpc_get_call(call, rxrpc_call_get_recvmsg); 511 spin_unlock_irq(&rx->recvmsg_lock); 512 513 call_debug_id = call->debug_id; 514 trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0); 515 516 /* We're going to drop the socket lock, so we need to lock the call 517 * against interference by sendmsg. 518 */ 519 if (!mutex_trylock(&call->user_mutex)) { 520 ret = -EWOULDBLOCK; 521 if (flags & MSG_DONTWAIT) 522 goto error_requeue_call; 523 ret = -ERESTARTSYS; 524 if (mutex_lock_interruptible(&call->user_mutex) < 0) 525 goto error_requeue_call; 526 } 527 528 release_sock(&rx->sk); 529 530 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 531 rxrpc_see_call(call, rxrpc_call_see_already_released); 532 mutex_unlock(&call->user_mutex); 533 if (!(flags & MSG_PEEK)) 534 rxrpc_put_call(call, rxrpc_call_put_recvmsg); 535 goto try_again; 536 } 537 538 ret = rxrpc_recvmsg_user_id(call, msg, flags); 539 if (ret < 0) 540 goto error_unlock_call; 541 542 if (msg->msg_name && call->peer) { 543 size_t len = sizeof(call->dest_srx); 544 545 memcpy(msg->msg_name, &call->dest_srx, len); 546 msg->msg_namelen = len; 547 } 548 549 ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len, 550 flags, &copied); 551 if (ret == -EAGAIN) 552 ret = 0; 553 if (ret == -EIO) 554 goto call_failed; 555 if (ret < 0) 556 goto error_unlock_call; 557 558 if (rxrpc_call_is_complete(call) && 559 skb_queue_empty(&call->recvmsg_queue)) 560 goto call_complete; 561 if (rxrpc_call_has_failed(call)) 562 goto call_failed; 563 564 if (!(flags & MSG_PEEK) && 565 !skb_queue_empty(&call->recvmsg_queue)) 566 rxrpc_notify_socket(call); 567 goto not_yet_complete; 568 569 call_failed: 570 rxrpc_purge_queue(&call->recvmsg_queue); 571 call_complete: 572 ret = rxrpc_recvmsg_term(call, msg); 573 if (ret < 0) 574 goto error_unlock_call; 575 if (!(flags & MSG_PEEK)) 576 rxrpc_release_call(rx, call); 577 msg->msg_flags |= MSG_EOR; 578 ret = 1; 579 580 not_yet_complete: 581 if (ret == 0) 582 msg->msg_flags |= MSG_MORE; 583 else 584 msg->msg_flags &= ~MSG_MORE; 585 ret = copied; 586 587 error_unlock_call: 588 mutex_unlock(&call->user_mutex); 589 rxrpc_put_call(call, rxrpc_call_put_recvmsg); 590 trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret); 591 return ret; 592 593 error_requeue_call: 594 if (!(flags & MSG_PEEK)) { 595 spin_lock_irq(&rx->recvmsg_lock); 596 if (list_empty(&call->recvmsg_link)) { 597 list_add(&call->recvmsg_link, &rx->recvmsg_q); 598 rxrpc_see_call(call, rxrpc_call_see_recvmsg_requeue); 599 spin_unlock_irq(&rx->recvmsg_lock); 600 } else if (list_is_first(&call->recvmsg_link, &rx->recvmsg_q)) { 601 spin_unlock_irq(&rx->recvmsg_lock); 602 rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_first); 603 } else { 604 list_move(&call->recvmsg_link, &rx->recvmsg_q); 605 spin_unlock_irq(&rx->recvmsg_lock); 606 rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_move); 607 } 608 trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0); 609 } else { 610 rxrpc_put_call(call, rxrpc_call_put_recvmsg_peek_nowait); 611 } 612 error_no_call: 613 release_sock(&rx->sk); 614 error_trace: 615 trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret); 616 return ret; 617 618 wait_interrupted: 619 ret = sock_intr_errno(timeo); 620 wait_error: 621 finish_wait(sk_sleep(&rx->sk), &wait); 622 call = NULL; 623 goto error_trace; 624 } 625 626 /** 627 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info 628 * @sock: The socket that the call exists on 629 * @call: The call to send data through 630 * @iter: The buffer to receive into 631 * @_len: The amount of data we want to receive (decreased on return) 632 * @want_more: True if more data is expected to be read 633 * @_abort: Where the abort code is stored if -ECONNABORTED is returned 634 * @_service: Where to store the actual service ID (may be upgraded) 635 * 636 * Allow a kernel service to receive data and pick up information about the 637 * state of a call. Note that *@_abort should also be initialised to %0. 638 * 639 * Note that we may return %-EAGAIN to drain empty packets at the end 640 * of the data, even if we've already copied over the requested data. 641 * 642 * Return: %0 if got what was asked for and there's more available, %1 643 * if we got what was asked for and we're at the end of the data and 644 * %-EAGAIN if we need more data. 645 */ 646 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, 647 struct iov_iter *iter, size_t *_len, 648 bool want_more, u32 *_abort, u16 *_service) 649 { 650 size_t offset = 0; 651 int ret; 652 653 _enter("{%d},%zu,%d", call->debug_id, *_len, want_more); 654 655 mutex_lock(&call->user_mutex); 656 657 ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset); 658 *_len -= offset; 659 if (ret == -EIO) 660 goto call_failed; 661 if (ret < 0) 662 goto out; 663 664 /* We can only reach here with a partially full buffer if we have 665 * reached the end of the data. We must otherwise have a full buffer 666 * or have been given -EAGAIN. 667 */ 668 if (ret == 1) { 669 if (iov_iter_count(iter) > 0) 670 goto short_data; 671 if (!want_more) 672 goto read_phase_complete; 673 ret = 0; 674 goto out; 675 } 676 677 if (!want_more) 678 goto excess_data; 679 goto out; 680 681 read_phase_complete: 682 ret = 1; 683 out: 684 if (_service) 685 *_service = call->dest_srx.srx_service; 686 mutex_unlock(&call->user_mutex); 687 _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort); 688 return ret; 689 690 short_data: 691 trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data, 692 call->cid, call->call_id, call->rx_consumed, 693 0, -EBADMSG); 694 ret = -EBADMSG; 695 goto out; 696 excess_data: 697 trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data, 698 call->cid, call->call_id, call->rx_consumed, 699 0, -EMSGSIZE); 700 ret = -EMSGSIZE; 701 goto out; 702 call_failed: 703 *_abort = call->abort_code; 704 ret = call->error; 705 if (call->completion == RXRPC_CALL_SUCCEEDED) { 706 ret = 1; 707 if (iov_iter_count(iter) > 0) 708 ret = -ECONNRESET; 709 } 710 goto out; 711 } 712 EXPORT_SYMBOL(rxrpc_kernel_recv_data); 713