xref: /linux/fs/afs/rxrpc.c (revision 0883c2c06fb5bcf5b9e008270827e63c09a88c1e)
1 /* Maintain an RxRPC server socket to do AFS communications through
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11 
12 #include <linux/slab.h>
13 #include <net/sock.h>
14 #include <net/af_rxrpc.h>
15 #include <rxrpc/packet.h>
16 #include "internal.h"
17 #include "afs_cm.h"
18 
19 static struct socket *afs_socket; /* my RxRPC socket */
20 static struct workqueue_struct *afs_async_calls;
21 static atomic_t afs_outstanding_calls;
22 static atomic_t afs_outstanding_skbs;
23 
24 static void afs_wake_up_call_waiter(struct afs_call *);
25 static int afs_wait_for_call_to_complete(struct afs_call *);
26 static void afs_wake_up_async_call(struct afs_call *);
27 static int afs_dont_wait_for_call_to_complete(struct afs_call *);
28 static void afs_process_async_call(struct afs_call *);
29 static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *);
30 static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool);
31 
32 /* synchronous call management */
33 const struct afs_wait_mode afs_sync_call = {
34 	.rx_wakeup	= afs_wake_up_call_waiter,
35 	.wait		= afs_wait_for_call_to_complete,
36 };
37 
38 /* asynchronous call management */
39 const struct afs_wait_mode afs_async_call = {
40 	.rx_wakeup	= afs_wake_up_async_call,
41 	.wait		= afs_dont_wait_for_call_to_complete,
42 };
43 
44 /* asynchronous incoming call management */
45 static const struct afs_wait_mode afs_async_incoming_call = {
46 	.rx_wakeup	= afs_wake_up_async_call,
47 };
48 
49 /* asynchronous incoming call initial processing */
50 static const struct afs_call_type afs_RXCMxxxx = {
51 	.name		= "CB.xxxx",
52 	.deliver	= afs_deliver_cm_op_id,
53 	.abort_to_error	= afs_abort_to_error,
54 };
55 
56 static void afs_collect_incoming_call(struct work_struct *);
57 
58 static struct sk_buff_head afs_incoming_calls;
59 static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
60 
61 static void afs_async_workfn(struct work_struct *work)
62 {
63 	struct afs_call *call = container_of(work, struct afs_call, async_work);
64 
65 	call->async_workfn(call);
66 }
67 
68 static int afs_wait_atomic_t(atomic_t *p)
69 {
70 	schedule();
71 	return 0;
72 }
73 
74 /*
75  * open an RxRPC socket and bind it to be a server for callback notifications
76  * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
77  */
78 int afs_open_socket(void)
79 {
80 	struct sockaddr_rxrpc srx;
81 	struct socket *socket;
82 	int ret;
83 
84 	_enter("");
85 
86 	skb_queue_head_init(&afs_incoming_calls);
87 
88 	afs_async_calls = create_singlethread_workqueue("kafsd");
89 	if (!afs_async_calls) {
90 		_leave(" = -ENOMEM [wq]");
91 		return -ENOMEM;
92 	}
93 
94 	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
95 	if (ret < 0) {
96 		destroy_workqueue(afs_async_calls);
97 		_leave(" = %d [socket]", ret);
98 		return ret;
99 	}
100 
101 	socket->sk->sk_allocation = GFP_NOFS;
102 
103 	/* bind the callback manager's address to make this a server socket */
104 	srx.srx_family			= AF_RXRPC;
105 	srx.srx_service			= CM_SERVICE;
106 	srx.transport_type		= SOCK_DGRAM;
107 	srx.transport_len		= sizeof(srx.transport.sin);
108 	srx.transport.sin.sin_family	= AF_INET;
109 	srx.transport.sin.sin_port	= htons(AFS_CM_PORT);
110 	memset(&srx.transport.sin.sin_addr, 0,
111 	       sizeof(srx.transport.sin.sin_addr));
112 
113 	ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
114 	if (ret < 0) {
115 		sock_release(socket);
116 		destroy_workqueue(afs_async_calls);
117 		_leave(" = %d [bind]", ret);
118 		return ret;
119 	}
120 
121 	rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);
122 
123 	afs_socket = socket;
124 	_leave(" = 0");
125 	return 0;
126 }
127 
128 /*
129  * close the RxRPC socket AFS was using
130  */
131 void afs_close_socket(void)
132 {
133 	_enter("");
134 
135 	wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
136 			 TASK_UNINTERRUPTIBLE);
137 	_debug("no outstanding calls");
138 
139 	sock_release(afs_socket);
140 
141 	_debug("dework");
142 	destroy_workqueue(afs_async_calls);
143 
144 	ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
145 	_leave("");
146 }
147 
148 /*
149  * note that the data in a socket buffer is now delivered and that the buffer
150  * should be freed
151  */
152 static void afs_data_delivered(struct sk_buff *skb)
153 {
154 	if (!skb) {
155 		_debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
156 		dump_stack();
157 	} else {
158 		_debug("DLVR %p{%u} [%d]",
159 		       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
160 		if (atomic_dec_return(&afs_outstanding_skbs) == -1)
161 			BUG();
162 		rxrpc_kernel_data_delivered(skb);
163 	}
164 }
165 
166 /*
167  * free a socket buffer
168  */
169 static void afs_free_skb(struct sk_buff *skb)
170 {
171 	if (!skb) {
172 		_debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
173 		dump_stack();
174 	} else {
175 		_debug("FREE %p{%u} [%d]",
176 		       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
177 		if (atomic_dec_return(&afs_outstanding_skbs) == -1)
178 			BUG();
179 		rxrpc_kernel_free_skb(skb);
180 	}
181 }
182 
183 /*
184  * free a call
185  */
186 static void afs_free_call(struct afs_call *call)
187 {
188 	_debug("DONE %p{%s} [%d]",
189 	       call, call->type->name, atomic_read(&afs_outstanding_calls));
190 
191 	ASSERTCMP(call->rxcall, ==, NULL);
192 	ASSERT(!work_pending(&call->async_work));
193 	ASSERT(skb_queue_empty(&call->rx_queue));
194 	ASSERT(call->type->name != NULL);
195 
196 	kfree(call->request);
197 	kfree(call);
198 
199 	if (atomic_dec_and_test(&afs_outstanding_calls))
200 		wake_up_atomic_t(&afs_outstanding_calls);
201 }
202 
203 /*
204  * End a call but do not free it
205  */
206 static void afs_end_call_nofree(struct afs_call *call)
207 {
208 	if (call->rxcall) {
209 		rxrpc_kernel_end_call(call->rxcall);
210 		call->rxcall = NULL;
211 	}
212 	if (call->type->destructor)
213 		call->type->destructor(call);
214 }
215 
216 /*
217  * End a call and free it
218  */
219 static void afs_end_call(struct afs_call *call)
220 {
221 	afs_end_call_nofree(call);
222 	afs_free_call(call);
223 }
224 
225 /*
226  * allocate a call with flat request and reply buffers
227  */
228 struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
229 				     size_t request_size, size_t reply_size)
230 {
231 	struct afs_call *call;
232 
233 	call = kzalloc(sizeof(*call), GFP_NOFS);
234 	if (!call)
235 		goto nomem_call;
236 
237 	_debug("CALL %p{%s} [%d]",
238 	       call, type->name, atomic_read(&afs_outstanding_calls));
239 	atomic_inc(&afs_outstanding_calls);
240 
241 	call->type = type;
242 	call->request_size = request_size;
243 	call->reply_max = reply_size;
244 
245 	if (request_size) {
246 		call->request = kmalloc(request_size, GFP_NOFS);
247 		if (!call->request)
248 			goto nomem_free;
249 	}
250 
251 	if (reply_size) {
252 		call->buffer = kmalloc(reply_size, GFP_NOFS);
253 		if (!call->buffer)
254 			goto nomem_free;
255 	}
256 
257 	init_waitqueue_head(&call->waitq);
258 	skb_queue_head_init(&call->rx_queue);
259 	return call;
260 
261 nomem_free:
262 	afs_free_call(call);
263 nomem_call:
264 	return NULL;
265 }
266 
267 /*
268  * clean up a call with flat buffer
269  */
270 void afs_flat_call_destructor(struct afs_call *call)
271 {
272 	_enter("");
273 
274 	kfree(call->request);
275 	call->request = NULL;
276 	kfree(call->buffer);
277 	call->buffer = NULL;
278 }
279 
280 /*
281  * attach the data from a bunch of pages on an inode to a call
282  */
283 static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
284 			  struct kvec *iov)
285 {
286 	struct page *pages[8];
287 	unsigned count, n, loop, offset, to;
288 	pgoff_t first = call->first, last = call->last;
289 	int ret;
290 
291 	_enter("");
292 
293 	offset = call->first_offset;
294 	call->first_offset = 0;
295 
296 	do {
297 		_debug("attach %lx-%lx", first, last);
298 
299 		count = last - first + 1;
300 		if (count > ARRAY_SIZE(pages))
301 			count = ARRAY_SIZE(pages);
302 		n = find_get_pages_contig(call->mapping, first, count, pages);
303 		ASSERTCMP(n, ==, count);
304 
305 		loop = 0;
306 		do {
307 			msg->msg_flags = 0;
308 			to = PAGE_SIZE;
309 			if (first + loop >= last)
310 				to = call->last_to;
311 			else
312 				msg->msg_flags = MSG_MORE;
313 			iov->iov_base = kmap(pages[loop]) + offset;
314 			iov->iov_len = to - offset;
315 			offset = 0;
316 
317 			_debug("- range %u-%u%s",
318 			       offset, to, msg->msg_flags ? " [more]" : "");
319 			iov_iter_kvec(&msg->msg_iter, WRITE | ITER_KVEC,
320 				      iov, 1, to - offset);
321 
322 			/* have to change the state *before* sending the last
323 			 * packet as RxRPC might give us the reply before it
324 			 * returns from sending the request */
325 			if (first + loop >= last)
326 				call->state = AFS_CALL_AWAIT_REPLY;
327 			ret = rxrpc_kernel_send_data(call->rxcall, msg,
328 						     to - offset);
329 			kunmap(pages[loop]);
330 			if (ret < 0)
331 				break;
332 		} while (++loop < count);
333 		first += count;
334 
335 		for (loop = 0; loop < count; loop++)
336 			put_page(pages[loop]);
337 		if (ret < 0)
338 			break;
339 	} while (first <= last);
340 
341 	_leave(" = %d", ret);
342 	return ret;
343 }
344 
345 /*
346  * initiate a call
347  */
348 int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
349 		  const struct afs_wait_mode *wait_mode)
350 {
351 	struct sockaddr_rxrpc srx;
352 	struct rxrpc_call *rxcall;
353 	struct msghdr msg;
354 	struct kvec iov[1];
355 	int ret;
356 	struct sk_buff *skb;
357 
358 	_enter("%x,{%d},", addr->s_addr, ntohs(call->port));
359 
360 	ASSERT(call->type != NULL);
361 	ASSERT(call->type->name != NULL);
362 
363 	_debug("____MAKE %p{%s,%x} [%d]____",
364 	       call, call->type->name, key_serial(call->key),
365 	       atomic_read(&afs_outstanding_calls));
366 
367 	call->wait_mode = wait_mode;
368 	call->async_workfn = afs_process_async_call;
369 	INIT_WORK(&call->async_work, afs_async_workfn);
370 
371 	memset(&srx, 0, sizeof(srx));
372 	srx.srx_family = AF_RXRPC;
373 	srx.srx_service = call->service_id;
374 	srx.transport_type = SOCK_DGRAM;
375 	srx.transport_len = sizeof(srx.transport.sin);
376 	srx.transport.sin.sin_family = AF_INET;
377 	srx.transport.sin.sin_port = call->port;
378 	memcpy(&srx.transport.sin.sin_addr, addr, 4);
379 
380 	/* create a call */
381 	rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
382 					 (unsigned long) call, gfp);
383 	call->key = NULL;
384 	if (IS_ERR(rxcall)) {
385 		ret = PTR_ERR(rxcall);
386 		goto error_kill_call;
387 	}
388 
389 	call->rxcall = rxcall;
390 
391 	/* send the request */
392 	iov[0].iov_base	= call->request;
393 	iov[0].iov_len	= call->request_size;
394 
395 	msg.msg_name		= NULL;
396 	msg.msg_namelen		= 0;
397 	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
398 		      call->request_size);
399 	msg.msg_control		= NULL;
400 	msg.msg_controllen	= 0;
401 	msg.msg_flags		= (call->send_pages ? MSG_MORE : 0);
402 
403 	/* have to change the state *before* sending the last packet as RxRPC
404 	 * might give us the reply before it returns from sending the
405 	 * request */
406 	if (!call->send_pages)
407 		call->state = AFS_CALL_AWAIT_REPLY;
408 	ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
409 	if (ret < 0)
410 		goto error_do_abort;
411 
412 	if (call->send_pages) {
413 		ret = afs_send_pages(call, &msg, iov);
414 		if (ret < 0)
415 			goto error_do_abort;
416 	}
417 
418 	/* at this point, an async call may no longer exist as it may have
419 	 * already completed */
420 	return wait_mode->wait(call);
421 
422 error_do_abort:
423 	rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
424 	while ((skb = skb_dequeue(&call->rx_queue)))
425 		afs_free_skb(skb);
426 error_kill_call:
427 	afs_end_call(call);
428 	_leave(" = %d", ret);
429 	return ret;
430 }
431 
432 /*
433  * Handles intercepted messages that were arriving in the socket's Rx queue.
434  *
435  * Called from the AF_RXRPC call processor in waitqueue process context.  For
436  * each call, it is guaranteed this will be called in order of packet to be
437  * delivered.
438  */
439 static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
440 			       struct sk_buff *skb)
441 {
442 	struct afs_call *call = (struct afs_call *) user_call_ID;
443 
444 	_enter("%p,,%u", call, skb->mark);
445 
446 	_debug("ICPT %p{%u} [%d]",
447 	       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
448 
449 	ASSERTCMP(sk, ==, afs_socket->sk);
450 	atomic_inc(&afs_outstanding_skbs);
451 
452 	if (!call) {
453 		/* its an incoming call for our callback service */
454 		skb_queue_tail(&afs_incoming_calls, skb);
455 		queue_work(afs_wq, &afs_collect_incoming_call_work);
456 	} else {
457 		/* route the messages directly to the appropriate call */
458 		skb_queue_tail(&call->rx_queue, skb);
459 		call->wait_mode->rx_wakeup(call);
460 	}
461 
462 	_leave("");
463 }
464 
465 /*
466  * deliver messages to a call
467  */
468 static void afs_deliver_to_call(struct afs_call *call)
469 {
470 	struct sk_buff *skb;
471 	bool last;
472 	u32 abort_code;
473 	int ret;
474 
475 	_enter("");
476 
477 	while ((call->state == AFS_CALL_AWAIT_REPLY ||
478 		call->state == AFS_CALL_AWAIT_OP_ID ||
479 		call->state == AFS_CALL_AWAIT_REQUEST ||
480 		call->state == AFS_CALL_AWAIT_ACK) &&
481 	       (skb = skb_dequeue(&call->rx_queue))) {
482 		switch (skb->mark) {
483 		case RXRPC_SKB_MARK_DATA:
484 			_debug("Rcv DATA");
485 			last = rxrpc_kernel_is_data_last(skb);
486 			ret = call->type->deliver(call, skb, last);
487 			switch (ret) {
488 			case 0:
489 				if (last &&
490 				    call->state == AFS_CALL_AWAIT_REPLY)
491 					call->state = AFS_CALL_COMPLETE;
492 				break;
493 			case -ENOTCONN:
494 				abort_code = RX_CALL_DEAD;
495 				goto do_abort;
496 			case -ENOTSUPP:
497 				abort_code = RX_INVALID_OPERATION;
498 				goto do_abort;
499 			default:
500 				abort_code = RXGEN_CC_UNMARSHAL;
501 				if (call->state != AFS_CALL_AWAIT_REPLY)
502 					abort_code = RXGEN_SS_UNMARSHAL;
503 			do_abort:
504 				rxrpc_kernel_abort_call(call->rxcall,
505 							abort_code);
506 				call->error = ret;
507 				call->state = AFS_CALL_ERROR;
508 				break;
509 			}
510 			afs_data_delivered(skb);
511 			skb = NULL;
512 			continue;
513 		case RXRPC_SKB_MARK_FINAL_ACK:
514 			_debug("Rcv ACK");
515 			call->state = AFS_CALL_COMPLETE;
516 			break;
517 		case RXRPC_SKB_MARK_BUSY:
518 			_debug("Rcv BUSY");
519 			call->error = -EBUSY;
520 			call->state = AFS_CALL_BUSY;
521 			break;
522 		case RXRPC_SKB_MARK_REMOTE_ABORT:
523 			abort_code = rxrpc_kernel_get_abort_code(skb);
524 			call->error = call->type->abort_to_error(abort_code);
525 			call->state = AFS_CALL_ABORTED;
526 			_debug("Rcv ABORT %u -> %d", abort_code, call->error);
527 			break;
528 		case RXRPC_SKB_MARK_LOCAL_ABORT:
529 			abort_code = rxrpc_kernel_get_abort_code(skb);
530 			call->error = call->type->abort_to_error(abort_code);
531 			call->state = AFS_CALL_ABORTED;
532 			_debug("Loc ABORT %u -> %d", abort_code, call->error);
533 			break;
534 		case RXRPC_SKB_MARK_NET_ERROR:
535 			call->error = -rxrpc_kernel_get_error_number(skb);
536 			call->state = AFS_CALL_ERROR;
537 			_debug("Rcv NET ERROR %d", call->error);
538 			break;
539 		case RXRPC_SKB_MARK_LOCAL_ERROR:
540 			call->error = -rxrpc_kernel_get_error_number(skb);
541 			call->state = AFS_CALL_ERROR;
542 			_debug("Rcv LOCAL ERROR %d", call->error);
543 			break;
544 		default:
545 			BUG();
546 			break;
547 		}
548 
549 		afs_free_skb(skb);
550 	}
551 
552 	/* make sure the queue is empty if the call is done with (we might have
553 	 * aborted the call early because of an unmarshalling error) */
554 	if (call->state >= AFS_CALL_COMPLETE) {
555 		while ((skb = skb_dequeue(&call->rx_queue)))
556 			afs_free_skb(skb);
557 		if (call->incoming)
558 			afs_end_call(call);
559 	}
560 
561 	_leave("");
562 }
563 
564 /*
565  * wait synchronously for a call to complete
566  */
567 static int afs_wait_for_call_to_complete(struct afs_call *call)
568 {
569 	struct sk_buff *skb;
570 	int ret;
571 
572 	DECLARE_WAITQUEUE(myself, current);
573 
574 	_enter("");
575 
576 	add_wait_queue(&call->waitq, &myself);
577 	for (;;) {
578 		set_current_state(TASK_INTERRUPTIBLE);
579 
580 		/* deliver any messages that are in the queue */
581 		if (!skb_queue_empty(&call->rx_queue)) {
582 			__set_current_state(TASK_RUNNING);
583 			afs_deliver_to_call(call);
584 			continue;
585 		}
586 
587 		ret = call->error;
588 		if (call->state >= AFS_CALL_COMPLETE)
589 			break;
590 		ret = -EINTR;
591 		if (signal_pending(current))
592 			break;
593 		schedule();
594 	}
595 
596 	remove_wait_queue(&call->waitq, &myself);
597 	__set_current_state(TASK_RUNNING);
598 
599 	/* kill the call */
600 	if (call->state < AFS_CALL_COMPLETE) {
601 		_debug("call incomplete");
602 		rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
603 		while ((skb = skb_dequeue(&call->rx_queue)))
604 			afs_free_skb(skb);
605 	}
606 
607 	_debug("call complete");
608 	afs_end_call(call);
609 	_leave(" = %d", ret);
610 	return ret;
611 }
612 
613 /*
614  * wake up a waiting call
615  */
616 static void afs_wake_up_call_waiter(struct afs_call *call)
617 {
618 	wake_up(&call->waitq);
619 }
620 
621 /*
622  * wake up an asynchronous call
623  */
624 static void afs_wake_up_async_call(struct afs_call *call)
625 {
626 	_enter("");
627 	queue_work(afs_async_calls, &call->async_work);
628 }
629 
630 /*
631  * put a call into asynchronous mode
632  * - mustn't touch the call descriptor as the call my have completed by the
633  *   time we get here
634  */
635 static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
636 {
637 	_enter("");
638 	return -EINPROGRESS;
639 }
640 
641 /*
642  * delete an asynchronous call
643  */
644 static void afs_delete_async_call(struct afs_call *call)
645 {
646 	_enter("");
647 
648 	afs_free_call(call);
649 
650 	_leave("");
651 }
652 
653 /*
654  * perform processing on an asynchronous call
655  * - on a multiple-thread workqueue this work item may try to run on several
656  *   CPUs at the same time
657  */
658 static void afs_process_async_call(struct afs_call *call)
659 {
660 	_enter("");
661 
662 	if (!skb_queue_empty(&call->rx_queue))
663 		afs_deliver_to_call(call);
664 
665 	if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) {
666 		if (call->wait_mode->async_complete)
667 			call->wait_mode->async_complete(call->reply,
668 							call->error);
669 		call->reply = NULL;
670 
671 		/* kill the call */
672 		afs_end_call_nofree(call);
673 
674 		/* we can't just delete the call because the work item may be
675 		 * queued */
676 		call->async_workfn = afs_delete_async_call;
677 		queue_work(afs_async_calls, &call->async_work);
678 	}
679 
680 	_leave("");
681 }
682 
683 /*
684  * empty a socket buffer into a flat reply buffer
685  */
686 void afs_transfer_reply(struct afs_call *call, struct sk_buff *skb)
687 {
688 	size_t len = skb->len;
689 
690 	if (skb_copy_bits(skb, 0, call->buffer + call->reply_size, len) < 0)
691 		BUG();
692 	call->reply_size += len;
693 }
694 
695 /*
696  * accept the backlog of incoming calls
697  */
698 static void afs_collect_incoming_call(struct work_struct *work)
699 {
700 	struct rxrpc_call *rxcall;
701 	struct afs_call *call = NULL;
702 	struct sk_buff *skb;
703 
704 	while ((skb = skb_dequeue(&afs_incoming_calls))) {
705 		_debug("new call");
706 
707 		/* don't need the notification */
708 		afs_free_skb(skb);
709 
710 		if (!call) {
711 			call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
712 			if (!call) {
713 				rxrpc_kernel_reject_call(afs_socket);
714 				return;
715 			}
716 
717 			call->async_workfn = afs_process_async_call;
718 			INIT_WORK(&call->async_work, afs_async_workfn);
719 			call->wait_mode = &afs_async_incoming_call;
720 			call->type = &afs_RXCMxxxx;
721 			init_waitqueue_head(&call->waitq);
722 			skb_queue_head_init(&call->rx_queue);
723 			call->state = AFS_CALL_AWAIT_OP_ID;
724 
725 			_debug("CALL %p{%s} [%d]",
726 			       call, call->type->name,
727 			       atomic_read(&afs_outstanding_calls));
728 			atomic_inc(&afs_outstanding_calls);
729 		}
730 
731 		rxcall = rxrpc_kernel_accept_call(afs_socket,
732 						  (unsigned long) call);
733 		if (!IS_ERR(rxcall)) {
734 			call->rxcall = rxcall;
735 			call = NULL;
736 		}
737 	}
738 
739 	if (call)
740 		afs_free_call(call);
741 }
742 
743 /*
744  * grab the operation ID from an incoming cache manager call
745  */
746 static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb,
747 				bool last)
748 {
749 	size_t len = skb->len;
750 	void *oibuf = (void *) &call->operation_ID;
751 
752 	_enter("{%u},{%zu},%d", call->offset, len, last);
753 
754 	ASSERTCMP(call->offset, <, 4);
755 
756 	/* the operation ID forms the first four bytes of the request data */
757 	len = min_t(size_t, len, 4 - call->offset);
758 	if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0)
759 		BUG();
760 	if (!pskb_pull(skb, len))
761 		BUG();
762 	call->offset += len;
763 
764 	if (call->offset < 4) {
765 		if (last) {
766 			_leave(" = -EBADMSG [op ID short]");
767 			return -EBADMSG;
768 		}
769 		_leave(" = 0 [incomplete]");
770 		return 0;
771 	}
772 
773 	call->state = AFS_CALL_AWAIT_REQUEST;
774 
775 	/* ask the cache manager to route the call (it'll change the call type
776 	 * if successful) */
777 	if (!afs_cm_incoming_call(call))
778 		return -ENOTSUPP;
779 
780 	/* pass responsibility for the remainer of this message off to the
781 	 * cache manager op */
782 	return call->type->deliver(call, skb, last);
783 }
784 
785 /*
786  * send an empty reply
787  */
788 void afs_send_empty_reply(struct afs_call *call)
789 {
790 	struct msghdr msg;
791 
792 	_enter("");
793 
794 	msg.msg_name		= NULL;
795 	msg.msg_namelen		= 0;
796 	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
797 	msg.msg_control		= NULL;
798 	msg.msg_controllen	= 0;
799 	msg.msg_flags		= 0;
800 
801 	call->state = AFS_CALL_AWAIT_ACK;
802 	switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) {
803 	case 0:
804 		_leave(" [replied]");
805 		return;
806 
807 	case -ENOMEM:
808 		_debug("oom");
809 		rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
810 	default:
811 		afs_end_call(call);
812 		_leave(" [error]");
813 		return;
814 	}
815 }
816 
817 /*
818  * send a simple reply
819  */
820 void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
821 {
822 	struct msghdr msg;
823 	struct kvec iov[1];
824 	int n;
825 
826 	_enter("");
827 
828 	iov[0].iov_base		= (void *) buf;
829 	iov[0].iov_len		= len;
830 	msg.msg_name		= NULL;
831 	msg.msg_namelen		= 0;
832 	iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
833 	msg.msg_control		= NULL;
834 	msg.msg_controllen	= 0;
835 	msg.msg_flags		= 0;
836 
837 	call->state = AFS_CALL_AWAIT_ACK;
838 	n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
839 	if (n >= 0) {
840 		/* Success */
841 		_leave(" [replied]");
842 		return;
843 	}
844 
845 	if (n == -ENOMEM) {
846 		_debug("oom");
847 		rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
848 	}
849 	afs_end_call(call);
850 	_leave(" [error]");
851 }
852 
853 /*
854  * extract a piece of data from the received data socket buffers
855  */
856 int afs_extract_data(struct afs_call *call, struct sk_buff *skb,
857 		     bool last, void *buf, size_t count)
858 {
859 	size_t len = skb->len;
860 
861 	_enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count);
862 
863 	ASSERTCMP(call->offset, <, count);
864 
865 	len = min_t(size_t, len, count - call->offset);
866 	if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 ||
867 	    !pskb_pull(skb, len))
868 		BUG();
869 	call->offset += len;
870 
871 	if (call->offset < count) {
872 		if (last) {
873 			_leave(" = -EBADMSG [%d < %zu]", call->offset, count);
874 			return -EBADMSG;
875 		}
876 		_leave(" = -EAGAIN");
877 		return -EAGAIN;
878 	}
879 	return 0;
880 }
881