1 /* RxRPC recvmsg() implementation 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 13 14 #include <linux/net.h> 15 #include <linux/skbuff.h> 16 #include <linux/export.h> 17 #include <net/sock.h> 18 #include <net/af_rxrpc.h> 19 #include "ar-internal.h" 20 21 /* 22 * removal a call's user ID from the socket tree to make the user ID available 23 * again and so that it won't be seen again in association with that call 24 */ 25 void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 26 { 27 _debug("RELEASE CALL %d", call->debug_id); 28 29 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 30 write_lock_bh(&rx->call_lock); 31 rb_erase(&call->sock_node, &call->socket->calls); 32 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 33 write_unlock_bh(&rx->call_lock); 34 } 35 36 read_lock_bh(&call->state_lock); 37 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 38 !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events)) 39 rxrpc_queue_call(call); 40 read_unlock_bh(&call->state_lock); 41 } 42 43 /* 44 * receive a message from an RxRPC socket 45 * - we need to be careful about two or more threads calling recvmsg 46 * simultaneously 47 */ 48 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, 49 int flags) 50 { 51 struct rxrpc_skb_priv *sp; 52 struct rxrpc_call *call = NULL, *continue_call = NULL; 53 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 54 struct sk_buff *skb; 55 long timeo; 56 int copy, ret, ullen, offset, copied = 0; 57 u32 abort_code; 58 59 DEFINE_WAIT(wait); 60 61 _enter(",,,%zu,%d", len, flags); 62 63 if (flags & (MSG_OOB | MSG_TRUNC)) 64 return -EOPNOTSUPP; 65 66 ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long); 67 68 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 69 msg->msg_flags |= MSG_MORE; 70 71 lock_sock(&rx->sk); 72 73 for (;;) { 74 /* return immediately if a client socket has no outstanding 75 * calls */ 76 if (RB_EMPTY_ROOT(&rx->calls)) { 77 if (copied) 78 goto out; 79 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 80 release_sock(&rx->sk); 81 if (continue_call) 82 rxrpc_put_call(continue_call); 83 return -ENODATA; 84 } 85 } 86 87 /* get the next message on the Rx queue */ 88 skb = skb_peek(&rx->sk.sk_receive_queue); 89 if (!skb) { 90 /* nothing remains on the queue */ 91 if (copied && 92 (flags & MSG_PEEK || timeo == 0)) 93 goto out; 94 95 /* wait for a message to turn up */ 96 release_sock(&rx->sk); 97 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 98 TASK_INTERRUPTIBLE); 99 ret = sock_error(&rx->sk); 100 if (ret) 101 goto wait_error; 102 103 if (skb_queue_empty(&rx->sk.sk_receive_queue)) { 104 if (signal_pending(current)) 105 goto wait_interrupted; 106 timeo = schedule_timeout(timeo); 107 } 108 finish_wait(sk_sleep(&rx->sk), &wait); 109 lock_sock(&rx->sk); 110 continue; 111 } 112 113 peek_next_packet: 114 sp = rxrpc_skb(skb); 115 call = sp->call; 116 ASSERT(call != NULL); 117 118 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); 119 120 /* make sure we wait for the state to be updated in this call */ 121 spin_lock_bh(&call->lock); 122 spin_unlock_bh(&call->lock); 123 124 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 125 _debug("packet from released call"); 126 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 127 BUG(); 128 rxrpc_free_skb(skb); 129 continue; 130 } 131 132 /* determine whether to continue last data receive */ 133 if (continue_call) { 134 _debug("maybe cont"); 135 if (call != continue_call || 136 skb->mark != RXRPC_SKB_MARK_DATA) { 137 release_sock(&rx->sk); 138 rxrpc_put_call(continue_call); 139 _leave(" = %d [noncont]", copied); 140 return copied; 141 } 142 } 143 144 rxrpc_get_call(call); 145 146 /* copy the peer address and timestamp */ 147 if (!continue_call) { 148 if (msg->msg_name) { 149 size_t len = 150 sizeof(call->conn->params.peer->srx); 151 memcpy(msg->msg_name, 152 &call->conn->params.peer->srx, len); 153 msg->msg_namelen = len; 154 } 155 sock_recv_timestamp(msg, &rx->sk, skb); 156 } 157 158 /* receive the message */ 159 if (skb->mark != RXRPC_SKB_MARK_DATA) 160 goto receive_non_data_message; 161 162 _debug("recvmsg DATA #%u { %d, %d }", 163 sp->hdr.seq, skb->len, sp->offset); 164 165 if (!continue_call) { 166 /* only set the control data once per recvmsg() */ 167 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 168 ullen, &call->user_call_ID); 169 if (ret < 0) 170 goto copy_error; 171 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 172 } 173 174 ASSERTCMP(sp->hdr.seq, >=, call->rx_data_recv); 175 ASSERTCMP(sp->hdr.seq, <=, call->rx_data_recv + 1); 176 call->rx_data_recv = sp->hdr.seq; 177 178 ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten); 179 180 offset = sp->offset; 181 copy = skb->len - offset; 182 if (copy > len - copied) 183 copy = len - copied; 184 185 ret = skb_copy_datagram_msg(skb, offset, msg, copy); 186 187 if (ret < 0) 188 goto copy_error; 189 190 /* handle piecemeal consumption of data packets */ 191 _debug("copied %d+%d", copy, copied); 192 193 offset += copy; 194 copied += copy; 195 196 if (!(flags & MSG_PEEK)) 197 sp->offset = offset; 198 199 if (sp->offset < skb->len) { 200 _debug("buffer full"); 201 ASSERTCMP(copied, ==, len); 202 break; 203 } 204 205 /* we transferred the whole data packet */ 206 if (sp->hdr.flags & RXRPC_LAST_PACKET) { 207 _debug("last"); 208 if (rxrpc_conn_is_client(call->conn)) { 209 /* last byte of reply received */ 210 ret = copied; 211 goto terminal_message; 212 } 213 214 /* last bit of request received */ 215 if (!(flags & MSG_PEEK)) { 216 _debug("eat packet"); 217 if (skb_dequeue(&rx->sk.sk_receive_queue) != 218 skb) 219 BUG(); 220 rxrpc_free_skb(skb); 221 } 222 msg->msg_flags &= ~MSG_MORE; 223 break; 224 } 225 226 /* move on to the next data message */ 227 _debug("next"); 228 if (!continue_call) 229 continue_call = sp->call; 230 else 231 rxrpc_put_call(call); 232 call = NULL; 233 234 if (flags & MSG_PEEK) { 235 _debug("peek next"); 236 skb = skb->next; 237 if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue) 238 break; 239 goto peek_next_packet; 240 } 241 242 _debug("eat packet"); 243 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 244 BUG(); 245 rxrpc_free_skb(skb); 246 } 247 248 /* end of non-terminal data packet reception for the moment */ 249 _debug("end rcv data"); 250 out: 251 release_sock(&rx->sk); 252 if (call) 253 rxrpc_put_call(call); 254 if (continue_call) 255 rxrpc_put_call(continue_call); 256 _leave(" = %d [data]", copied); 257 return copied; 258 259 /* handle non-DATA messages such as aborts, incoming connections and 260 * final ACKs */ 261 receive_non_data_message: 262 _debug("non-data"); 263 264 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) { 265 _debug("RECV NEW CALL"); 266 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code); 267 if (ret < 0) 268 goto copy_error; 269 if (!(flags & MSG_PEEK)) { 270 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 271 BUG(); 272 rxrpc_free_skb(skb); 273 } 274 goto out; 275 } 276 277 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 278 ullen, &call->user_call_ID); 279 if (ret < 0) 280 goto copy_error; 281 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 282 283 switch (skb->mark) { 284 case RXRPC_SKB_MARK_DATA: 285 BUG(); 286 case RXRPC_SKB_MARK_FINAL_ACK: 287 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code); 288 break; 289 case RXRPC_SKB_MARK_BUSY: 290 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); 291 break; 292 case RXRPC_SKB_MARK_REMOTE_ABORT: 293 abort_code = call->remote_abort; 294 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 295 break; 296 case RXRPC_SKB_MARK_LOCAL_ABORT: 297 abort_code = call->local_abort; 298 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 299 break; 300 case RXRPC_SKB_MARK_NET_ERROR: 301 _debug("RECV NET ERROR %d", sp->error); 302 abort_code = sp->error; 303 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code); 304 break; 305 case RXRPC_SKB_MARK_LOCAL_ERROR: 306 _debug("RECV LOCAL ERROR %d", sp->error); 307 abort_code = sp->error; 308 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 309 &abort_code); 310 break; 311 default: 312 pr_err("Unknown packet mark %u\n", skb->mark); 313 BUG(); 314 break; 315 } 316 317 if (ret < 0) 318 goto copy_error; 319 320 terminal_message: 321 _debug("terminal"); 322 msg->msg_flags &= ~MSG_MORE; 323 msg->msg_flags |= MSG_EOR; 324 325 if (!(flags & MSG_PEEK)) { 326 _net("free terminal skb %p", skb); 327 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 328 BUG(); 329 rxrpc_free_skb(skb); 330 rxrpc_remove_user_ID(rx, call); 331 } 332 333 release_sock(&rx->sk); 334 rxrpc_put_call(call); 335 if (continue_call) 336 rxrpc_put_call(continue_call); 337 _leave(" = %d", ret); 338 return ret; 339 340 copy_error: 341 _debug("copy error"); 342 release_sock(&rx->sk); 343 rxrpc_put_call(call); 344 if (continue_call) 345 rxrpc_put_call(continue_call); 346 _leave(" = %d", ret); 347 return ret; 348 349 wait_interrupted: 350 ret = sock_intr_errno(timeo); 351 wait_error: 352 finish_wait(sk_sleep(&rx->sk), &wait); 353 if (continue_call) 354 rxrpc_put_call(continue_call); 355 if (copied) 356 copied = ret; 357 _leave(" = %d [waitfail %d]", copied, ret); 358 return copied; 359 360 } 361 362 /** 363 * rxrpc_kernel_data_delivered - Record delivery of data message 364 * @skb: Message holding data 365 * 366 * Record the delivery of a data message. This permits RxRPC to keep its 367 * tracking correct. The socket buffer will be deleted. 368 */ 369 void rxrpc_kernel_data_delivered(struct sk_buff *skb) 370 { 371 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 372 struct rxrpc_call *call = sp->call; 373 374 ASSERTCMP(sp->hdr.seq, >=, call->rx_data_recv); 375 ASSERTCMP(sp->hdr.seq, <=, call->rx_data_recv + 1); 376 call->rx_data_recv = sp->hdr.seq; 377 378 ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten); 379 rxrpc_free_skb(skb); 380 } 381 382 EXPORT_SYMBOL(rxrpc_kernel_data_delivered); 383 384 /** 385 * rxrpc_kernel_is_data_last - Determine if data message is last one 386 * @skb: Message holding data 387 * 388 * Determine if data message is last one for the parent call. 389 */ 390 bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 391 { 392 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 393 394 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 395 396 return sp->hdr.flags & RXRPC_LAST_PACKET; 397 } 398 399 EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 400 401 /** 402 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 403 * @skb: Message indicating an abort 404 * 405 * Get the abort code from an RxRPC abort message. 406 */ 407 u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 408 { 409 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 410 411 switch (skb->mark) { 412 case RXRPC_SKB_MARK_REMOTE_ABORT: 413 return sp->call->remote_abort; 414 case RXRPC_SKB_MARK_LOCAL_ABORT: 415 return sp->call->local_abort; 416 default: 417 BUG(); 418 } 419 } 420 421 EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 422 423 /** 424 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 425 * @skb: Message indicating an error 426 * 427 * Get the error number from an RxRPC error message. 428 */ 429 int rxrpc_kernel_get_error_number(struct sk_buff *skb) 430 { 431 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 432 433 return sp->error; 434 } 435 436 EXPORT_SYMBOL(rxrpc_kernel_get_error_number); 437