1 /* Maintain an RxRPC server socket to do AFS communications through 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12 #include <linux/slab.h> 13 #include <net/sock.h> 14 #include <net/af_rxrpc.h> 15 #include <rxrpc/packet.h> 16 #include "internal.h" 17 #include "afs_cm.h" 18 19 static struct socket *afs_socket; /* my RxRPC socket */ 20 static struct workqueue_struct *afs_async_calls; 21 static atomic_t afs_outstanding_calls; 22 static atomic_t afs_outstanding_skbs; 23 24 static void afs_wake_up_call_waiter(struct afs_call *); 25 static int afs_wait_for_call_to_complete(struct afs_call *); 26 static void afs_wake_up_async_call(struct afs_call *); 27 static int afs_dont_wait_for_call_to_complete(struct afs_call *); 28 static void afs_process_async_call(struct afs_call *); 29 static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *); 30 static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool); 31 32 /* synchronous call management */ 33 const struct afs_wait_mode afs_sync_call = { 34 .rx_wakeup = afs_wake_up_call_waiter, 35 .wait = afs_wait_for_call_to_complete, 36 }; 37 38 /* asynchronous call management */ 39 const struct afs_wait_mode afs_async_call = { 40 .rx_wakeup = afs_wake_up_async_call, 41 .wait = afs_dont_wait_for_call_to_complete, 42 }; 43 44 /* asynchronous incoming call management */ 45 static const struct afs_wait_mode afs_async_incoming_call = { 46 .rx_wakeup = afs_wake_up_async_call, 47 }; 48 49 /* asynchronous incoming call initial processing */ 50 static const struct afs_call_type afs_RXCMxxxx = { 51 .name = "CB.xxxx", 52 .deliver = afs_deliver_cm_op_id, 53 .abort_to_error = afs_abort_to_error, 54 }; 55 56 static void afs_collect_incoming_call(struct work_struct *); 57 58 static struct sk_buff_head afs_incoming_calls; 59 static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call); 60 61 static void afs_async_workfn(struct work_struct *work) 62 { 63 struct afs_call *call = container_of(work, struct afs_call, async_work); 64 65 call->async_workfn(call); 66 } 67 68 /* 69 * open an RxRPC socket and bind it to be a server for callback notifications 70 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT 71 */ 72 int afs_open_socket(void) 73 { 74 struct sockaddr_rxrpc srx; 75 struct socket *socket; 76 int ret; 77 78 _enter(""); 79 80 skb_queue_head_init(&afs_incoming_calls); 81 82 afs_async_calls = create_singlethread_workqueue("kafsd"); 83 if (!afs_async_calls) { 84 _leave(" = -ENOMEM [wq]"); 85 return -ENOMEM; 86 } 87 88 ret = sock_create_kern(AF_RXRPC, SOCK_DGRAM, PF_INET, &socket); 89 if (ret < 0) { 90 destroy_workqueue(afs_async_calls); 91 _leave(" = %d [socket]", ret); 92 return ret; 93 } 94 95 socket->sk->sk_allocation = GFP_NOFS; 96 97 /* bind the callback manager's address to make this a server socket */ 98 srx.srx_family = AF_RXRPC; 99 srx.srx_service = CM_SERVICE; 100 srx.transport_type = SOCK_DGRAM; 101 srx.transport_len = sizeof(srx.transport.sin); 102 srx.transport.sin.sin_family = AF_INET; 103 srx.transport.sin.sin_port = htons(AFS_CM_PORT); 104 memset(&srx.transport.sin.sin_addr, 0, 105 sizeof(srx.transport.sin.sin_addr)); 106 107 ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx)); 108 if (ret < 0) { 109 sock_release(socket); 110 destroy_workqueue(afs_async_calls); 111 _leave(" = %d [bind]", ret); 112 return ret; 113 } 114 115 rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor); 116 117 afs_socket = socket; 118 _leave(" = 0"); 119 return 0; 120 } 121 122 /* 123 * close the RxRPC socket AFS was using 124 */ 125 void afs_close_socket(void) 126 { 127 _enter(""); 128 129 sock_release(afs_socket); 130 131 _debug("dework"); 132 destroy_workqueue(afs_async_calls); 133 134 ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0); 135 ASSERTCMP(atomic_read(&afs_outstanding_calls), ==, 0); 136 _leave(""); 137 } 138 139 /* 140 * note that the data in a socket buffer is now delivered and that the buffer 141 * should be freed 142 */ 143 static void afs_data_delivered(struct sk_buff *skb) 144 { 145 if (!skb) { 146 _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs)); 147 dump_stack(); 148 } else { 149 _debug("DLVR %p{%u} [%d]", 150 skb, skb->mark, atomic_read(&afs_outstanding_skbs)); 151 if (atomic_dec_return(&afs_outstanding_skbs) == -1) 152 BUG(); 153 rxrpc_kernel_data_delivered(skb); 154 } 155 } 156 157 /* 158 * free a socket buffer 159 */ 160 static void afs_free_skb(struct sk_buff *skb) 161 { 162 if (!skb) { 163 _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs)); 164 dump_stack(); 165 } else { 166 _debug("FREE %p{%u} [%d]", 167 skb, skb->mark, atomic_read(&afs_outstanding_skbs)); 168 if (atomic_dec_return(&afs_outstanding_skbs) == -1) 169 BUG(); 170 rxrpc_kernel_free_skb(skb); 171 } 172 } 173 174 /* 175 * free a call 176 */ 177 static void afs_free_call(struct afs_call *call) 178 { 179 _debug("DONE %p{%s} [%d]", 180 call, call->type->name, atomic_read(&afs_outstanding_calls)); 181 if (atomic_dec_return(&afs_outstanding_calls) == -1) 182 BUG(); 183 184 ASSERTCMP(call->rxcall, ==, NULL); 185 ASSERT(!work_pending(&call->async_work)); 186 ASSERT(skb_queue_empty(&call->rx_queue)); 187 ASSERT(call->type->name != NULL); 188 189 kfree(call->request); 190 kfree(call); 191 } 192 193 /* 194 * End a call but do not free it 195 */ 196 static void afs_end_call_nofree(struct afs_call *call) 197 { 198 if (call->rxcall) { 199 rxrpc_kernel_end_call(call->rxcall); 200 call->rxcall = NULL; 201 } 202 if (call->type->destructor) 203 call->type->destructor(call); 204 } 205 206 /* 207 * End a call and free it 208 */ 209 static void afs_end_call(struct afs_call *call) 210 { 211 afs_end_call_nofree(call); 212 afs_free_call(call); 213 } 214 215 /* 216 * allocate a call with flat request and reply buffers 217 */ 218 struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type, 219 size_t request_size, size_t reply_size) 220 { 221 struct afs_call *call; 222 223 call = kzalloc(sizeof(*call), GFP_NOFS); 224 if (!call) 225 goto nomem_call; 226 227 _debug("CALL %p{%s} [%d]", 228 call, type->name, atomic_read(&afs_outstanding_calls)); 229 atomic_inc(&afs_outstanding_calls); 230 231 call->type = type; 232 call->request_size = request_size; 233 call->reply_max = reply_size; 234 235 if (request_size) { 236 call->request = kmalloc(request_size, GFP_NOFS); 237 if (!call->request) 238 goto nomem_free; 239 } 240 241 if (reply_size) { 242 call->buffer = kmalloc(reply_size, GFP_NOFS); 243 if (!call->buffer) 244 goto nomem_free; 245 } 246 247 init_waitqueue_head(&call->waitq); 248 skb_queue_head_init(&call->rx_queue); 249 return call; 250 251 nomem_free: 252 afs_free_call(call); 253 nomem_call: 254 return NULL; 255 } 256 257 /* 258 * clean up a call with flat buffer 259 */ 260 void afs_flat_call_destructor(struct afs_call *call) 261 { 262 _enter(""); 263 264 kfree(call->request); 265 call->request = NULL; 266 kfree(call->buffer); 267 call->buffer = NULL; 268 } 269 270 /* 271 * attach the data from a bunch of pages on an inode to a call 272 */ 273 static int afs_send_pages(struct afs_call *call, struct msghdr *msg, 274 struct kvec *iov) 275 { 276 struct page *pages[8]; 277 unsigned count, n, loop, offset, to; 278 pgoff_t first = call->first, last = call->last; 279 int ret; 280 281 _enter(""); 282 283 offset = call->first_offset; 284 call->first_offset = 0; 285 286 do { 287 _debug("attach %lx-%lx", first, last); 288 289 count = last - first + 1; 290 if (count > ARRAY_SIZE(pages)) 291 count = ARRAY_SIZE(pages); 292 n = find_get_pages_contig(call->mapping, first, count, pages); 293 ASSERTCMP(n, ==, count); 294 295 loop = 0; 296 do { 297 msg->msg_flags = 0; 298 to = PAGE_SIZE; 299 if (first + loop >= last) 300 to = call->last_to; 301 else 302 msg->msg_flags = MSG_MORE; 303 iov->iov_base = kmap(pages[loop]) + offset; 304 iov->iov_len = to - offset; 305 offset = 0; 306 307 _debug("- range %u-%u%s", 308 offset, to, msg->msg_flags ? " [more]" : ""); 309 msg->msg_iov = (struct iovec *) iov; 310 msg->msg_iovlen = 1; 311 312 /* have to change the state *before* sending the last 313 * packet as RxRPC might give us the reply before it 314 * returns from sending the request */ 315 if (first + loop >= last) 316 call->state = AFS_CALL_AWAIT_REPLY; 317 ret = rxrpc_kernel_send_data(call->rxcall, msg, 318 to - offset); 319 kunmap(pages[loop]); 320 if (ret < 0) 321 break; 322 } while (++loop < count); 323 first += count; 324 325 for (loop = 0; loop < count; loop++) 326 put_page(pages[loop]); 327 if (ret < 0) 328 break; 329 } while (first <= last); 330 331 _leave(" = %d", ret); 332 return ret; 333 } 334 335 /* 336 * initiate a call 337 */ 338 int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp, 339 const struct afs_wait_mode *wait_mode) 340 { 341 struct sockaddr_rxrpc srx; 342 struct rxrpc_call *rxcall; 343 struct msghdr msg; 344 struct kvec iov[1]; 345 int ret; 346 struct sk_buff *skb; 347 348 _enter("%x,{%d},", addr->s_addr, ntohs(call->port)); 349 350 ASSERT(call->type != NULL); 351 ASSERT(call->type->name != NULL); 352 353 _debug("____MAKE %p{%s,%x} [%d]____", 354 call, call->type->name, key_serial(call->key), 355 atomic_read(&afs_outstanding_calls)); 356 357 call->wait_mode = wait_mode; 358 call->async_workfn = afs_process_async_call; 359 INIT_WORK(&call->async_work, afs_async_workfn); 360 361 memset(&srx, 0, sizeof(srx)); 362 srx.srx_family = AF_RXRPC; 363 srx.srx_service = call->service_id; 364 srx.transport_type = SOCK_DGRAM; 365 srx.transport_len = sizeof(srx.transport.sin); 366 srx.transport.sin.sin_family = AF_INET; 367 srx.transport.sin.sin_port = call->port; 368 memcpy(&srx.transport.sin.sin_addr, addr, 4); 369 370 /* create a call */ 371 rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key, 372 (unsigned long) call, gfp); 373 call->key = NULL; 374 if (IS_ERR(rxcall)) { 375 ret = PTR_ERR(rxcall); 376 goto error_kill_call; 377 } 378 379 call->rxcall = rxcall; 380 381 /* send the request */ 382 iov[0].iov_base = call->request; 383 iov[0].iov_len = call->request_size; 384 385 msg.msg_name = NULL; 386 msg.msg_namelen = 0; 387 msg.msg_iov = (struct iovec *) iov; 388 msg.msg_iovlen = 1; 389 msg.msg_control = NULL; 390 msg.msg_controllen = 0; 391 msg.msg_flags = (call->send_pages ? MSG_MORE : 0); 392 393 /* have to change the state *before* sending the last packet as RxRPC 394 * might give us the reply before it returns from sending the 395 * request */ 396 if (!call->send_pages) 397 call->state = AFS_CALL_AWAIT_REPLY; 398 ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size); 399 if (ret < 0) 400 goto error_do_abort; 401 402 if (call->send_pages) { 403 ret = afs_send_pages(call, &msg, iov); 404 if (ret < 0) 405 goto error_do_abort; 406 } 407 408 /* at this point, an async call may no longer exist as it may have 409 * already completed */ 410 return wait_mode->wait(call); 411 412 error_do_abort: 413 rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT); 414 while ((skb = skb_dequeue(&call->rx_queue))) 415 afs_free_skb(skb); 416 error_kill_call: 417 afs_end_call(call); 418 _leave(" = %d", ret); 419 return ret; 420 } 421 422 /* 423 * handles intercepted messages that were arriving in the socket's Rx queue 424 * - called with the socket receive queue lock held to ensure message ordering 425 * - called with softirqs disabled 426 */ 427 static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID, 428 struct sk_buff *skb) 429 { 430 struct afs_call *call = (struct afs_call *) user_call_ID; 431 432 _enter("%p,,%u", call, skb->mark); 433 434 _debug("ICPT %p{%u} [%d]", 435 skb, skb->mark, atomic_read(&afs_outstanding_skbs)); 436 437 ASSERTCMP(sk, ==, afs_socket->sk); 438 atomic_inc(&afs_outstanding_skbs); 439 440 if (!call) { 441 /* its an incoming call for our callback service */ 442 skb_queue_tail(&afs_incoming_calls, skb); 443 queue_work(afs_wq, &afs_collect_incoming_call_work); 444 } else { 445 /* route the messages directly to the appropriate call */ 446 skb_queue_tail(&call->rx_queue, skb); 447 call->wait_mode->rx_wakeup(call); 448 } 449 450 _leave(""); 451 } 452 453 /* 454 * deliver messages to a call 455 */ 456 static void afs_deliver_to_call(struct afs_call *call) 457 { 458 struct sk_buff *skb; 459 bool last; 460 u32 abort_code; 461 int ret; 462 463 _enter(""); 464 465 while ((call->state == AFS_CALL_AWAIT_REPLY || 466 call->state == AFS_CALL_AWAIT_OP_ID || 467 call->state == AFS_CALL_AWAIT_REQUEST || 468 call->state == AFS_CALL_AWAIT_ACK) && 469 (skb = skb_dequeue(&call->rx_queue))) { 470 switch (skb->mark) { 471 case RXRPC_SKB_MARK_DATA: 472 _debug("Rcv DATA"); 473 last = rxrpc_kernel_is_data_last(skb); 474 ret = call->type->deliver(call, skb, last); 475 switch (ret) { 476 case 0: 477 if (last && 478 call->state == AFS_CALL_AWAIT_REPLY) 479 call->state = AFS_CALL_COMPLETE; 480 break; 481 case -ENOTCONN: 482 abort_code = RX_CALL_DEAD; 483 goto do_abort; 484 case -ENOTSUPP: 485 abort_code = RX_INVALID_OPERATION; 486 goto do_abort; 487 default: 488 abort_code = RXGEN_CC_UNMARSHAL; 489 if (call->state != AFS_CALL_AWAIT_REPLY) 490 abort_code = RXGEN_SS_UNMARSHAL; 491 do_abort: 492 rxrpc_kernel_abort_call(call->rxcall, 493 abort_code); 494 call->error = ret; 495 call->state = AFS_CALL_ERROR; 496 break; 497 } 498 afs_data_delivered(skb); 499 skb = NULL; 500 continue; 501 case RXRPC_SKB_MARK_FINAL_ACK: 502 _debug("Rcv ACK"); 503 call->state = AFS_CALL_COMPLETE; 504 break; 505 case RXRPC_SKB_MARK_BUSY: 506 _debug("Rcv BUSY"); 507 call->error = -EBUSY; 508 call->state = AFS_CALL_BUSY; 509 break; 510 case RXRPC_SKB_MARK_REMOTE_ABORT: 511 abort_code = rxrpc_kernel_get_abort_code(skb); 512 call->error = call->type->abort_to_error(abort_code); 513 call->state = AFS_CALL_ABORTED; 514 _debug("Rcv ABORT %u -> %d", abort_code, call->error); 515 break; 516 case RXRPC_SKB_MARK_NET_ERROR: 517 call->error = -rxrpc_kernel_get_error_number(skb); 518 call->state = AFS_CALL_ERROR; 519 _debug("Rcv NET ERROR %d", call->error); 520 break; 521 case RXRPC_SKB_MARK_LOCAL_ERROR: 522 call->error = -rxrpc_kernel_get_error_number(skb); 523 call->state = AFS_CALL_ERROR; 524 _debug("Rcv LOCAL ERROR %d", call->error); 525 break; 526 default: 527 BUG(); 528 break; 529 } 530 531 afs_free_skb(skb); 532 } 533 534 /* make sure the queue is empty if the call is done with (we might have 535 * aborted the call early because of an unmarshalling error) */ 536 if (call->state >= AFS_CALL_COMPLETE) { 537 while ((skb = skb_dequeue(&call->rx_queue))) 538 afs_free_skb(skb); 539 if (call->incoming) 540 afs_end_call(call); 541 } 542 543 _leave(""); 544 } 545 546 /* 547 * wait synchronously for a call to complete 548 */ 549 static int afs_wait_for_call_to_complete(struct afs_call *call) 550 { 551 struct sk_buff *skb; 552 int ret; 553 554 DECLARE_WAITQUEUE(myself, current); 555 556 _enter(""); 557 558 add_wait_queue(&call->waitq, &myself); 559 for (;;) { 560 set_current_state(TASK_INTERRUPTIBLE); 561 562 /* deliver any messages that are in the queue */ 563 if (!skb_queue_empty(&call->rx_queue)) { 564 __set_current_state(TASK_RUNNING); 565 afs_deliver_to_call(call); 566 continue; 567 } 568 569 ret = call->error; 570 if (call->state >= AFS_CALL_COMPLETE) 571 break; 572 ret = -EINTR; 573 if (signal_pending(current)) 574 break; 575 schedule(); 576 } 577 578 remove_wait_queue(&call->waitq, &myself); 579 __set_current_state(TASK_RUNNING); 580 581 /* kill the call */ 582 if (call->state < AFS_CALL_COMPLETE) { 583 _debug("call incomplete"); 584 rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD); 585 while ((skb = skb_dequeue(&call->rx_queue))) 586 afs_free_skb(skb); 587 } 588 589 _debug("call complete"); 590 afs_end_call(call); 591 _leave(" = %d", ret); 592 return ret; 593 } 594 595 /* 596 * wake up a waiting call 597 */ 598 static void afs_wake_up_call_waiter(struct afs_call *call) 599 { 600 wake_up(&call->waitq); 601 } 602 603 /* 604 * wake up an asynchronous call 605 */ 606 static void afs_wake_up_async_call(struct afs_call *call) 607 { 608 _enter(""); 609 queue_work(afs_async_calls, &call->async_work); 610 } 611 612 /* 613 * put a call into asynchronous mode 614 * - mustn't touch the call descriptor as the call my have completed by the 615 * time we get here 616 */ 617 static int afs_dont_wait_for_call_to_complete(struct afs_call *call) 618 { 619 _enter(""); 620 return -EINPROGRESS; 621 } 622 623 /* 624 * delete an asynchronous call 625 */ 626 static void afs_delete_async_call(struct afs_call *call) 627 { 628 _enter(""); 629 630 afs_free_call(call); 631 632 _leave(""); 633 } 634 635 /* 636 * perform processing on an asynchronous call 637 * - on a multiple-thread workqueue this work item may try to run on several 638 * CPUs at the same time 639 */ 640 static void afs_process_async_call(struct afs_call *call) 641 { 642 _enter(""); 643 644 if (!skb_queue_empty(&call->rx_queue)) 645 afs_deliver_to_call(call); 646 647 if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) { 648 if (call->wait_mode->async_complete) 649 call->wait_mode->async_complete(call->reply, 650 call->error); 651 call->reply = NULL; 652 653 /* kill the call */ 654 afs_end_call_nofree(call); 655 656 /* we can't just delete the call because the work item may be 657 * queued */ 658 call->async_workfn = afs_delete_async_call; 659 queue_work(afs_async_calls, &call->async_work); 660 } 661 662 _leave(""); 663 } 664 665 /* 666 * empty a socket buffer into a flat reply buffer 667 */ 668 void afs_transfer_reply(struct afs_call *call, struct sk_buff *skb) 669 { 670 size_t len = skb->len; 671 672 if (skb_copy_bits(skb, 0, call->buffer + call->reply_size, len) < 0) 673 BUG(); 674 call->reply_size += len; 675 } 676 677 /* 678 * accept the backlog of incoming calls 679 */ 680 static void afs_collect_incoming_call(struct work_struct *work) 681 { 682 struct rxrpc_call *rxcall; 683 struct afs_call *call = NULL; 684 struct sk_buff *skb; 685 686 while ((skb = skb_dequeue(&afs_incoming_calls))) { 687 _debug("new call"); 688 689 /* don't need the notification */ 690 afs_free_skb(skb); 691 692 if (!call) { 693 call = kzalloc(sizeof(struct afs_call), GFP_KERNEL); 694 if (!call) { 695 rxrpc_kernel_reject_call(afs_socket); 696 return; 697 } 698 699 call->async_workfn = afs_process_async_call; 700 INIT_WORK(&call->async_work, afs_async_workfn); 701 call->wait_mode = &afs_async_incoming_call; 702 call->type = &afs_RXCMxxxx; 703 init_waitqueue_head(&call->waitq); 704 skb_queue_head_init(&call->rx_queue); 705 call->state = AFS_CALL_AWAIT_OP_ID; 706 707 _debug("CALL %p{%s} [%d]", 708 call, call->type->name, 709 atomic_read(&afs_outstanding_calls)); 710 atomic_inc(&afs_outstanding_calls); 711 } 712 713 rxcall = rxrpc_kernel_accept_call(afs_socket, 714 (unsigned long) call); 715 if (!IS_ERR(rxcall)) { 716 call->rxcall = rxcall; 717 call = NULL; 718 } 719 } 720 721 if (call) 722 afs_free_call(call); 723 } 724 725 /* 726 * grab the operation ID from an incoming cache manager call 727 */ 728 static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb, 729 bool last) 730 { 731 size_t len = skb->len; 732 void *oibuf = (void *) &call->operation_ID; 733 734 _enter("{%u},{%zu},%d", call->offset, len, last); 735 736 ASSERTCMP(call->offset, <, 4); 737 738 /* the operation ID forms the first four bytes of the request data */ 739 len = min_t(size_t, len, 4 - call->offset); 740 if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0) 741 BUG(); 742 if (!pskb_pull(skb, len)) 743 BUG(); 744 call->offset += len; 745 746 if (call->offset < 4) { 747 if (last) { 748 _leave(" = -EBADMSG [op ID short]"); 749 return -EBADMSG; 750 } 751 _leave(" = 0 [incomplete]"); 752 return 0; 753 } 754 755 call->state = AFS_CALL_AWAIT_REQUEST; 756 757 /* ask the cache manager to route the call (it'll change the call type 758 * if successful) */ 759 if (!afs_cm_incoming_call(call)) 760 return -ENOTSUPP; 761 762 /* pass responsibility for the remainer of this message off to the 763 * cache manager op */ 764 return call->type->deliver(call, skb, last); 765 } 766 767 /* 768 * send an empty reply 769 */ 770 void afs_send_empty_reply(struct afs_call *call) 771 { 772 struct msghdr msg; 773 struct iovec iov[1]; 774 775 _enter(""); 776 777 iov[0].iov_base = NULL; 778 iov[0].iov_len = 0; 779 msg.msg_name = NULL; 780 msg.msg_namelen = 0; 781 msg.msg_iov = iov; 782 msg.msg_iovlen = 0; 783 msg.msg_control = NULL; 784 msg.msg_controllen = 0; 785 msg.msg_flags = 0; 786 787 call->state = AFS_CALL_AWAIT_ACK; 788 switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) { 789 case 0: 790 _leave(" [replied]"); 791 return; 792 793 case -ENOMEM: 794 _debug("oom"); 795 rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT); 796 default: 797 afs_end_call(call); 798 _leave(" [error]"); 799 return; 800 } 801 } 802 803 /* 804 * send a simple reply 805 */ 806 void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len) 807 { 808 struct msghdr msg; 809 struct iovec iov[1]; 810 int n; 811 812 _enter(""); 813 814 iov[0].iov_base = (void *) buf; 815 iov[0].iov_len = len; 816 msg.msg_name = NULL; 817 msg.msg_namelen = 0; 818 msg.msg_iov = iov; 819 msg.msg_iovlen = 1; 820 msg.msg_control = NULL; 821 msg.msg_controllen = 0; 822 msg.msg_flags = 0; 823 824 call->state = AFS_CALL_AWAIT_ACK; 825 n = rxrpc_kernel_send_data(call->rxcall, &msg, len); 826 if (n >= 0) { 827 /* Success */ 828 _leave(" [replied]"); 829 return; 830 } 831 832 if (n == -ENOMEM) { 833 _debug("oom"); 834 rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT); 835 } 836 afs_end_call(call); 837 _leave(" [error]"); 838 } 839 840 /* 841 * extract a piece of data from the received data socket buffers 842 */ 843 int afs_extract_data(struct afs_call *call, struct sk_buff *skb, 844 bool last, void *buf, size_t count) 845 { 846 size_t len = skb->len; 847 848 _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count); 849 850 ASSERTCMP(call->offset, <, count); 851 852 len = min_t(size_t, len, count - call->offset); 853 if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 || 854 !pskb_pull(skb, len)) 855 BUG(); 856 call->offset += len; 857 858 if (call->offset < count) { 859 if (last) { 860 _leave(" = -EBADMSG [%d < %zu]", call->offset, count); 861 return -EBADMSG; 862 } 863 _leave(" = -EAGAIN"); 864 return -EAGAIN; 865 } 866 return 0; 867 } 868