1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC recvmsg() implementation 3 * 4 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/net.h> 11 #include <linux/skbuff.h> 12 #include <linux/export.h> 13 #include <linux/sched/signal.h> 14 15 #include <net/sock.h> 16 #include <net/af_rxrpc.h> 17 #include "ar-internal.h" 18 19 /* 20 * Post a call for attention by the socket or kernel service. Further 21 * notifications are suppressed by putting recvmsg_link on a dummy queue. 22 */ 23 void rxrpc_notify_socket(struct rxrpc_call *call) 24 { 25 struct rxrpc_sock *rx; 26 struct sock *sk; 27 28 _enter("%d", call->debug_id); 29 30 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 31 rxrpc_see_call(call, rxrpc_call_see_notify_released); 32 return; 33 } 34 35 rcu_read_lock(); 36 37 rx = rcu_dereference(call->socket); 38 sk = &rx->sk; 39 if (rx && sk->sk_state < RXRPC_CLOSE) { 40 if (call->notify_rx) { 41 spin_lock_irq(&call->notify_lock); 42 call->notify_rx(sk, call, call->user_call_ID); 43 spin_unlock_irq(&call->notify_lock); 44 } else { 45 spin_lock_irq(&rx->recvmsg_lock); 46 if (list_empty(&call->recvmsg_link)) { 47 rxrpc_get_call(call, rxrpc_call_get_notify_socket); 48 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q); 49 } 50 spin_unlock_irq(&rx->recvmsg_lock); 51 52 if (!sock_flag(sk, SOCK_DEAD)) { 53 _debug("call %ps", sk->sk_data_ready); 54 sk->sk_data_ready(sk); 55 } 56 } 57 } 58 59 rcu_read_unlock(); 60 _leave(""); 61 } 62 63 /* 64 * Pass a call terminating message to userspace. 65 */ 66 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg) 67 { 68 u32 tmp = 0; 69 int ret; 70 71 switch (call->completion) { 72 case RXRPC_CALL_SUCCEEDED: 73 ret = 0; 74 if (rxrpc_is_service_call(call)) 75 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp); 76 break; 77 case RXRPC_CALL_REMOTELY_ABORTED: 78 tmp = call->abort_code; 79 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 80 break; 81 case RXRPC_CALL_LOCALLY_ABORTED: 82 tmp = call->abort_code; 83 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 84 break; 85 case RXRPC_CALL_NETWORK_ERROR: 86 tmp = -call->error; 87 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp); 88 break; 89 case RXRPC_CALL_LOCAL_ERROR: 90 tmp = -call->error; 91 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp); 92 break; 93 default: 94 pr_err("Invalid terminal call state %u\n", call->completion); 95 BUG(); 96 break; 97 } 98 99 trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, 100 call->ackr_window - 1, 101 call->rx_pkt_offset, call->rx_pkt_len, ret); 102 return ret; 103 } 104 105 /* 106 * Discard a packet we've used up and advance the Rx window by one. 107 */ 108 static void rxrpc_rotate_rx_window(struct rxrpc_call *call) 109 { 110 struct rxrpc_skb_priv *sp; 111 struct sk_buff *skb; 112 rxrpc_serial_t serial; 113 rxrpc_seq_t old_consumed = call->rx_consumed, tseq; 114 bool last; 115 int acked; 116 117 _enter("%d", call->debug_id); 118 119 skb = skb_dequeue(&call->recvmsg_queue); 120 rxrpc_see_skb(skb, rxrpc_skb_see_rotate); 121 122 sp = rxrpc_skb(skb); 123 tseq = sp->hdr.seq; 124 serial = sp->hdr.serial; 125 last = sp->hdr.flags & RXRPC_LAST_PACKET; 126 127 /* Barrier against rxrpc_input_data(). */ 128 if (after(tseq, call->rx_consumed)) 129 smp_store_release(&call->rx_consumed, tseq); 130 131 rxrpc_free_skb(skb, rxrpc_skb_put_rotate); 132 133 trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate, 134 serial, call->rx_consumed); 135 136 if (last) 137 set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags); 138 139 /* Check to see if there's an ACK that needs sending. */ 140 acked = atomic_add_return(call->rx_consumed - old_consumed, 141 &call->ackr_nr_consumed); 142 if (acked > 8 && 143 !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags)) 144 rxrpc_poke_call(call, rxrpc_call_poke_idle); 145 } 146 147 /* 148 * Decrypt and verify a DATA packet. The content of the packet is pulled out 149 * into a flat buffer rather than decrypting in place in the skbuff. This also 150 * has the advantage of aligning the buffer correctly for the crypto routines. 151 * 152 * We keep track of the sequence number of the packet currently decrypted into 153 * the buffer in ->rx_dec_seq. If MSG_PEEK is used and steps onto a new 154 * packet, subsequent recvmsg() calls will have to go back and re-decrypt the 155 * current packet. 156 */ 157 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb) 158 { 159 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 160 int ret; 161 162 if (sp->len > call->rx_dec_bsize || !call->rx_dec_buffer) { 163 /* Make sure we can hold a 1412-byte jumbo subpacket and make 164 * sure that the buffer size is aligned to a crypto blocksize. 165 */ 166 size_t size = clamp(round_up(sp->len, 32), 2048, 65535); 167 void *buffer = krealloc(call->rx_dec_buffer, size, GFP_NOFS); 168 169 if (!buffer) 170 return -ENOMEM; 171 call->rx_dec_buffer = buffer; 172 call->rx_dec_bsize = size; 173 } 174 175 ret = -EFAULT; 176 if (skb_copy_bits(skb, sp->offset, call->rx_dec_buffer, sp->len) < 0) 177 goto err; 178 179 call->rx_dec_offset = 0; 180 call->rx_dec_len = sp->len; 181 call->rx_dec_seq = sp->hdr.seq; 182 ret = call->security->verify_packet(call, skb); 183 if (ret < 0) 184 goto err; 185 return 0; 186 187 err: 188 kfree(call->rx_dec_buffer); 189 call->rx_dec_buffer = NULL; 190 call->rx_dec_bsize = 0; 191 call->rx_dec_offset = 0; 192 call->rx_dec_len = 0; 193 return ret; 194 } 195 196 /* 197 * Transcribe a call's user ID to a control message. 198 */ 199 static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg, 200 int flags) 201 { 202 if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) 203 return 0; 204 205 if (flags & MSG_CMSG_COMPAT) { 206 unsigned int id32 = call->user_call_ID; 207 208 return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 209 sizeof(unsigned int), &id32); 210 } else { 211 unsigned long idl = call->user_call_ID; 212 213 return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 214 sizeof(unsigned long), &idl); 215 } 216 } 217 218 /* 219 * Deal with a CHALLENGE packet. 220 */ 221 static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg, 222 struct sk_buff *challenge, unsigned int flags) 223 { 224 struct rxrpc_skb_priv *sp = rxrpc_skb(challenge); 225 struct rxrpc_connection *conn = sp->chall.conn; 226 227 return conn->security->challenge_to_recvmsg(conn, challenge, msg); 228 } 229 230 /* 231 * Process OOB packets. Called with the socket locked. 232 */ 233 static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg, 234 unsigned int flags) 235 { 236 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 237 struct sk_buff *skb; 238 bool need_response = false; 239 int ret; 240 241 skb = skb_peek(&rx->recvmsg_oobq); 242 if (!skb) 243 return -EAGAIN; 244 rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg); 245 246 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64), 247 &skb->skb_mstamp_ns); 248 if (ret < 0) 249 return ret; 250 251 switch ((enum rxrpc_oob_type)skb->mark) { 252 case RXRPC_OOB_CHALLENGE: 253 need_response = true; 254 ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags); 255 break; 256 default: 257 WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n", 258 skb->mark); 259 ret = -EIO; 260 break; 261 } 262 263 if (!(flags & MSG_PEEK)) { 264 skb_unlink(skb, &rx->recvmsg_oobq); 265 if (need_response) 266 rxrpc_add_pending_oob(rx, skb); 267 else 268 rxrpc_free_skb(skb, rxrpc_skb_put_oob); 269 } 270 return ret; 271 } 272 273 /* 274 * Deliver messages to a call. This keeps processing packets until the buffer 275 * is filled and we find either more DATA (returns 0) or the end of the DATA 276 * (returns 1). If more packets are required, it returns -EAGAIN and if the 277 * call has failed it returns -EIO. 278 */ 279 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, 280 struct msghdr *msg, struct iov_iter *iter, 281 size_t len, int flags, size_t *_offset) 282 { 283 struct rxrpc_skb_priv *sp; 284 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 285 struct sk_buff *skb; 286 rxrpc_seq_t seq = 0; 287 size_t remain; 288 unsigned int rx_pkt_offset, rx_pkt_len; 289 int copy, ret = -EAGAIN, ret2; 290 291 rx_pkt_offset = call->rx_pkt_offset; 292 rx_pkt_len = call->rx_pkt_len; 293 294 if (rxrpc_call_has_failed(call)) { 295 seq = call->ackr_window - 1; 296 ret = -EIO; 297 goto done; 298 } 299 300 if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) { 301 seq = call->ackr_window - 1; 302 ret = 1; 303 goto done; 304 } 305 306 /* No one else can be removing stuff from the queue, so we shouldn't 307 * need the Rx lock to walk it. 308 */ 309 skb = skb_peek(&call->recvmsg_queue); 310 while (skb) { 311 rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg); 312 sp = rxrpc_skb(skb); 313 seq = sp->hdr.seq; 314 315 if (!(flags & MSG_PEEK)) 316 trace_rxrpc_receive(call, rxrpc_receive_front, 317 sp->hdr.serial, seq); 318 319 if (msg) 320 sock_recv_timestamp(msg, sock->sk, skb); 321 322 if (call->rx_dec_seq != sp->hdr.seq || 323 !call->rx_dec_buffer) { 324 ret2 = rxrpc_verify_data(call, skb); 325 trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq, 326 call->rx_dec_offset, 327 call->rx_dec_len, ret2); 328 if (ret2 < 0) { 329 ret = ret2; 330 goto out; 331 } 332 } 333 334 if (rx_pkt_offset == USHRT_MAX) { 335 rx_pkt_offset = call->rx_dec_offset; 336 rx_pkt_len = call->rx_dec_len; 337 } else { 338 trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq, 339 rx_pkt_offset, rx_pkt_len, 0); 340 } 341 342 /* We have to handle short, empty and used-up DATA packets. */ 343 remain = len - *_offset; 344 copy = rx_pkt_len; 345 if (copy > remain) 346 copy = remain; 347 if (copy > 0) { 348 ret2 = copy_to_iter(call->rx_dec_buffer + rx_pkt_offset, 349 copy, iter); 350 if (ret2 != copy) { 351 ret = -EFAULT; 352 goto out; 353 } 354 355 /* handle piecemeal consumption of data packets */ 356 rx_pkt_offset += copy; 357 rx_pkt_len -= copy; 358 *_offset += copy; 359 } 360 361 if (rx_pkt_len > 0) { 362 trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq, 363 rx_pkt_offset, rx_pkt_len, 0); 364 ASSERTCMP(*_offset, ==, len); 365 ret = 0; 366 break; 367 } 368 369 /* The whole packet has been transferred. */ 370 if (sp->hdr.flags & RXRPC_LAST_PACKET) 371 ret = 1; 372 rx_pkt_offset = USHRT_MAX; 373 rx_pkt_len = 0; 374 375 skb = skb_peek_next(skb, &call->recvmsg_queue); 376 377 if (!(flags & MSG_PEEK)) 378 rxrpc_rotate_rx_window(call); 379 380 if (!rx->app_ops && 381 !skb_queue_empty_lockless(&rx->recvmsg_oobq)) { 382 trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq, 383 rx_pkt_offset, rx_pkt_len, ret); 384 break; 385 } 386 } 387 388 out: 389 if (!(flags & MSG_PEEK)) { 390 call->rx_pkt_offset = rx_pkt_offset; 391 call->rx_pkt_len = rx_pkt_len; 392 } 393 394 done: 395 trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq, 396 rx_pkt_offset, rx_pkt_len, ret); 397 if (ret == -EAGAIN) 398 set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags); 399 return ret; 400 } 401 402 /* 403 * Receive a message from an RxRPC socket 404 * - we need to be careful about two or more threads calling recvmsg 405 * simultaneously 406 */ 407 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, 408 int flags) 409 { 410 struct rxrpc_call *call; 411 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 412 struct list_head *l; 413 unsigned int call_debug_id = 0; 414 size_t copied = 0; 415 long timeo; 416 int ret; 417 418 DEFINE_WAIT(wait); 419 420 trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0); 421 422 if (flags & (MSG_OOB | MSG_TRUNC)) 423 return -EOPNOTSUPP; 424 425 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 426 427 try_again: 428 lock_sock(&rx->sk); 429 430 /* Return immediately if a client socket has no outstanding calls */ 431 if (RB_EMPTY_ROOT(&rx->calls) && 432 list_empty(&rx->recvmsg_q) && 433 skb_queue_empty_lockless(&rx->recvmsg_oobq) && 434 rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 435 release_sock(&rx->sk); 436 return -EAGAIN; 437 } 438 439 if (list_empty(&rx->recvmsg_q) && 440 skb_queue_empty_lockless(&rx->recvmsg_oobq)) { 441 ret = -EWOULDBLOCK; 442 if (timeo == 0) { 443 call = NULL; 444 goto error_no_call; 445 } 446 447 release_sock(&rx->sk); 448 449 /* Wait for something to happen */ 450 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 451 TASK_INTERRUPTIBLE); 452 ret = sock_error(&rx->sk); 453 if (ret) 454 goto wait_error; 455 456 if (list_empty(&rx->recvmsg_q) && 457 skb_queue_empty_lockless(&rx->recvmsg_oobq)) { 458 if (signal_pending(current)) 459 goto wait_interrupted; 460 trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0); 461 timeo = schedule_timeout(timeo); 462 } 463 finish_wait(sk_sleep(&rx->sk), &wait); 464 goto try_again; 465 } 466 467 /* Deal with OOB messages before we consider getting normal data. */ 468 if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) { 469 ret = rxrpc_recvmsg_oob(sock, msg, flags); 470 release_sock(&rx->sk); 471 if (ret == -EAGAIN) 472 goto try_again; 473 goto error_trace; 474 } 475 476 /* Find the next call and dequeue it if we're not just peeking. If we 477 * do dequeue it, that comes with a ref that we will need to release. 478 * We also want to weed out calls that got requeued whilst we were 479 * shovelling data out. 480 */ 481 spin_lock_irq(&rx->recvmsg_lock); 482 l = rx->recvmsg_q.next; 483 call = list_entry(l, struct rxrpc_call, recvmsg_link); 484 485 if (!rxrpc_call_is_complete(call) && 486 skb_queue_empty(&call->recvmsg_queue) && 487 skb_queue_empty(&rx->recvmsg_oobq)) { 488 list_del_init(&call->recvmsg_link); 489 spin_unlock_irq(&rx->recvmsg_lock); 490 release_sock(&rx->sk); 491 trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0); 492 rxrpc_put_call(call, rxrpc_call_put_recvmsg); 493 goto try_again; 494 } 495 496 rxrpc_see_call(call, rxrpc_call_see_recvmsg); 497 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 498 rxrpc_see_call(call, rxrpc_call_see_already_released); 499 list_del_init(&call->recvmsg_link); 500 spin_unlock_irq(&rx->recvmsg_lock); 501 release_sock(&rx->sk); 502 trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0); 503 rxrpc_put_call(call, rxrpc_call_put_recvmsg); 504 goto try_again; 505 } 506 if (!(flags & MSG_PEEK)) 507 list_del_init(&call->recvmsg_link); 508 else 509 rxrpc_get_call(call, rxrpc_call_get_recvmsg); 510 spin_unlock_irq(&rx->recvmsg_lock); 511 512 call_debug_id = call->debug_id; 513 trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0); 514 515 /* We're going to drop the socket lock, so we need to lock the call 516 * against interference by sendmsg. 517 */ 518 if (!mutex_trylock(&call->user_mutex)) { 519 ret = -EWOULDBLOCK; 520 if (flags & MSG_DONTWAIT) 521 goto error_requeue_call; 522 ret = -ERESTARTSYS; 523 if (mutex_lock_interruptible(&call->user_mutex) < 0) 524 goto error_requeue_call; 525 } 526 527 release_sock(&rx->sk); 528 529 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 530 rxrpc_see_call(call, rxrpc_call_see_already_released); 531 mutex_unlock(&call->user_mutex); 532 rxrpc_put_call(call, rxrpc_call_put_recvmsg); 533 goto try_again; 534 } 535 536 ret = rxrpc_recvmsg_user_id(call, msg, flags); 537 if (ret < 0) 538 goto error_unlock_call; 539 540 if (msg->msg_name && call->peer) { 541 size_t len = sizeof(call->dest_srx); 542 543 memcpy(msg->msg_name, &call->dest_srx, len); 544 msg->msg_namelen = len; 545 } 546 547 ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len, 548 flags, &copied); 549 if (ret == -EAGAIN) 550 ret = 0; 551 if (ret == -EIO) 552 goto call_failed; 553 if (ret < 0) 554 goto error_unlock_call; 555 556 if (rxrpc_call_is_complete(call) && 557 skb_queue_empty(&call->recvmsg_queue)) 558 goto call_complete; 559 if (rxrpc_call_has_failed(call)) 560 goto call_failed; 561 562 if (!(flags & MSG_PEEK) && 563 !skb_queue_empty(&call->recvmsg_queue)) 564 rxrpc_notify_socket(call); 565 goto not_yet_complete; 566 567 call_failed: 568 rxrpc_purge_queue(&call->recvmsg_queue); 569 call_complete: 570 ret = rxrpc_recvmsg_term(call, msg); 571 if (ret < 0) 572 goto error_unlock_call; 573 if (!(flags & MSG_PEEK)) 574 rxrpc_release_call(rx, call); 575 msg->msg_flags |= MSG_EOR; 576 ret = 1; 577 578 not_yet_complete: 579 if (ret == 0) 580 msg->msg_flags |= MSG_MORE; 581 else 582 msg->msg_flags &= ~MSG_MORE; 583 ret = copied; 584 585 error_unlock_call: 586 mutex_unlock(&call->user_mutex); 587 rxrpc_put_call(call, rxrpc_call_put_recvmsg); 588 trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret); 589 return ret; 590 591 error_requeue_call: 592 if (!(flags & MSG_PEEK)) { 593 spin_lock_irq(&rx->recvmsg_lock); 594 if (list_empty(&call->recvmsg_link)) { 595 list_add(&call->recvmsg_link, &rx->recvmsg_q); 596 rxrpc_see_call(call, rxrpc_call_see_recvmsg_requeue); 597 spin_unlock_irq(&rx->recvmsg_lock); 598 } else if (list_is_first(&call->recvmsg_link, &rx->recvmsg_q)) { 599 spin_unlock_irq(&rx->recvmsg_lock); 600 rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_first); 601 } else { 602 list_move(&call->recvmsg_link, &rx->recvmsg_q); 603 spin_unlock_irq(&rx->recvmsg_lock); 604 rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_move); 605 } 606 trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0); 607 } else { 608 rxrpc_put_call(call, rxrpc_call_put_recvmsg_peek_nowait); 609 } 610 error_no_call: 611 release_sock(&rx->sk); 612 error_trace: 613 trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret); 614 return ret; 615 616 wait_interrupted: 617 ret = sock_intr_errno(timeo); 618 wait_error: 619 finish_wait(sk_sleep(&rx->sk), &wait); 620 call = NULL; 621 goto error_trace; 622 } 623 624 /** 625 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info 626 * @sock: The socket that the call exists on 627 * @call: The call to send data through 628 * @iter: The buffer to receive into 629 * @_len: The amount of data we want to receive (decreased on return) 630 * @want_more: True if more data is expected to be read 631 * @_abort: Where the abort code is stored if -ECONNABORTED is returned 632 * @_service: Where to store the actual service ID (may be upgraded) 633 * 634 * Allow a kernel service to receive data and pick up information about the 635 * state of a call. Note that *@_abort should also be initialised to %0. 636 * 637 * Note that we may return %-EAGAIN to drain empty packets at the end 638 * of the data, even if we've already copied over the requested data. 639 * 640 * Return: %0 if got what was asked for and there's more available, %1 641 * if we got what was asked for and we're at the end of the data and 642 * %-EAGAIN if we need more data. 643 */ 644 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, 645 struct iov_iter *iter, size_t *_len, 646 bool want_more, u32 *_abort, u16 *_service) 647 { 648 size_t offset = 0; 649 int ret; 650 651 _enter("{%d},%zu,%d", call->debug_id, *_len, want_more); 652 653 mutex_lock(&call->user_mutex); 654 655 ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset); 656 *_len -= offset; 657 if (ret == -EIO) 658 goto call_failed; 659 if (ret < 0) 660 goto out; 661 662 /* We can only reach here with a partially full buffer if we have 663 * reached the end of the data. We must otherwise have a full buffer 664 * or have been given -EAGAIN. 665 */ 666 if (ret == 1) { 667 if (iov_iter_count(iter) > 0) 668 goto short_data; 669 if (!want_more) 670 goto read_phase_complete; 671 ret = 0; 672 goto out; 673 } 674 675 if (!want_more) 676 goto excess_data; 677 goto out; 678 679 read_phase_complete: 680 ret = 1; 681 out: 682 if (_service) 683 *_service = call->dest_srx.srx_service; 684 mutex_unlock(&call->user_mutex); 685 _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort); 686 return ret; 687 688 short_data: 689 trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data, 690 call->cid, call->call_id, call->rx_consumed, 691 0, -EBADMSG); 692 ret = -EBADMSG; 693 goto out; 694 excess_data: 695 trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data, 696 call->cid, call->call_id, call->rx_consumed, 697 0, -EMSGSIZE); 698 ret = -EMSGSIZE; 699 goto out; 700 call_failed: 701 *_abort = call->abort_code; 702 ret = call->error; 703 if (call->completion == RXRPC_CALL_SUCCEEDED) { 704 ret = 1; 705 if (iov_iter_count(iter) > 0) 706 ret = -ECONNRESET; 707 } 708 goto out; 709 } 710 EXPORT_SYMBOL(rxrpc_kernel_recv_data); 711