// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Post a call for attention by the socket or kernel service.  Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
void rxrpc_notify_socket(struct rxrpc_call *call)
{
	struct rxrpc_sock *rx;
	struct sock *sk;

	_enter("%d", call->debug_id);

	if (!list_empty(&call->recvmsg_link))
		return;

	rcu_read_lock();

	rx = rcu_dereference(call->socket);
	sk = &rx->sk;
	if (rx && sk->sk_state < RXRPC_CLOSE) {
		if (call->notify_rx) {
			spin_lock(&call->notify_lock);
			call->notify_rx(sk, call, call->user_call_ID);
			spin_unlock(&call->notify_lock);
		} else {
			write_lock(&rx->recvmsg_lock);
			if (list_empty(&call->recvmsg_link)) {
				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
			}
			write_unlock(&rx->recvmsg_lock);

			if (!sock_flag(sk, SOCK_DEAD)) {
				_debug("call %ps", sk->sk_data_ready);
				sk->sk_data_ready(sk);
			}
		}
	}

	rcu_read_unlock();
	_leave("");
}

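/*
 * Illustrative sketch (not part of the original file): when a kernel service
 * supplies a notify_rx handler to rxrpc_kernel_begin_call(), the branch above
 * invokes it under call->notify_lock instead of queuing the call on the
 * socket's recvmsg_q, so the handler must not sleep.  A typical pattern is to
 * just flag the call for attention and defer the rxrpc_kernel_recv_data()
 * work, e.g.:
 *
 *	static void example_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
 *				      unsigned long user_call_ID)
 *	{
 *		struct example_call *call = (struct example_call *)user_call_ID;
 *
 *		call->need_attention = true;
 *		queue_work(example_wq, &call->work);
 *	}
 *
 * example_notify_rx, example_call and example_wq are hypothetical names used
 * only for illustration.
 */
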
/*
 * Transition a call to the complete state.
 */
bool __rxrpc_set_call_completion(struct rxrpc_call *call,
				 enum rxrpc_call_completion compl,
				 u32 abort_code,
				 int error)
{
	if (call->state < RXRPC_CALL_COMPLETE) {
		call->abort_code = abort_code;
		call->error = error;
		call->completion = compl;
		call->state = RXRPC_CALL_COMPLETE;
		trace_rxrpc_call_complete(call);
		wake_up(&call->waitq);
		rxrpc_notify_socket(call);
		return true;
	}
	return false;
}

bool rxrpc_set_call_completion(struct rxrpc_call *call,
			       enum rxrpc_call_completion compl,
			       u32 abort_code,
			       int error)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock(&call->state_lock);
		ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
		write_unlock(&call->state_lock);
	}
	return ret;
}

/*
 * Record that a call successfully completed.
 */
bool __rxrpc_call_completed(struct rxrpc_call *call)
{
	return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

bool rxrpc_call_completed(struct rxrpc_call *call)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock(&call->state_lock);
		ret = __rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
	}
	return ret;
}

/*
 * Record that a call is locally aborted.
 */
bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call,
			rxrpc_seq_t seq, u32 abort_code, int error)
{
	trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
			  abort_code, error);
	return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
					   abort_code, error);
}

bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
		      rxrpc_seq_t seq, u32 abort_code, int error)
{
	bool ret;

	write_lock(&call->state_lock);
	ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
	write_unlock(&call->state_lock);
	return ret;
}

/*
 * Pass a call terminating message to userspace.
 */
static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
{
	u32 tmp = 0;
	int ret;

	switch (call->completion) {
	case RXRPC_CALL_SUCCEEDED:
		ret = 0;
		if (rxrpc_is_service_call(call))
			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
		break;
	case RXRPC_CALL_REMOTELY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_LOCALLY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_NETWORK_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
		break;
	case RXRPC_CALL_LOCAL_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
		break;
	default:
		pr_err("Invalid terminal call state %u\n", call->state);
		BUG();
		break;
	}

	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
			     lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
			     call->rx_pkt_offset, call->rx_pkt_len, ret);
	return ret;
}

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);

	write_lock(&call->state_lock);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		__rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		write_unlock(&call->state_lock);
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_processing_op);
		break;
	default:
		write_unlock(&call->state_lock);
		break;
	}
}

/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
	bool last;
	int acked;

	_enter("%d", call->debug_id);

	skb = skb_dequeue(&call->recvmsg_queue);
	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);

	sp = rxrpc_skb(skb);
	tseq   = sp->hdr.seq;
	serial = sp->hdr.serial;
	last   = sp->hdr.flags & RXRPC_LAST_PACKET;

	/* Barrier against rxrpc_input_data(). */
	if (after(tseq, call->rx_consumed))
		smp_store_release(&call->rx_consumed, tseq);

	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);

	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
			    serial, call->rx_consumed);
	if (last) {
		rxrpc_end_rx_phase(call, serial);
		return;
	}

	/* Check to see if there's an ACK that needs sending. */
	acked = atomic_add_return(call->rx_consumed - old_consumed,
				  &call->ackr_nr_consumed);
	if (acked > 2 &&
	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_poke_call(call, rxrpc_call_poke_idle);
}

/*
 * Decrypt and verify a DATA packet.
 */
static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	if (sp->flags & RXRPC_RX_VERIFIED)
		return 0;
	return call->security->verify_packet(call, skb);
}

/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
		seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			ret2 = rxrpc_verify_data(call, skb);
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     rx_pkt_offset, rx_pkt_len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
		} else {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);
	}

out:
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}
done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}

/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	unsigned int call_debug_id = 0;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -EAGAIN;
	}

	if (list_empty(&rx->recvmsg_q)) {
		ret = -EWOULDBLOCK;
		if (timeo == 0) {
			call = NULL;
			goto error_no_call;
		}

		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		if (list_empty(&rx->recvmsg_q)) {
			if (signal_pending(current))
				goto wait_interrupted;
			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Find the next call and dequeue it if we're not just peeking.  If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 */
	write_lock(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);

	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
	write_unlock(&rx->recvmsg_lock);

	call_debug_id = call->debug_id;
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);

	/* We're going to drop the socket lock, so we need to lock the call
	 * against interference by sendmsg.
	 */
	if (!mutex_trylock(&call->user_mutex)) {
		ret = -EWOULDBLOCK;
		if (flags & MSG_DONTWAIT)
			goto error_requeue_call;
		ret = -ERESTARTSYS;
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			goto error_requeue_call;
	}

	release_sock(&rx->sk);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();

	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		if (flags & MSG_CMSG_COMPAT) {
			unsigned int id32 = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned int), &id32);
		} else {
			unsigned long idl = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned long), &idl);
		}
		if (ret < 0)
			goto error_unlock_call;
	}

	if (msg->msg_name && call->peer) {
		size_t len = sizeof(call->dest_srx);

		memcpy(msg->msg_name, &call->dest_srx, len);
		msg->msg_namelen = len;
	}

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
					 flags, &copied);
		if (ret == -EAGAIN)
			ret = 0;

		if (!skb_queue_empty(&call->recvmsg_queue))
			rxrpc_notify_socket(call);
		break;
	default:
		ret = 0;
		break;
	}

	if (ret < 0)
		goto error_unlock_call;

	if (call->state == RXRPC_CALL_COMPLETE) {
		ret = rxrpc_recvmsg_term(call, msg);
		if (ret < 0)
			goto error_unlock_call;
		if (!(flags & MSG_PEEK))
			rxrpc_release_call(rx, call);
		msg->msg_flags |= MSG_EOR;
		ret = 1;
	}

	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error_unlock_call:
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

error_requeue_call:
	if (!(flags & MSG_PEEK)) {
		write_lock(&rx->recvmsg_lock);
		list_add(&call->recvmsg_link, &rx->recvmsg_q);
		write_unlock(&rx->recvmsg_lock);
		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
	} else {
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	}
error_no_call:
	release_sock(&rx->sk);
error_trace:
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	call = NULL;
	goto error_trace;
}

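/*
 * Illustrative sketch (not part of the original file): from userspace, the
 * control messages emitted above come back through an ordinary recvmsg()
 * call.  RXRPC_USER_CALL_ID identifies which call the data belongs to, the
 * terminal cmsgs (RXRPC_ABORT, RXRPC_NET_ERROR, RXRPC_LOCAL_ERROR) say why it
 * ended, and MSG_EOR marks the final message of the call, roughly:
 *
 *	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1,
 *			      .msg_control = ctrl, .msg_controllen = sizeof(ctrl) };
 *	ssize_t n = recvmsg(fd, &msg, 0);
 *
 *	for (struct cmsghdr *c = CMSG_FIRSTHDR(&msg); c; c = CMSG_NXTHDR(&msg, c)) {
 *		if (c->cmsg_level != SOL_RXRPC)
 *			continue;
 *		switch (c->cmsg_type) {
 *		case RXRPC_USER_CALL_ID:	// unsigned long user call ID
 *		case RXRPC_ABORT:		// u32 abort code
 *		case RXRPC_NET_ERROR:
 *		case RXRPC_LOCAL_ERROR:		// u32 errno value
 *			break;
 *		}
 *	}
 *	if (msg.msg_flags & MSG_EOR)
 *		;	// call complete; its user call ID is no longer in use
 *
 * fd, iov and ctrl are assumed to have been set up by the caller.
 */
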
/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to send data through
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * *_abort should also be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	_enter("{%d,%s},%zu,%d",
	       call->debug_id, rxrpc_call_states[call->state],
	       *_len, want_more);

	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);

	mutex_lock(&call->user_mutex);

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
					 *_len, 0, &offset);
		*_len -= offset;
		if (ret < 0)
			goto out;

		/* We can only reach here with a partially full buffer if we
		 * have reached the end of the data.  We must otherwise have a
		 * full buffer or have been given -EAGAIN.
		 */
		if (ret == 1) {
			if (iov_iter_count(iter) > 0)
				goto short_data;
			if (!want_more)
				goto read_phase_complete;
			ret = 0;
			goto out;
		}

		if (!want_more)
			goto excess_data;
		goto out;

	case RXRPC_CALL_COMPLETE:
		goto call_complete;

	default:
		ret = -EINPROGRESS;
		goto out;
	}

read_phase_complete:
	ret = 1;
out:
	if (_service)
		*_service = call->dest_srx.srx_service;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
	ret = -EBADMSG;
	goto out;
excess_data:
	trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
	ret = -EMSGSIZE;
	goto out;
call_complete:
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);
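
/*
 * Illustrative sketch (not part of the original file): a kernel service
 * typically wraps its reception buffer in a kvec-backed iov_iter and then
 * acts on the result documented above, e.g.:
 *
 *	struct kvec kv = { .iov_base = buf, .iov_len = buf_size };
 *	struct iov_iter iter;
 *	size_t len = buf_size;
 *	u32 abort_code = 0;
 *	int ret;
 *
 *	iov_iter_kvec(&iter, ITER_DEST, &kv, 1, buf_size);
 *	ret = rxrpc_kernel_recv_data(sock, rxcall, &iter, &len,
 *				     want_more, &abort_code, NULL);
 *	switch (ret) {
 *	case 0:		// got what was asked for; more data is available
 *		break;
 *	case 1:		// got what was asked for; at the end of the data
 *		break;
 *	case -EAGAIN:	// more packets needed; wait to be notified again
 *		break;
 *	case -ECONNABORTED:
 *		// call aborted; abort_code now holds the abort code
 *		break;
 *	default:	// some other error
 *		break;
 *	}
 *
 * buf, buf_size, sock, rxcall and want_more are assumed to be supplied by
 * the caller; the exact iov_iter direction constant depends on the kernel
 * version in use.
 */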