/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include <rxrpc/packet.h>
#include "internal.h"
#include "afs_cm.h"

static struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
static atomic_t afs_outstanding_calls;
static atomic_t afs_outstanding_skbs;

static void afs_wake_up_call_waiter(struct afs_call *);
static int afs_wait_for_call_to_complete(struct afs_call *);
static void afs_wake_up_async_call(struct afs_call *);
static int afs_dont_wait_for_call_to_complete(struct afs_call *);
static void afs_process_async_call(struct work_struct *);
static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *);
static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool);

/* synchronous call management */
const struct afs_wait_mode afs_sync_call = {
	.rx_wakeup	= afs_wake_up_call_waiter,
	.wait		= afs_wait_for_call_to_complete,
};

/* asynchronous call management */
const struct afs_wait_mode afs_async_call = {
	.rx_wakeup	= afs_wake_up_async_call,
	.wait		= afs_dont_wait_for_call_to_complete,
};

/* asynchronous incoming call management */
static const struct afs_wait_mode afs_async_incoming_call = {
	.rx_wakeup	= afs_wake_up_async_call,
};

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
	.name		= "CB.xxxx",
	.deliver	= afs_deliver_cm_op_id,
	.abort_to_error	= afs_abort_to_error,
};

static void afs_collect_incoming_call(struct work_struct *);

static struct sk_buff_head afs_incoming_calls;
static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
int afs_open_socket(void)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	_enter("");

	skb_queue_head_init(&afs_incoming_calls);

	afs_async_calls = create_singlethread_workqueue("kafsd");
	if (!afs_async_calls) {
		_leave(" = -ENOMEM [wq]");
		return -ENOMEM;
	}

	ret = sock_create_kern(AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
	if (ret < 0) {
		destroy_workqueue(afs_async_calls);
		_leave(" = %d [socket]", ret);
		return ret;
	}

	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the callback manager's address to make this a server socket */
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= CM_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin);
	srx.transport.sin.sin_family	= AF_INET;
	srx.transport.sin.sin_port	= htons(AFS_CM_PORT);
	memset(&srx.transport.sin.sin_addr, 0,
	       sizeof(srx.transport.sin.sin_addr));

	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
	if (ret < 0) {
		sock_release(socket);
		_leave(" = %d [bind]", ret);
		return ret;
	}

	rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);

	afs_socket = socket;
	_leave(" = 0");
	return 0;
}
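/*
 * A minimal usage sketch (illustrative only; the surrounding init and exit
 * functions are assumed and are not defined in this file): the socket is
 * opened once when the filesystem starts up and closed again on unload.
 *
 *	ret = afs_open_socket();
 *	if (ret < 0)
 *		return ret;
 *	...
 *	afs_close_socket();
 */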
/*
 * close the RxRPC socket AFS was using
 */
void afs_close_socket(void)
{
	_enter("");

	sock_release(afs_socket);

	_debug("dework");
	destroy_workqueue(afs_async_calls);

	ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
	ASSERTCMP(atomic_read(&afs_outstanding_calls), ==, 0);
	_leave("");
}

/*
 * note that the data in a socket buffer is now delivered and that the buffer
 * should be freed
 */
static void afs_data_delivered(struct sk_buff *skb)
{
	if (!skb) {
		_debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
		dump_stack();
	} else {
		_debug("DLVR %p{%u} [%d]",
		       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
		if (atomic_dec_return(&afs_outstanding_skbs) == -1)
			BUG();
		rxrpc_kernel_data_delivered(skb);
	}
}

/*
 * free a socket buffer
 */
static void afs_free_skb(struct sk_buff *skb)
{
	if (!skb) {
		_debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
		dump_stack();
	} else {
		_debug("FREE %p{%u} [%d]",
		       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
		if (atomic_dec_return(&afs_outstanding_skbs) == -1)
			BUG();
		rxrpc_kernel_free_skb(skb);
	}
}

/*
 * free a call
 */
static void afs_free_call(struct afs_call *call)
{
	_debug("DONE %p{%s} [%d]",
	       call, call->type->name, atomic_read(&afs_outstanding_calls));
	if (atomic_dec_return(&afs_outstanding_calls) == -1)
		BUG();

	ASSERTCMP(call->rxcall, ==, NULL);
	ASSERT(!work_pending(&call->async_work));
	ASSERT(skb_queue_empty(&call->rx_queue));
	ASSERT(call->type->name != NULL);

	kfree(call->request);
	kfree(call);
}

/*
 * allocate a call with flat request and reply buffers
 */
struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
				     size_t request_size, size_t reply_size)
{
	struct afs_call *call;

	call = kzalloc(sizeof(*call), GFP_NOFS);
	if (!call)
		goto nomem_call;

	_debug("CALL %p{%s} [%d]",
	       call, type->name, atomic_read(&afs_outstanding_calls));
	atomic_inc(&afs_outstanding_calls);

	call->type = type;
	call->request_size = request_size;
	call->reply_max = reply_size;

	if (request_size) {
		call->request = kmalloc(request_size, GFP_NOFS);
		if (!call->request)
			goto nomem_free;
	}

	if (reply_size) {
		call->buffer = kmalloc(reply_size, GFP_NOFS);
		if (!call->buffer)
			goto nomem_free;
	}

	init_waitqueue_head(&call->waitq);
	skb_queue_head_init(&call->rx_queue);
	return call;

nomem_free:
	afs_free_call(call);
nomem_call:
	return NULL;
}

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
	_enter("");

	kfree(call->request);
	call->request = NULL;
	kfree(call->buffer);
	call->buffer = NULL;
}
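/*
 * An illustrative sketch of how the client-call API above is typically
 * driven (the call type, constants and server pointer used here are
 * hypothetical examples, not definitions from this file):
 *
 *	struct afs_call *call;
 *
 *	call = afs_alloc_flat_call(&afs_example_call_type, 8, 256);
 *	if (!call)
 *		return -ENOMEM;
 *	call->key = key;
 *	call->service_id = EXAMPLE_SERVICE;
 *	call->port = htons(EXAMPLE_PORT);
 *	*(__be32 *) call->request = htonl(EXAMPLE_OPCODE);
 *	...
 *	return afs_make_call(&server->addr, call, GFP_NOFS, &afs_sync_call);
 *
 * afs_make_call() takes over the call: it is freed on the error paths and,
 * for a synchronous call, by afs_wait_for_call_to_complete(), so the caller
 * must not touch it again once afs_make_call() has returned.
 */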
/*
 * initiate a call
 */
int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
		  const struct afs_wait_mode *wait_mode)
{
	struct sockaddr_rxrpc srx;
	struct rxrpc_call *rxcall;
	struct msghdr msg;
	struct kvec iov[1];
	int ret;

	_enter("%x,{%d},", addr->s_addr, ntohs(call->port));

	ASSERT(call->type != NULL);
	ASSERT(call->type->name != NULL);

	_debug("MAKE %p{%s} [%d]",
	       call, call->type->name, atomic_read(&afs_outstanding_calls));

	call->wait_mode = wait_mode;
	INIT_WORK(&call->async_work, afs_process_async_call);

	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= call->service_id;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin);
	srx.transport.sin.sin_family	= AF_INET;
	srx.transport.sin.sin_port	= call->port;
	memcpy(&srx.transport.sin.sin_addr, addr, 4);

	/* create a call */
	rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
					 (unsigned long) call, gfp);
	call->key = NULL;
	if (IS_ERR(rxcall)) {
		ret = PTR_ERR(rxcall);
		goto error_kill_call;
	}

	call->rxcall = rxcall;

	/* send the request */
	iov[0].iov_base	= call->request;
	iov[0].iov_len	= call->request_size;

	msg.msg_name		= NULL;
	msg.msg_namelen		= 0;
	msg.msg_iov		= (struct iovec *) iov;
	msg.msg_iovlen		= 1;
	msg.msg_control		= NULL;
	msg.msg_controllen	= 0;
	msg.msg_flags		= 0;

	/* have to change the state *before* sending the last packet as RxRPC
	 * might give us the reply before it returns from sending the
	 * request */
	call->state = AFS_CALL_AWAIT_REPLY;
	ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
	if (ret < 0)
		goto error_do_abort;

	/* at this point, an async call may no longer exist as it may have
	 * already completed */
	return wait_mode->wait(call);

error_do_abort:
	rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
	rxrpc_kernel_end_call(rxcall);
	call->rxcall = NULL;
error_kill_call:
	call->type->destructor(call);
	afs_free_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * handle intercepted messages arriving in the socket's Rx queue
 * - called with the socket receive queue lock held to ensure message ordering
 * - called with softirqs disabled
 */
static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
			       struct sk_buff *skb)
{
	struct afs_call *call = (struct afs_call *) user_call_ID;

	_enter("%p,,%u", call, skb->mark);

	_debug("ICPT %p{%u} [%d]",
	       skb, skb->mark, atomic_read(&afs_outstanding_skbs));

	ASSERTCMP(sk, ==, afs_socket->sk);
	atomic_inc(&afs_outstanding_skbs);

	if (!call) {
		/* it's an incoming call for our callback service */
		skb_queue_tail(&afs_incoming_calls, skb);
		schedule_work(&afs_collect_incoming_call_work);
	} else {
		/* route the messages directly to the appropriate call */
		skb_queue_tail(&call->rx_queue, skb);
		call->wait_mode->rx_wakeup(call);
	}

	_leave("");
}
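/*
 * Overview of the delivery state machine handled below (a descriptive summary
 * of the code that follows):
 *
 * - An outgoing call sits in AFS_CALL_AWAIT_REPLY until the last reply DATA
 *   packet has been delivered, at which point it becomes AFS_CALL_COMPLETE.
 *
 * - An incoming call starts in AFS_CALL_AWAIT_OP_ID, moves to
 *   AFS_CALL_AWAIT_REQUEST once the operation ID has been read, then to
 *   AFS_CALL_AWAIT_ACK when a reply is sent and AFS_CALL_COMPLETE when the
 *   final ACK arrives.
 *
 * - A call type's deliver op returns 0 on success, -ENOTCONN or -ENOTSUPP to
 *   abort with a specific abort code, or any other error to abort with an
 *   unmarshalling failure; BUSY, abort and error messages shift the call into
 *   AFS_CALL_BUSY, AFS_CALL_ABORTED or AFS_CALL_ERROR respectively.
 */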
/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
	struct sk_buff *skb;
	bool last;
	u32 abort_code;
	int ret;

	_enter("");

	while ((call->state == AFS_CALL_AWAIT_REPLY ||
		call->state == AFS_CALL_AWAIT_OP_ID ||
		call->state == AFS_CALL_AWAIT_REQUEST ||
		call->state == AFS_CALL_AWAIT_ACK) &&
	       (skb = skb_dequeue(&call->rx_queue))) {
		switch (skb->mark) {
		case RXRPC_SKB_MARK_DATA:
			_debug("Rcv DATA");
			last = rxrpc_kernel_is_data_last(skb);
			ret = call->type->deliver(call, skb, last);
			switch (ret) {
			case 0:
				if (last &&
				    call->state == AFS_CALL_AWAIT_REPLY)
					call->state = AFS_CALL_COMPLETE;
				break;
			case -ENOTCONN:
				abort_code = RX_CALL_DEAD;
				goto do_abort;
			case -ENOTSUPP:
				abort_code = RX_INVALID_OPERATION;
				goto do_abort;
			default:
				/* unmarshalling failure: pick the client (CC)
				 * or server (SS) abort code depending on which
				 * side of the call we're on */
				abort_code = RXGEN_CC_UNMARSHAL;
				if (call->state != AFS_CALL_AWAIT_REPLY)
					abort_code = RXGEN_SS_UNMARSHAL;
			do_abort:
				rxrpc_kernel_abort_call(call->rxcall,
							abort_code);
				call->error = ret;
				call->state = AFS_CALL_ERROR;
				break;
			}
			afs_data_delivered(skb);
			skb = NULL;
			continue;
		case RXRPC_SKB_MARK_FINAL_ACK:
			_debug("Rcv ACK");
			call->state = AFS_CALL_COMPLETE;
			break;
		case RXRPC_SKB_MARK_BUSY:
			_debug("Rcv BUSY");
			call->error = -EBUSY;
			call->state = AFS_CALL_BUSY;
			break;
		case RXRPC_SKB_MARK_REMOTE_ABORT:
			abort_code = rxrpc_kernel_get_abort_code(skb);
			call->error = call->type->abort_to_error(abort_code);
			call->state = AFS_CALL_ABORTED;
			_debug("Rcv ABORT %u -> %d", abort_code, call->error);
			break;
		case RXRPC_SKB_MARK_NET_ERROR:
			call->error = -rxrpc_kernel_get_error_number(skb);
			call->state = AFS_CALL_ERROR;
			_debug("Rcv NET ERROR %d", call->error);
			break;
		case RXRPC_SKB_MARK_LOCAL_ERROR:
			call->error = -rxrpc_kernel_get_error_number(skb);
			call->state = AFS_CALL_ERROR;
			_debug("Rcv LOCAL ERROR %d", call->error);
			break;
		default:
			BUG();
			break;
		}

		afs_free_skb(skb);
	}

	/* make sure the queue is empty if the call is done with (we might have
	 * aborted the call early because of an unmarshalling error) */
	if (call->state >= AFS_CALL_COMPLETE) {
		while ((skb = skb_dequeue(&call->rx_queue)))
			afs_free_skb(skb);
		if (call->incoming) {
			rxrpc_kernel_end_call(call->rxcall);
			call->rxcall = NULL;
			call->type->destructor(call);
			afs_free_call(call);
		}
	}

	_leave("");
}

/*
 * wait synchronously for a call to complete
 */
static int afs_wait_for_call_to_complete(struct afs_call *call)
{
	struct sk_buff *skb;
	int ret;

	DECLARE_WAITQUEUE(myself, current);

	_enter("");

	add_wait_queue(&call->waitq, &myself);
	for (;;) {
		set_current_state(TASK_INTERRUPTIBLE);

		/* deliver any messages that are in the queue */
		if (!skb_queue_empty(&call->rx_queue)) {
			__set_current_state(TASK_RUNNING);
			afs_deliver_to_call(call);
			continue;
		}

		ret = call->error;
		if (call->state >= AFS_CALL_COMPLETE)
			break;
		ret = -EINTR;
		if (signal_pending(current))
			break;
		schedule();
	}

	remove_wait_queue(&call->waitq, &myself);
	__set_current_state(TASK_RUNNING);

	/* kill the call */
	if (call->state < AFS_CALL_COMPLETE) {
		_debug("call incomplete");
		rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
		while ((skb = skb_dequeue(&call->rx_queue)))
			afs_free_skb(skb);
	}

	_debug("call complete");
	rxrpc_kernel_end_call(call->rxcall);
	call->rxcall = NULL;
	call->type->destructor(call);
	afs_free_call(call);
	_leave(" = %d", ret);
	return ret;
}

/*
 * wake up a waiting call
 */
static void afs_wake_up_call_waiter(struct afs_call *call)
{
	wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 */
static void afs_wake_up_async_call(struct afs_call *call)
{
	_enter("");
	queue_work(afs_async_calls, &call->async_work);
}
/*
 * put a call into asynchronous mode
 * - mustn't touch the call descriptor as the call may have completed by the
 *   time we get here
 */
static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
{
	_enter("");
	return -EINPROGRESS;
}

/*
 * delete an asynchronous call
 */
static void afs_delete_async_call(struct work_struct *work)
{
	struct afs_call *call =
		container_of(work, struct afs_call, async_work);

	_enter("");

	afs_free_call(call);

	_leave("");
}

/*
 * perform processing on an asynchronous call
 * - on a multiple-thread workqueue this work item may try to run on several
 *   CPUs at the same time
 */
static void afs_process_async_call(struct work_struct *work)
{
	struct afs_call *call =
		container_of(work, struct afs_call, async_work);

	_enter("");

	if (!skb_queue_empty(&call->rx_queue))
		afs_deliver_to_call(call);

	if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) {
		if (call->wait_mode->async_complete)
			call->wait_mode->async_complete(call->reply,
							call->error);
		call->reply = NULL;

		/* kill the call */
		rxrpc_kernel_end_call(call->rxcall);
		call->rxcall = NULL;
		if (call->type->destructor)
			call->type->destructor(call);

		/* we can't just delete the call because the work item may be
		 * queued */
		PREPARE_WORK(&call->async_work, afs_delete_async_call);
		queue_work(afs_async_calls, &call->async_work);
	}

	_leave("");
}

/*
 * empty a socket buffer into a flat reply buffer
 */
void afs_transfer_reply(struct afs_call *call, struct sk_buff *skb)
{
	size_t len = skb->len;

	if (skb_copy_bits(skb, 0, call->buffer + call->reply_size, len) < 0)
		BUG();
	call->reply_size += len;
}

/*
 * accept the backlog of incoming calls
 */
static void afs_collect_incoming_call(struct work_struct *work)
{
	struct rxrpc_call *rxcall;
	struct afs_call *call = NULL;
	struct sk_buff *skb;

	while ((skb = skb_dequeue(&afs_incoming_calls))) {
		_debug("new call");

		/* don't need the notification */
		afs_free_skb(skb);

		if (!call) {
			call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
			if (!call) {
				rxrpc_kernel_reject_call(afs_socket);
				return;
			}

			INIT_WORK(&call->async_work, afs_process_async_call);
			call->wait_mode = &afs_async_incoming_call;
			call->type = &afs_RXCMxxxx;
			init_waitqueue_head(&call->waitq);
			skb_queue_head_init(&call->rx_queue);
			call->state = AFS_CALL_AWAIT_OP_ID;

			_debug("CALL %p{%s} [%d]",
			       call, call->type->name,
			       atomic_read(&afs_outstanding_calls));
			atomic_inc(&afs_outstanding_calls);
		}

		rxcall = rxrpc_kernel_accept_call(afs_socket,
						  (unsigned long) call);
		if (!IS_ERR(rxcall)) {
			call->rxcall = rxcall;
			call = NULL;
		}
	}

	if (call)
		afs_free_call(call);
}
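/*
 * The incoming-call path, in brief (a descriptive summary of the pieces
 * around it): a message with no user call ID is queued on afs_incoming_calls
 * by afs_rx_interceptor(); afs_collect_incoming_call() then accepts the new
 * call with a skeleton afs_call of type afs_RXCMxxxx as its user call ID; the
 * first four bytes delivered are taken as the operation ID below, after which
 * afs_cm_incoming_call() substitutes the real cache manager call type and the
 * rest of the message is delivered to that type's handler.
 */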
ID short]"); 659 return -EBADMSG; 660 } 661 _leave(" = 0 [incomplete]"); 662 return 0; 663 } 664 665 call->state = AFS_CALL_AWAIT_REQUEST; 666 667 /* ask the cache manager to route the call (it'll change the call type 668 * if successful) */ 669 if (!afs_cm_incoming_call(call)) 670 return -ENOTSUPP; 671 672 /* pass responsibility for the remainer of this message off to the 673 * cache manager op */ 674 return call->type->deliver(call, skb, last); 675 } 676 677 /* 678 * send an empty reply 679 */ 680 void afs_send_empty_reply(struct afs_call *call) 681 { 682 struct msghdr msg; 683 struct iovec iov[1]; 684 685 _enter(""); 686 687 iov[0].iov_base = NULL; 688 iov[0].iov_len = 0; 689 msg.msg_name = NULL; 690 msg.msg_namelen = 0; 691 msg.msg_iov = iov; 692 msg.msg_iovlen = 0; 693 msg.msg_control = NULL; 694 msg.msg_controllen = 0; 695 msg.msg_flags = 0; 696 697 call->state = AFS_CALL_AWAIT_ACK; 698 switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) { 699 case 0: 700 _leave(" [replied]"); 701 return; 702 703 case -ENOMEM: 704 _debug("oom"); 705 rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT); 706 default: 707 rxrpc_kernel_end_call(call->rxcall); 708 call->rxcall = NULL; 709 call->type->destructor(call); 710 afs_free_call(call); 711 _leave(" [error]"); 712 return; 713 } 714 } 715 716 /* 717 * send a simple reply 718 */ 719 void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len) 720 { 721 struct msghdr msg; 722 struct iovec iov[1]; 723 724 _enter(""); 725 726 iov[0].iov_base = (void *) buf; 727 iov[0].iov_len = len; 728 msg.msg_name = NULL; 729 msg.msg_namelen = 0; 730 msg.msg_iov = iov; 731 msg.msg_iovlen = 1; 732 msg.msg_control = NULL; 733 msg.msg_controllen = 0; 734 msg.msg_flags = 0; 735 736 call->state = AFS_CALL_AWAIT_ACK; 737 switch (rxrpc_kernel_send_data(call->rxcall, &msg, len)) { 738 case 0: 739 _leave(" [replied]"); 740 return; 741 742 case -ENOMEM: 743 _debug("oom"); 744 rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT); 745 default: 746 rxrpc_kernel_end_call(call->rxcall); 747 call->rxcall = NULL; 748 call->type->destructor(call); 749 afs_free_call(call); 750 _leave(" [error]"); 751 return; 752 } 753 } 754 755 /* 756 * extract a piece of data from the received data socket buffers 757 */ 758 int afs_extract_data(struct afs_call *call, struct sk_buff *skb, 759 bool last, void *buf, size_t count) 760 { 761 size_t len = skb->len; 762 763 _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count); 764 765 ASSERTCMP(call->offset, <, count); 766 767 len = min_t(size_t, len, count - call->offset); 768 if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 || 769 !pskb_pull(skb, len)) 770 BUG(); 771 call->offset += len; 772 773 if (call->offset < count) { 774 if (last) { 775 _leave(" = -EBADMSG [%d < %zu]", call->offset, count); 776 return -EBADMSG; 777 } 778 _leave(" = -EAGAIN"); 779 return -EAGAIN; 780 } 781 return 0; 782 } 783