xref: /linux/net/rxrpc/af_rxrpc.c (revision 0d240e7811c4ec1965760ee4643b5bbc9cfacbb3)
1 /* AF_RXRPC implementation
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11 
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13 
14 #include <linux/module.h>
15 #include <linux/kernel.h>
16 #include <linux/net.h>
17 #include <linux/slab.h>
18 #include <linux/skbuff.h>
19 #include <linux/poll.h>
20 #include <linux/proc_fs.h>
21 #include <linux/key-type.h>
22 #include <net/net_namespace.h>
23 #include <net/sock.h>
24 #include <net/af_rxrpc.h>
25 #include "ar-internal.h"
26 
27 MODULE_DESCRIPTION("RxRPC network protocol");
28 MODULE_AUTHOR("Red Hat, Inc.");
29 MODULE_LICENSE("GPL");
30 MODULE_ALIAS_NETPROTO(PF_RXRPC);
31 
32 unsigned int rxrpc_debug; // = RXRPC_DEBUG_KPROTO;
33 module_param_named(debug, rxrpc_debug, uint, S_IWUSR | S_IRUGO);
34 MODULE_PARM_DESC(debug, "RxRPC debugging mask");
35 
36 static struct proto rxrpc_proto;
37 static const struct proto_ops rxrpc_rpc_ops;
38 
39 /* local epoch for detecting local-end reset */
40 u32 rxrpc_epoch;
41 
42 /* current debugging ID */
43 atomic_t rxrpc_debug_id;
44 
45 /* count of skbs currently in use */
46 atomic_t rxrpc_n_skbs;
47 
48 struct workqueue_struct *rxrpc_workqueue;
49 
50 static void rxrpc_sock_destructor(struct sock *);
51 
52 /*
53  * see if an RxRPC socket is currently writable
54  */
55 static inline int rxrpc_writable(struct sock *sk)
56 {
57 	return atomic_read(&sk->sk_wmem_alloc) < (size_t) sk->sk_sndbuf;
58 }
59 
60 /*
61  * wait for write bufferage to become available
62  */
63 static void rxrpc_write_space(struct sock *sk)
64 {
65 	_enter("%p", sk);
66 	rcu_read_lock();
67 	if (rxrpc_writable(sk)) {
68 		struct socket_wq *wq = rcu_dereference(sk->sk_wq);
69 
70 		if (skwq_has_sleeper(wq))
71 			wake_up_interruptible(&wq->wait);
72 		sk_wake_async(sk, SOCK_WAKE_SPACE, POLL_OUT);
73 	}
74 	rcu_read_unlock();
75 }
76 
77 /*
78  * validate an RxRPC address
79  */
80 static int rxrpc_validate_address(struct rxrpc_sock *rx,
81 				  struct sockaddr_rxrpc *srx,
82 				  int len)
83 {
84 	unsigned int tail;
85 
86 	if (len < sizeof(struct sockaddr_rxrpc))
87 		return -EINVAL;
88 
89 	if (srx->srx_family != AF_RXRPC)
90 		return -EAFNOSUPPORT;
91 
92 	if (srx->transport_type != SOCK_DGRAM)
93 		return -ESOCKTNOSUPPORT;
94 
95 	len -= offsetof(struct sockaddr_rxrpc, transport);
96 	if (srx->transport_len < sizeof(sa_family_t) ||
97 	    srx->transport_len > len)
98 		return -EINVAL;
99 
100 	if (srx->transport.family != rx->proto)
101 		return -EAFNOSUPPORT;
102 
103 	switch (srx->transport.family) {
104 	case AF_INET:
105 		if (srx->transport_len < sizeof(struct sockaddr_in))
106 			return -EINVAL;
107 		_debug("INET: %x @ %pI4",
108 		       ntohs(srx->transport.sin.sin_port),
109 		       &srx->transport.sin.sin_addr);
110 		tail = offsetof(struct sockaddr_rxrpc, transport.sin.__pad);
111 		break;
112 
113 	case AF_INET6:
114 	default:
115 		return -EAFNOSUPPORT;
116 	}
117 
118 	if (tail < len)
119 		memset((void *)srx + tail, 0, len - tail);
120 	return 0;
121 }
122 
123 /*
124  * bind a local address to an RxRPC socket
125  */
126 static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
127 {
128 	struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *)saddr;
129 	struct sock *sk = sock->sk;
130 	struct rxrpc_local *local;
131 	struct rxrpc_sock *rx = rxrpc_sk(sk), *prx;
132 	int ret;
133 
134 	_enter("%p,%p,%d", rx, saddr, len);
135 
136 	ret = rxrpc_validate_address(rx, srx, len);
137 	if (ret < 0)
138 		goto error;
139 
140 	lock_sock(&rx->sk);
141 
142 	if (rx->sk.sk_state != RXRPC_UNBOUND) {
143 		ret = -EINVAL;
144 		goto error_unlock;
145 	}
146 
147 	memcpy(&rx->srx, srx, sizeof(rx->srx));
148 
149 	local = rxrpc_lookup_local(&rx->srx);
150 	if (IS_ERR(local)) {
151 		ret = PTR_ERR(local);
152 		goto error_unlock;
153 	}
154 
155 	if (rx->srx.srx_service) {
156 		write_lock_bh(&local->services_lock);
157 		list_for_each_entry(prx, &local->services, listen_link) {
158 			if (prx->srx.srx_service == rx->srx.srx_service)
159 				goto service_in_use;
160 		}
161 
162 		rx->local = local;
163 		list_add_tail(&rx->listen_link, &local->services);
164 		write_unlock_bh(&local->services_lock);
165 
166 		rx->sk.sk_state = RXRPC_SERVER_BOUND;
167 	} else {
168 		rx->local = local;
169 		rx->sk.sk_state = RXRPC_CLIENT_BOUND;
170 	}
171 
172 	release_sock(&rx->sk);
173 	_leave(" = 0");
174 	return 0;
175 
176 service_in_use:
177 	write_unlock_bh(&local->services_lock);
178 	rxrpc_put_local(local);
179 	ret = -EADDRINUSE;
180 error_unlock:
181 	release_sock(&rx->sk);
182 error:
183 	_leave(" = %d", ret);
184 	return ret;
185 }
186 
187 /*
188  * set the number of pending calls permitted on a listening socket
189  */
190 static int rxrpc_listen(struct socket *sock, int backlog)
191 {
192 	struct sock *sk = sock->sk;
193 	struct rxrpc_sock *rx = rxrpc_sk(sk);
194 	unsigned int max;
195 	int ret;
196 
197 	_enter("%p,%d", rx, backlog);
198 
199 	lock_sock(&rx->sk);
200 
201 	switch (rx->sk.sk_state) {
202 	case RXRPC_UNBOUND:
203 		ret = -EADDRNOTAVAIL;
204 		break;
205 	case RXRPC_SERVER_BOUND:
206 		ASSERT(rx->local != NULL);
207 		max = READ_ONCE(rxrpc_max_backlog);
208 		ret = -EINVAL;
209 		if (backlog == INT_MAX)
210 			backlog = max;
211 		else if (backlog < 0 || backlog > max)
212 			break;
213 		sk->sk_max_ack_backlog = backlog;
214 		rx->sk.sk_state = RXRPC_SERVER_LISTENING;
215 		ret = 0;
216 		break;
217 	default:
218 		ret = -EBUSY;
219 		break;
220 	}
221 
222 	release_sock(&rx->sk);
223 	_leave(" = %d", ret);
224 	return ret;
225 }
226 
227 /*
228  * find a transport by address
229  */
230 struct rxrpc_transport *rxrpc_name_to_transport(struct rxrpc_sock *rx,
231 						struct sockaddr *addr,
232 						int addr_len, int flags,
233 						gfp_t gfp)
234 {
235 	struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *) addr;
236 	struct rxrpc_transport *trans;
237 	struct rxrpc_peer *peer;
238 
239 	_enter("%p,%p,%d,%d", rx, addr, addr_len, flags);
240 
241 	ASSERT(rx->local != NULL);
242 
243 	if (rx->srx.transport_type != srx->transport_type)
244 		return ERR_PTR(-ESOCKTNOSUPPORT);
245 	if (rx->srx.transport.family != srx->transport.family)
246 		return ERR_PTR(-EAFNOSUPPORT);
247 
248 	/* find a remote transport endpoint from the local one */
249 	peer = rxrpc_lookup_peer(rx->local, srx, gfp);
250 	if (IS_ERR(peer))
251 		return ERR_CAST(peer);
252 
253 	/* find a transport */
254 	trans = rxrpc_get_transport(rx->local, peer, gfp);
255 	rxrpc_put_peer(peer);
256 	_leave(" = %p", trans);
257 	return trans;
258 }
259 
260 /**
261  * rxrpc_kernel_begin_call - Allow a kernel service to begin a call
262  * @sock: The socket on which to make the call
263  * @srx: The address of the peer to contact
264  * @key: The security context to use (defaults to socket setting)
265  * @user_call_ID: The ID to use
266  *
267  * Allow a kernel service to begin a call on the nominated socket.  This just
268  * sets up all the internal tracking structures and allocates connection and
269  * call IDs as appropriate.  The call to be used is returned.
270  *
271  * The default socket destination address and security may be overridden by
272  * supplying @srx and @key.
273  */
274 struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *sock,
275 					   struct sockaddr_rxrpc *srx,
276 					   struct key *key,
277 					   unsigned long user_call_ID,
278 					   gfp_t gfp)
279 {
280 	struct rxrpc_conn_bundle *bundle;
281 	struct rxrpc_transport *trans;
282 	struct rxrpc_call *call;
283 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
284 
285 	_enter(",,%x,%lx", key_serial(key), user_call_ID);
286 
287 	lock_sock(&rx->sk);
288 
289 	trans = rxrpc_name_to_transport(rx, (struct sockaddr *)srx,
290 					sizeof(*srx), 0, gfp);
291 	if (IS_ERR(trans)) {
292 		call = ERR_CAST(trans);
293 		trans = NULL;
294 		goto out_notrans;
295 	}
296 
297 	if (!key)
298 		key = rx->key;
299 	if (key && !key->payload.data[0])
300 		key = NULL; /* a no-security key */
301 
302 	bundle = rxrpc_get_bundle(rx, trans, key, srx->srx_service, gfp);
303 	if (IS_ERR(bundle)) {
304 		call = ERR_CAST(bundle);
305 		goto out;
306 	}
307 
308 	call = rxrpc_new_client_call(rx, trans, bundle, user_call_ID, gfp);
309 	rxrpc_put_bundle(trans, bundle);
310 out:
311 	rxrpc_put_transport(trans);
312 out_notrans:
313 	release_sock(&rx->sk);
314 	_leave(" = %p", call);
315 	return call;
316 }
317 EXPORT_SYMBOL(rxrpc_kernel_begin_call);
318 
319 /**
320  * rxrpc_kernel_end_call - Allow a kernel service to end a call it was using
321  * @call: The call to end
322  *
323  * Allow a kernel service to end a call it was using.  The call must be
324  * complete before this is called (the call should be aborted if necessary).
325  */
326 void rxrpc_kernel_end_call(struct rxrpc_call *call)
327 {
328 	_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
329 	rxrpc_remove_user_ID(call->socket, call);
330 	rxrpc_put_call(call);
331 }
332 EXPORT_SYMBOL(rxrpc_kernel_end_call);
333 
334 /**
335  * rxrpc_kernel_intercept_rx_messages - Intercept received RxRPC messages
336  * @sock: The socket to intercept received messages on
337  * @interceptor: The function to pass the messages to
338  *
339  * Allow a kernel service to intercept messages heading for the Rx queue on an
340  * RxRPC socket.  They get passed to the specified function instead.
341  * @interceptor should free the socket buffers it is given.  @interceptor is
342  * called with the socket receive queue spinlock held and softirqs disabled -
343  * this ensures that the messages will be delivered in the right order.
344  */
345 void rxrpc_kernel_intercept_rx_messages(struct socket *sock,
346 					rxrpc_interceptor_t interceptor)
347 {
348 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
349 
350 	_enter("");
351 	rx->interceptor = interceptor;
352 }
353 
354 EXPORT_SYMBOL(rxrpc_kernel_intercept_rx_messages);
355 
356 /*
357  * connect an RxRPC socket
358  * - this just targets it at a specific destination; no actual connection
359  *   negotiation takes place
360  */
361 static int rxrpc_connect(struct socket *sock, struct sockaddr *addr,
362 			 int addr_len, int flags)
363 {
364 	struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *)addr;
365 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
366 	int ret;
367 
368 	_enter("%p,%p,%d,%d", rx, addr, addr_len, flags);
369 
370 	ret = rxrpc_validate_address(rx, srx, addr_len);
371 	if (ret < 0) {
372 		_leave(" = %d [bad addr]", ret);
373 		return ret;
374 	}
375 
376 	lock_sock(&rx->sk);
377 
378 	ret = -EISCONN;
379 	if (test_bit(RXRPC_SOCK_CONNECTED, &rx->flags))
380 		goto error;
381 
382 	switch (rx->sk.sk_state) {
383 	case RXRPC_UNBOUND:
384 		rx->sk.sk_state = RXRPC_CLIENT_UNBOUND;
385 	case RXRPC_CLIENT_UNBOUND:
386 	case RXRPC_CLIENT_BOUND:
387 		break;
388 	default:
389 		ret = -EBUSY;
390 		goto error;
391 	}
392 
393 	rx->connect_srx = *srx;
394 	set_bit(RXRPC_SOCK_CONNECTED, &rx->flags);
395 	ret = 0;
396 
397 error:
398 	release_sock(&rx->sk);
399 	return ret;
400 }
401 
402 /*
403  * send a message through an RxRPC socket
404  * - in a client this does a number of things:
405  *   - finds/sets up a connection for the security specified (if any)
406  *   - initiates a call (ID in control data)
407  *   - ends the request phase of a call (if MSG_MORE is not set)
408  *   - sends a call data packet
409  *   - may send an abort (abort code in control data)
410  */
411 static int rxrpc_sendmsg(struct socket *sock, struct msghdr *m, size_t len)
412 {
413 	struct rxrpc_local *local;
414 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
415 	int ret;
416 
417 	_enter(",{%d},,%zu", rx->sk.sk_state, len);
418 
419 	if (m->msg_flags & MSG_OOB)
420 		return -EOPNOTSUPP;
421 
422 	if (m->msg_name) {
423 		ret = rxrpc_validate_address(rx, m->msg_name, m->msg_namelen);
424 		if (ret < 0) {
425 			_leave(" = %d [bad addr]", ret);
426 			return ret;
427 		}
428 	}
429 
430 	lock_sock(&rx->sk);
431 
432 	switch (rx->sk.sk_state) {
433 	case RXRPC_UNBOUND:
434 		local = rxrpc_lookup_local(&rx->srx);
435 		if (IS_ERR(local)) {
436 			ret = PTR_ERR(local);
437 			goto error_unlock;
438 		}
439 
440 		rx->local = local;
441 		rx->sk.sk_state = RXRPC_CLIENT_UNBOUND;
442 		/* Fall through */
443 
444 	case RXRPC_CLIENT_UNBOUND:
445 	case RXRPC_CLIENT_BOUND:
446 		if (!m->msg_name &&
447 		    test_bit(RXRPC_SOCK_CONNECTED, &rx->flags)) {
448 			m->msg_name = &rx->connect_srx;
449 			m->msg_namelen = sizeof(rx->connect_srx);
450 		}
451 	case RXRPC_SERVER_BOUND:
452 	case RXRPC_SERVER_LISTENING:
453 		ret = rxrpc_do_sendmsg(rx, m, len);
454 		break;
455 	default:
456 		ret = -EINVAL;
457 		break;
458 	}
459 
460 error_unlock:
461 	release_sock(&rx->sk);
462 	_leave(" = %d", ret);
463 	return ret;
464 }
465 
466 /*
467  * set RxRPC socket options
468  */
469 static int rxrpc_setsockopt(struct socket *sock, int level, int optname,
470 			    char __user *optval, unsigned int optlen)
471 {
472 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
473 	unsigned int min_sec_level;
474 	int ret;
475 
476 	_enter(",%d,%d,,%d", level, optname, optlen);
477 
478 	lock_sock(&rx->sk);
479 	ret = -EOPNOTSUPP;
480 
481 	if (level == SOL_RXRPC) {
482 		switch (optname) {
483 		case RXRPC_EXCLUSIVE_CONNECTION:
484 			ret = -EINVAL;
485 			if (optlen != 0)
486 				goto error;
487 			ret = -EISCONN;
488 			if (rx->sk.sk_state != RXRPC_UNBOUND)
489 				goto error;
490 			set_bit(RXRPC_SOCK_EXCLUSIVE_CONN, &rx->flags);
491 			goto success;
492 
493 		case RXRPC_SECURITY_KEY:
494 			ret = -EINVAL;
495 			if (rx->key)
496 				goto error;
497 			ret = -EISCONN;
498 			if (rx->sk.sk_state != RXRPC_UNBOUND)
499 				goto error;
500 			ret = rxrpc_request_key(rx, optval, optlen);
501 			goto error;
502 
503 		case RXRPC_SECURITY_KEYRING:
504 			ret = -EINVAL;
505 			if (rx->key)
506 				goto error;
507 			ret = -EISCONN;
508 			if (rx->sk.sk_state != RXRPC_UNBOUND)
509 				goto error;
510 			ret = rxrpc_server_keyring(rx, optval, optlen);
511 			goto error;
512 
513 		case RXRPC_MIN_SECURITY_LEVEL:
514 			ret = -EINVAL;
515 			if (optlen != sizeof(unsigned int))
516 				goto error;
517 			ret = -EISCONN;
518 			if (rx->sk.sk_state != RXRPC_UNBOUND)
519 				goto error;
520 			ret = get_user(min_sec_level,
521 				       (unsigned int __user *) optval);
522 			if (ret < 0)
523 				goto error;
524 			ret = -EINVAL;
525 			if (min_sec_level > RXRPC_SECURITY_MAX)
526 				goto error;
527 			rx->min_sec_level = min_sec_level;
528 			goto success;
529 
530 		default:
531 			break;
532 		}
533 	}
534 
535 success:
536 	ret = 0;
537 error:
538 	release_sock(&rx->sk);
539 	return ret;
540 }
541 
542 /*
543  * permit an RxRPC socket to be polled
544  */
545 static unsigned int rxrpc_poll(struct file *file, struct socket *sock,
546 			       poll_table *wait)
547 {
548 	unsigned int mask;
549 	struct sock *sk = sock->sk;
550 
551 	sock_poll_wait(file, sk_sleep(sk), wait);
552 	mask = 0;
553 
554 	/* the socket is readable if there are any messages waiting on the Rx
555 	 * queue */
556 	if (!skb_queue_empty(&sk->sk_receive_queue))
557 		mask |= POLLIN | POLLRDNORM;
558 
559 	/* the socket is writable if there is space to add new data to the
560 	 * socket; there is no guarantee that any particular call in progress
561 	 * on the socket may have space in the Tx ACK window */
562 	if (rxrpc_writable(sk))
563 		mask |= POLLOUT | POLLWRNORM;
564 
565 	return mask;
566 }
567 
568 /*
569  * create an RxRPC socket
570  */
571 static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
572 			int kern)
573 {
574 	struct rxrpc_sock *rx;
575 	struct sock *sk;
576 
577 	_enter("%p,%d", sock, protocol);
578 
579 	if (!net_eq(net, &init_net))
580 		return -EAFNOSUPPORT;
581 
582 	/* we support transport protocol UDP/UDP6 only */
583 	if (protocol != PF_INET)
584 		return -EPROTONOSUPPORT;
585 
586 	if (sock->type != SOCK_DGRAM)
587 		return -ESOCKTNOSUPPORT;
588 
589 	sock->ops = &rxrpc_rpc_ops;
590 	sock->state = SS_UNCONNECTED;
591 
592 	sk = sk_alloc(net, PF_RXRPC, GFP_KERNEL, &rxrpc_proto, kern);
593 	if (!sk)
594 		return -ENOMEM;
595 
596 	sock_init_data(sock, sk);
597 	sk->sk_state		= RXRPC_UNBOUND;
598 	sk->sk_write_space	= rxrpc_write_space;
599 	sk->sk_max_ack_backlog	= 0;
600 	sk->sk_destruct		= rxrpc_sock_destructor;
601 
602 	rx = rxrpc_sk(sk);
603 	rx->proto = protocol;
604 	rx->calls = RB_ROOT;
605 
606 	INIT_LIST_HEAD(&rx->listen_link);
607 	INIT_LIST_HEAD(&rx->secureq);
608 	INIT_LIST_HEAD(&rx->acceptq);
609 	rwlock_init(&rx->call_lock);
610 	memset(&rx->srx, 0, sizeof(rx->srx));
611 
612 	_leave(" = 0 [%p]", rx);
613 	return 0;
614 }
615 
616 /*
617  * RxRPC socket destructor
618  */
619 static void rxrpc_sock_destructor(struct sock *sk)
620 {
621 	_enter("%p", sk);
622 
623 	rxrpc_purge_queue(&sk->sk_receive_queue);
624 
625 	WARN_ON(atomic_read(&sk->sk_wmem_alloc));
626 	WARN_ON(!sk_unhashed(sk));
627 	WARN_ON(sk->sk_socket);
628 
629 	if (!sock_flag(sk, SOCK_DEAD)) {
630 		printk("Attempt to release alive rxrpc socket: %p\n", sk);
631 		return;
632 	}
633 }
634 
635 /*
636  * release an RxRPC socket
637  */
638 static int rxrpc_release_sock(struct sock *sk)
639 {
640 	struct rxrpc_sock *rx = rxrpc_sk(sk);
641 
642 	_enter("%p{%d,%d}", sk, sk->sk_state, atomic_read(&sk->sk_refcnt));
643 
644 	/* declare the socket closed for business */
645 	sock_orphan(sk);
646 	sk->sk_shutdown = SHUTDOWN_MASK;
647 
648 	spin_lock_bh(&sk->sk_receive_queue.lock);
649 	sk->sk_state = RXRPC_CLOSE;
650 	spin_unlock_bh(&sk->sk_receive_queue.lock);
651 
652 	ASSERTCMP(rx->listen_link.next, !=, LIST_POISON1);
653 
654 	if (!list_empty(&rx->listen_link)) {
655 		write_lock_bh(&rx->local->services_lock);
656 		list_del(&rx->listen_link);
657 		write_unlock_bh(&rx->local->services_lock);
658 	}
659 
660 	/* try to flush out this socket */
661 	rxrpc_release_calls_on_socket(rx);
662 	flush_workqueue(rxrpc_workqueue);
663 	rxrpc_purge_queue(&sk->sk_receive_queue);
664 
665 	if (rx->conn) {
666 		rxrpc_put_connection(rx->conn);
667 		rx->conn = NULL;
668 	}
669 
670 	if (rx->local) {
671 		rxrpc_put_local(rx->local);
672 		rx->local = NULL;
673 	}
674 
675 	key_put(rx->key);
676 	rx->key = NULL;
677 	key_put(rx->securities);
678 	rx->securities = NULL;
679 	sock_put(sk);
680 
681 	_leave(" = 0");
682 	return 0;
683 }
684 
685 /*
686  * release an RxRPC BSD socket on close() or equivalent
687  */
688 static int rxrpc_release(struct socket *sock)
689 {
690 	struct sock *sk = sock->sk;
691 
692 	_enter("%p{%p}", sock, sk);
693 
694 	if (!sk)
695 		return 0;
696 
697 	sock->sk = NULL;
698 
699 	return rxrpc_release_sock(sk);
700 }
701 
702 /*
703  * RxRPC network protocol
704  */
705 static const struct proto_ops rxrpc_rpc_ops = {
706 	.family		= PF_RXRPC,
707 	.owner		= THIS_MODULE,
708 	.release	= rxrpc_release,
709 	.bind		= rxrpc_bind,
710 	.connect	= rxrpc_connect,
711 	.socketpair	= sock_no_socketpair,
712 	.accept		= sock_no_accept,
713 	.getname	= sock_no_getname,
714 	.poll		= rxrpc_poll,
715 	.ioctl		= sock_no_ioctl,
716 	.listen		= rxrpc_listen,
717 	.shutdown	= sock_no_shutdown,
718 	.setsockopt	= rxrpc_setsockopt,
719 	.getsockopt	= sock_no_getsockopt,
720 	.sendmsg	= rxrpc_sendmsg,
721 	.recvmsg	= rxrpc_recvmsg,
722 	.mmap		= sock_no_mmap,
723 	.sendpage	= sock_no_sendpage,
724 };
725 
726 static struct proto rxrpc_proto = {
727 	.name		= "RXRPC",
728 	.owner		= THIS_MODULE,
729 	.obj_size	= sizeof(struct rxrpc_sock),
730 	.max_header	= sizeof(struct rxrpc_wire_header),
731 };
732 
733 static const struct net_proto_family rxrpc_family_ops = {
734 	.family	= PF_RXRPC,
735 	.create = rxrpc_create,
736 	.owner	= THIS_MODULE,
737 };
738 
739 /*
740  * initialise and register the RxRPC protocol
741  */
742 static int __init af_rxrpc_init(void)
743 {
744 	int ret = -1;
745 
746 	BUILD_BUG_ON(sizeof(struct rxrpc_skb_priv) > FIELD_SIZEOF(struct sk_buff, cb));
747 
748 	rxrpc_epoch = get_seconds();
749 
750 	ret = -ENOMEM;
751 	rxrpc_call_jar = kmem_cache_create(
752 		"rxrpc_call_jar", sizeof(struct rxrpc_call), 0,
753 		SLAB_HWCACHE_ALIGN, NULL);
754 	if (!rxrpc_call_jar) {
755 		pr_notice("Failed to allocate call jar\n");
756 		goto error_call_jar;
757 	}
758 
759 	rxrpc_workqueue = alloc_workqueue("krxrpcd", 0, 1);
760 	if (!rxrpc_workqueue) {
761 		pr_notice("Failed to allocate work queue\n");
762 		goto error_work_queue;
763 	}
764 
765 	ret = rxrpc_init_security();
766 	if (ret < 0) {
767 		pr_crit("Cannot initialise security\n");
768 		goto error_security;
769 	}
770 
771 	ret = proto_register(&rxrpc_proto, 1);
772 	if (ret < 0) {
773 		pr_crit("Cannot register protocol\n");
774 		goto error_proto;
775 	}
776 
777 	ret = sock_register(&rxrpc_family_ops);
778 	if (ret < 0) {
779 		pr_crit("Cannot register socket family\n");
780 		goto error_sock;
781 	}
782 
783 	ret = register_key_type(&key_type_rxrpc);
784 	if (ret < 0) {
785 		pr_crit("Cannot register client key type\n");
786 		goto error_key_type;
787 	}
788 
789 	ret = register_key_type(&key_type_rxrpc_s);
790 	if (ret < 0) {
791 		pr_crit("Cannot register server key type\n");
792 		goto error_key_type_s;
793 	}
794 
795 	ret = rxrpc_sysctl_init();
796 	if (ret < 0) {
797 		pr_crit("Cannot register sysctls\n");
798 		goto error_sysctls;
799 	}
800 
801 #ifdef CONFIG_PROC_FS
802 	proc_create("rxrpc_calls", 0, init_net.proc_net, &rxrpc_call_seq_fops);
803 	proc_create("rxrpc_conns", 0, init_net.proc_net,
804 		    &rxrpc_connection_seq_fops);
805 #endif
806 	return 0;
807 
808 error_sysctls:
809 	unregister_key_type(&key_type_rxrpc_s);
810 error_key_type_s:
811 	unregister_key_type(&key_type_rxrpc);
812 error_key_type:
813 	sock_unregister(PF_RXRPC);
814 error_sock:
815 	proto_unregister(&rxrpc_proto);
816 error_proto:
817 	destroy_workqueue(rxrpc_workqueue);
818 error_security:
819 	rxrpc_exit_security();
820 error_work_queue:
821 	kmem_cache_destroy(rxrpc_call_jar);
822 error_call_jar:
823 	return ret;
824 }
825 
826 /*
827  * unregister the RxRPC protocol
828  */
829 static void __exit af_rxrpc_exit(void)
830 {
831 	_enter("");
832 	rxrpc_sysctl_exit();
833 	unregister_key_type(&key_type_rxrpc_s);
834 	unregister_key_type(&key_type_rxrpc);
835 	sock_unregister(PF_RXRPC);
836 	proto_unregister(&rxrpc_proto);
837 	rxrpc_destroy_all_calls();
838 	rxrpc_destroy_all_connections();
839 	rxrpc_destroy_all_transports();
840 
841 	ASSERTCMP(atomic_read(&rxrpc_n_skbs), ==, 0);
842 
843 	/* We need to flush the scheduled work twice because the local endpoint
844 	 * records involve a work item in their destruction as they can only be
845 	 * destroyed from process context.  However, a connection may have a
846 	 * work item outstanding - and this will pin the local endpoint record
847 	 * until the connection goes away.
848 	 *
849 	 * Peers don't pin locals and calls pin sockets - which prevents the
850 	 * module from being unloaded - so we should only need two flushes.
851 	 */
852 	_debug("flush scheduled work");
853 	flush_workqueue(rxrpc_workqueue);
854 	_debug("flush scheduled work 2");
855 	flush_workqueue(rxrpc_workqueue);
856 	_debug("synchronise RCU");
857 	rcu_barrier();
858 	_debug("destroy locals");
859 	rxrpc_destroy_all_locals();
860 
861 	remove_proc_entry("rxrpc_conns", init_net.proc_net);
862 	remove_proc_entry("rxrpc_calls", init_net.proc_net);
863 	destroy_workqueue(rxrpc_workqueue);
864 	rxrpc_exit_security();
865 	kmem_cache_destroy(rxrpc_call_jar);
866 	_leave("");
867 }
868 
869 module_init(af_rxrpc_init);
870 module_exit(af_rxrpc_exit);
871