xref: /linux/net/rxrpc/call_object.c (revision 0d240e7811c4ec1965760ee4643b5bbc9cfacbb3)
1 /* RxRPC individual remote procedure call handling
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11 
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13 
14 #include <linux/slab.h>
15 #include <linux/module.h>
16 #include <linux/circ_buf.h>
17 #include <linux/hashtable.h>
18 #include <linux/spinlock_types.h>
19 #include <net/sock.h>
20 #include <net/af_rxrpc.h>
21 #include "ar-internal.h"
22 
23 /*
24  * Maximum lifetime of a call (in jiffies).
25  */
26 unsigned int rxrpc_max_call_lifetime = 60 * HZ;
27 
28 /*
29  * Time till dead call expires after last use (in jiffies).
30  */
31 unsigned int rxrpc_dead_call_expiry = 2 * HZ;
32 
33 const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = {
34 	[RXRPC_CALL_CLIENT_SEND_REQUEST]	= "ClSndReq",
35 	[RXRPC_CALL_CLIENT_AWAIT_REPLY]		= "ClAwtRpl",
36 	[RXRPC_CALL_CLIENT_RECV_REPLY]		= "ClRcvRpl",
37 	[RXRPC_CALL_CLIENT_FINAL_ACK]		= "ClFnlACK",
38 	[RXRPC_CALL_SERVER_SECURING]		= "SvSecure",
39 	[RXRPC_CALL_SERVER_ACCEPTING]		= "SvAccept",
40 	[RXRPC_CALL_SERVER_RECV_REQUEST]	= "SvRcvReq",
41 	[RXRPC_CALL_SERVER_ACK_REQUEST]		= "SvAckReq",
42 	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
43 	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
44 	[RXRPC_CALL_COMPLETE]			= "Complete",
45 	[RXRPC_CALL_SERVER_BUSY]		= "SvBusy  ",
46 	[RXRPC_CALL_REMOTELY_ABORTED]		= "RmtAbort",
47 	[RXRPC_CALL_LOCALLY_ABORTED]		= "LocAbort",
48 	[RXRPC_CALL_NETWORK_ERROR]		= "NetError",
49 	[RXRPC_CALL_DEAD]			= "Dead    ",
50 };
51 
52 struct kmem_cache *rxrpc_call_jar;
53 LIST_HEAD(rxrpc_calls);
54 DEFINE_RWLOCK(rxrpc_call_lock);
55 
56 static void rxrpc_destroy_call(struct work_struct *work);
57 static void rxrpc_call_life_expired(unsigned long _call);
58 static void rxrpc_dead_call_expired(unsigned long _call);
59 static void rxrpc_ack_time_expired(unsigned long _call);
60 static void rxrpc_resend_time_expired(unsigned long _call);
61 
62 static DEFINE_SPINLOCK(rxrpc_call_hash_lock);
63 static DEFINE_HASHTABLE(rxrpc_call_hash, 10);
64 
65 /*
66  * Hash function for rxrpc_call_hash
67  */
68 static unsigned long rxrpc_call_hashfunc(
69 	u8		in_clientflag,
70 	u32		cid,
71 	u32		call_id,
72 	u32		epoch,
73 	u16		service_id,
74 	sa_family_t	proto,
75 	void		*localptr,
76 	unsigned int	addr_size,
77 	const u8	*peer_addr)
78 {
79 	const u16 *p;
80 	unsigned int i;
81 	unsigned long key;
82 
83 	_enter("");
84 
85 	key = (unsigned long)localptr;
86 	/* We just want to add up the __be32 values, so forcing the
87 	 * cast should be okay.
88 	 */
89 	key += epoch;
90 	key += service_id;
91 	key += call_id;
92 	key += (cid & RXRPC_CIDMASK) >> RXRPC_CIDSHIFT;
93 	key += cid & RXRPC_CHANNELMASK;
94 	key += in_clientflag;
95 	key += proto;
96 	/* Step through the peer address in 16-bit portions for speed */
97 	for (i = 0, p = (const u16 *)peer_addr; i < addr_size >> 1; i++, p++)
98 		key += *p;
99 	_leave(" key = 0x%lx", key);
100 	return key;
101 }
102 
103 /*
104  * Add a call to the hashtable
105  */
106 static void rxrpc_call_hash_add(struct rxrpc_call *call)
107 {
108 	unsigned long key;
109 	unsigned int addr_size = 0;
110 
111 	_enter("");
112 	switch (call->proto) {
113 	case AF_INET:
114 		addr_size = sizeof(call->peer_ip.ipv4_addr);
115 		break;
116 	case AF_INET6:
117 		addr_size = sizeof(call->peer_ip.ipv6_addr);
118 		break;
119 	default:
120 		break;
121 	}
122 	key = rxrpc_call_hashfunc(call->in_clientflag, call->cid,
123 				  call->call_id, call->epoch,
124 				  call->service_id, call->proto,
125 				  call->conn->trans->local, addr_size,
126 				  call->peer_ip.ipv6_addr);
127 	/* Store the full key in the call */
128 	call->hash_key = key;
129 	spin_lock(&rxrpc_call_hash_lock);
130 	hash_add_rcu(rxrpc_call_hash, &call->hash_node, key);
131 	spin_unlock(&rxrpc_call_hash_lock);
132 	_leave("");
133 }
134 
135 /*
136  * Remove a call from the hashtable
137  */
138 static void rxrpc_call_hash_del(struct rxrpc_call *call)
139 {
140 	_enter("");
141 	spin_lock(&rxrpc_call_hash_lock);
142 	hash_del_rcu(&call->hash_node);
143 	spin_unlock(&rxrpc_call_hash_lock);
144 	_leave("");
145 }
146 
147 /*
148  * Find a call in the hashtable and return it, or NULL if it
149  * isn't there.
150  */
151 struct rxrpc_call *rxrpc_find_call_hash(
152 	struct rxrpc_host_header *hdr,
153 	void		*localptr,
154 	sa_family_t	proto,
155 	const void	*peer_addr)
156 {
157 	unsigned long key;
158 	unsigned int addr_size = 0;
159 	struct rxrpc_call *call = NULL;
160 	struct rxrpc_call *ret = NULL;
161 	u8 in_clientflag = hdr->flags & RXRPC_CLIENT_INITIATED;
162 
163 	_enter("");
164 	switch (proto) {
165 	case AF_INET:
166 		addr_size = sizeof(call->peer_ip.ipv4_addr);
167 		break;
168 	case AF_INET6:
169 		addr_size = sizeof(call->peer_ip.ipv6_addr);
170 		break;
171 	default:
172 		break;
173 	}
174 
175 	key = rxrpc_call_hashfunc(in_clientflag, hdr->cid, hdr->callNumber,
176 				  hdr->epoch, hdr->serviceId,
177 				  proto, localptr, addr_size,
178 				  peer_addr);
179 	hash_for_each_possible_rcu(rxrpc_call_hash, call, hash_node, key) {
180 		if (call->hash_key == key &&
181 		    call->call_id == hdr->callNumber &&
182 		    call->cid == hdr->cid &&
183 		    call->in_clientflag == in_clientflag &&
184 		    call->service_id == hdr->serviceId &&
185 		    call->proto == proto &&
186 		    call->local == localptr &&
187 		    memcmp(call->peer_ip.ipv6_addr, peer_addr,
188 			   addr_size) == 0 &&
189 		    call->epoch == hdr->epoch) {
190 			ret = call;
191 			break;
192 		}
193 	}
194 	_leave(" = %p", ret);
195 	return ret;
196 }
197 
198 /*
199  * find an extant server call
200  * - called in process context with IRQs enabled
201  */
202 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
203 					      unsigned long user_call_ID)
204 {
205 	struct rxrpc_call *call;
206 	struct rb_node *p;
207 
208 	_enter("%p,%lx", rx, user_call_ID);
209 
210 	read_lock(&rx->call_lock);
211 
212 	p = rx->calls.rb_node;
213 	while (p) {
214 		call = rb_entry(p, struct rxrpc_call, sock_node);
215 
216 		if (user_call_ID < call->user_call_ID)
217 			p = p->rb_left;
218 		else if (user_call_ID > call->user_call_ID)
219 			p = p->rb_right;
220 		else
221 			goto found_extant_call;
222 	}
223 
224 	read_unlock(&rx->call_lock);
225 	_leave(" = NULL");
226 	return NULL;
227 
228 found_extant_call:
229 	rxrpc_get_call(call);
230 	read_unlock(&rx->call_lock);
231 	_leave(" = %p [%d]", call, atomic_read(&call->usage));
232 	return call;
233 }
234 
235 /*
236  * allocate a new call
237  */
238 static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
239 {
240 	struct rxrpc_call *call;
241 
242 	call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
243 	if (!call)
244 		return NULL;
245 
246 	call->acks_winsz = 16;
247 	call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
248 				    gfp);
249 	if (!call->acks_window) {
250 		kmem_cache_free(rxrpc_call_jar, call);
251 		return NULL;
252 	}
253 
254 	setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
255 		    (unsigned long) call);
256 	setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
257 		    (unsigned long) call);
258 	setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
259 		    (unsigned long) call);
260 	setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
261 		    (unsigned long) call);
262 	INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
263 	INIT_WORK(&call->processor, &rxrpc_process_call);
264 	INIT_LIST_HEAD(&call->accept_link);
265 	skb_queue_head_init(&call->rx_queue);
266 	skb_queue_head_init(&call->rx_oos_queue);
267 	init_waitqueue_head(&call->tx_waitq);
268 	spin_lock_init(&call->lock);
269 	rwlock_init(&call->state_lock);
270 	atomic_set(&call->usage, 1);
271 	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
272 	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
273 
274 	memset(&call->sock_node, 0xed, sizeof(call->sock_node));
275 
276 	call->rx_data_expect = 1;
277 	call->rx_data_eaten = 0;
278 	call->rx_first_oos = 0;
279 	call->ackr_win_top = call->rx_data_eaten + 1 + rxrpc_rx_window_size;
280 	call->creation_jif = jiffies;
281 	return call;
282 }
283 
284 /*
285  * allocate a new client call and attempt to get a connection slot for it
286  */
287 static struct rxrpc_call *rxrpc_alloc_client_call(
288 	struct rxrpc_sock *rx,
289 	struct rxrpc_transport *trans,
290 	struct rxrpc_conn_bundle *bundle,
291 	gfp_t gfp)
292 {
293 	struct rxrpc_call *call;
294 	int ret;
295 
296 	_enter("");
297 
298 	ASSERT(rx != NULL);
299 	ASSERT(trans != NULL);
300 	ASSERT(bundle != NULL);
301 
302 	call = rxrpc_alloc_call(gfp);
303 	if (!call)
304 		return ERR_PTR(-ENOMEM);
305 
306 	sock_hold(&rx->sk);
307 	call->socket = rx;
308 	call->rx_data_post = 1;
309 
310 	ret = rxrpc_connect_call(rx, trans, bundle, call, gfp);
311 	if (ret < 0) {
312 		kmem_cache_free(rxrpc_call_jar, call);
313 		return ERR_PTR(ret);
314 	}
315 
316 	/* Record copies of information for hashtable lookup */
317 	call->proto = rx->proto;
318 	call->local = trans->local;
319 	switch (call->proto) {
320 	case AF_INET:
321 		call->peer_ip.ipv4_addr =
322 			trans->peer->srx.transport.sin.sin_addr.s_addr;
323 		break;
324 	case AF_INET6:
325 		memcpy(call->peer_ip.ipv6_addr,
326 		       trans->peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
327 		       sizeof(call->peer_ip.ipv6_addr));
328 		break;
329 	}
330 	call->epoch = call->conn->epoch;
331 	call->service_id = call->conn->service_id;
332 	call->in_clientflag = call->conn->in_clientflag;
333 	/* Add the new call to the hashtable */
334 	rxrpc_call_hash_add(call);
335 
336 	spin_lock(&call->conn->trans->peer->lock);
337 	hlist_add_head(&call->error_link, &call->conn->trans->peer->error_targets);
338 	spin_unlock(&call->conn->trans->peer->lock);
339 
340 	call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
341 	add_timer(&call->lifetimer);
342 
343 	_leave(" = %p", call);
344 	return call;
345 }
346 
347 /*
348  * set up a call for the given data
349  * - called in process context with IRQs enabled
350  */
351 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
352 					 struct rxrpc_transport *trans,
353 					 struct rxrpc_conn_bundle *bundle,
354 					 unsigned long user_call_ID,
355 					 gfp_t gfp)
356 {
357 	struct rxrpc_call *call, *xcall;
358 	struct rb_node *parent, **pp;
359 
360 	_enter("%p,%d,%d,%lx",
361 	       rx, trans->debug_id, bundle ? bundle->debug_id : -1,
362 	       user_call_ID);
363 
364 	call = rxrpc_alloc_client_call(rx, trans, bundle, gfp);
365 	if (IS_ERR(call)) {
366 		_leave(" = %ld", PTR_ERR(call));
367 		return call;
368 	}
369 
370 	call->user_call_ID = user_call_ID;
371 	__set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
372 
373 	write_lock(&rx->call_lock);
374 
375 	pp = &rx->calls.rb_node;
376 	parent = NULL;
377 	while (*pp) {
378 		parent = *pp;
379 		xcall = rb_entry(parent, struct rxrpc_call, sock_node);
380 
381 		if (user_call_ID < xcall->user_call_ID)
382 			pp = &(*pp)->rb_left;
383 		else if (user_call_ID > xcall->user_call_ID)
384 			pp = &(*pp)->rb_right;
385 		else
386 			goto found_user_ID_now_present;
387 	}
388 
389 	rxrpc_get_call(call);
390 
391 	rb_link_node(&call->sock_node, parent, pp);
392 	rb_insert_color(&call->sock_node, &rx->calls);
393 	write_unlock(&rx->call_lock);
394 
395 	write_lock_bh(&rxrpc_call_lock);
396 	list_add_tail(&call->link, &rxrpc_calls);
397 	write_unlock_bh(&rxrpc_call_lock);
398 
399 	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
400 
401 	_leave(" = %p [new]", call);
402 	return call;
403 
404 	/* We unexpectedly found the user ID in the list after taking
405 	 * the call_lock.  This shouldn't happen unless the user races
406 	 * with itself and tries to add the same user ID twice at the
407 	 * same time in different threads.
408 	 */
409 found_user_ID_now_present:
410 	write_unlock(&rx->call_lock);
411 	rxrpc_put_call(call);
412 	_leave(" = -EEXIST [%p]", call);
413 	return ERR_PTR(-EEXIST);
414 }
415 
416 /*
417  * set up an incoming call
418  * - called in process context with IRQs enabled
419  */
420 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
421 				       struct rxrpc_connection *conn,
422 				       struct rxrpc_host_header *hdr)
423 {
424 	struct rxrpc_call *call, *candidate;
425 	struct rb_node **p, *parent;
426 	u32 call_id;
427 
428 	_enter(",%d", conn->debug_id);
429 
430 	ASSERT(rx != NULL);
431 
432 	candidate = rxrpc_alloc_call(GFP_NOIO);
433 	if (!candidate)
434 		return ERR_PTR(-EBUSY);
435 
436 	candidate->socket = rx;
437 	candidate->conn = conn;
438 	candidate->cid = hdr->cid;
439 	candidate->call_id = hdr->callNumber;
440 	candidate->channel = hdr->cid & RXRPC_CHANNELMASK;
441 	candidate->rx_data_post = 0;
442 	candidate->state = RXRPC_CALL_SERVER_ACCEPTING;
443 	if (conn->security_ix > 0)
444 		candidate->state = RXRPC_CALL_SERVER_SECURING;
445 
446 	write_lock_bh(&conn->lock);
447 
448 	/* set the channel for this call */
449 	call = conn->channels[candidate->channel];
450 	_debug("channel[%u] is %p", candidate->channel, call);
451 	if (call && call->call_id == hdr->callNumber) {
452 		/* already set; must've been a duplicate packet */
453 		_debug("extant call [%d]", call->state);
454 		ASSERTCMP(call->conn, ==, conn);
455 
456 		read_lock(&call->state_lock);
457 		switch (call->state) {
458 		case RXRPC_CALL_LOCALLY_ABORTED:
459 			if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
460 				rxrpc_queue_call(call);
461 		case RXRPC_CALL_REMOTELY_ABORTED:
462 			read_unlock(&call->state_lock);
463 			goto aborted_call;
464 		default:
465 			rxrpc_get_call(call);
466 			read_unlock(&call->state_lock);
467 			goto extant_call;
468 		}
469 	}
470 
471 	if (call) {
472 		/* it seems the channel is still in use from the previous call
473 		 * - ditch the old binding if its call is now complete */
474 		_debug("CALL: %u { %s }",
475 		       call->debug_id, rxrpc_call_states[call->state]);
476 
477 		if (call->state >= RXRPC_CALL_COMPLETE) {
478 			conn->channels[call->channel] = NULL;
479 		} else {
480 			write_unlock_bh(&conn->lock);
481 			kmem_cache_free(rxrpc_call_jar, candidate);
482 			_leave(" = -EBUSY");
483 			return ERR_PTR(-EBUSY);
484 		}
485 	}
486 
487 	/* check the call number isn't duplicate */
488 	_debug("check dup");
489 	call_id = hdr->callNumber;
490 	p = &conn->calls.rb_node;
491 	parent = NULL;
492 	while (*p) {
493 		parent = *p;
494 		call = rb_entry(parent, struct rxrpc_call, conn_node);
495 
496 		/* The tree is sorted in order of the __be32 value without
497 		 * turning it into host order.
498 		 */
499 		if (call_id < call->call_id)
500 			p = &(*p)->rb_left;
501 		else if (call_id > call->call_id)
502 			p = &(*p)->rb_right;
503 		else
504 			goto old_call;
505 	}
506 
507 	/* make the call available */
508 	_debug("new call");
509 	call = candidate;
510 	candidate = NULL;
511 	rb_link_node(&call->conn_node, parent, p);
512 	rb_insert_color(&call->conn_node, &conn->calls);
513 	conn->channels[call->channel] = call;
514 	sock_hold(&rx->sk);
515 	atomic_inc(&conn->usage);
516 	write_unlock_bh(&conn->lock);
517 
518 	spin_lock(&conn->trans->peer->lock);
519 	hlist_add_head(&call->error_link, &conn->trans->peer->error_targets);
520 	spin_unlock(&conn->trans->peer->lock);
521 
522 	write_lock_bh(&rxrpc_call_lock);
523 	list_add_tail(&call->link, &rxrpc_calls);
524 	write_unlock_bh(&rxrpc_call_lock);
525 
526 	/* Record copies of information for hashtable lookup */
527 	call->proto = rx->proto;
528 	call->local = conn->trans->local;
529 	switch (call->proto) {
530 	case AF_INET:
531 		call->peer_ip.ipv4_addr =
532 			conn->trans->peer->srx.transport.sin.sin_addr.s_addr;
533 		break;
534 	case AF_INET6:
535 		memcpy(call->peer_ip.ipv6_addr,
536 		       conn->trans->peer->srx.transport.sin6.sin6_addr.in6_u.u6_addr8,
537 		       sizeof(call->peer_ip.ipv6_addr));
538 		break;
539 	default:
540 		break;
541 	}
542 	call->epoch = conn->epoch;
543 	call->service_id = conn->service_id;
544 	call->in_clientflag = conn->in_clientflag;
545 	/* Add the new call to the hashtable */
546 	rxrpc_call_hash_add(call);
547 
548 	_net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
549 
550 	call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime;
551 	add_timer(&call->lifetimer);
552 	_leave(" = %p {%d} [new]", call, call->debug_id);
553 	return call;
554 
555 extant_call:
556 	write_unlock_bh(&conn->lock);
557 	kmem_cache_free(rxrpc_call_jar, candidate);
558 	_leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
559 	return call;
560 
561 aborted_call:
562 	write_unlock_bh(&conn->lock);
563 	kmem_cache_free(rxrpc_call_jar, candidate);
564 	_leave(" = -ECONNABORTED");
565 	return ERR_PTR(-ECONNABORTED);
566 
567 old_call:
568 	write_unlock_bh(&conn->lock);
569 	kmem_cache_free(rxrpc_call_jar, candidate);
570 	_leave(" = -ECONNRESET [old]");
571 	return ERR_PTR(-ECONNRESET);
572 }
573 
574 /*
575  * detach a call from a socket and set up for release
576  */
577 void rxrpc_release_call(struct rxrpc_call *call)
578 {
579 	struct rxrpc_connection *conn = call->conn;
580 	struct rxrpc_sock *rx = call->socket;
581 
582 	_enter("{%d,%d,%d,%d}",
583 	       call->debug_id, atomic_read(&call->usage),
584 	       atomic_read(&call->ackr_not_idle),
585 	       call->rx_first_oos);
586 
587 	spin_lock_bh(&call->lock);
588 	if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
589 		BUG();
590 	spin_unlock_bh(&call->lock);
591 
592 	/* dissociate from the socket
593 	 * - the socket's ref on the call is passed to the death timer
594 	 */
595 	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
596 
597 	write_lock_bh(&rx->call_lock);
598 	if (!list_empty(&call->accept_link)) {
599 		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
600 		       call, call->events, call->flags);
601 		ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
602 		list_del_init(&call->accept_link);
603 		sk_acceptq_removed(&rx->sk);
604 	} else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
605 		rb_erase(&call->sock_node, &rx->calls);
606 		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
607 		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
608 	}
609 	write_unlock_bh(&rx->call_lock);
610 
611 	/* free up the channel for reuse */
612 	spin_lock(&conn->trans->client_lock);
613 	write_lock_bh(&conn->lock);
614 	write_lock(&call->state_lock);
615 
616 	if (conn->channels[call->channel] == call)
617 		conn->channels[call->channel] = NULL;
618 
619 	if (conn->out_clientflag && conn->bundle) {
620 		conn->avail_calls++;
621 		switch (conn->avail_calls) {
622 		case 1:
623 			list_move_tail(&conn->bundle_link,
624 				       &conn->bundle->avail_conns);
625 		case 2 ... RXRPC_MAXCALLS - 1:
626 			ASSERT(conn->channels[0] == NULL ||
627 			       conn->channels[1] == NULL ||
628 			       conn->channels[2] == NULL ||
629 			       conn->channels[3] == NULL);
630 			break;
631 		case RXRPC_MAXCALLS:
632 			list_move_tail(&conn->bundle_link,
633 				       &conn->bundle->unused_conns);
634 			ASSERT(conn->channels[0] == NULL &&
635 			       conn->channels[1] == NULL &&
636 			       conn->channels[2] == NULL &&
637 			       conn->channels[3] == NULL);
638 			break;
639 		default:
640 			pr_err("conn->avail_calls=%d\n", conn->avail_calls);
641 			BUG();
642 		}
643 	}
644 
645 	spin_unlock(&conn->trans->client_lock);
646 
647 	if (call->state < RXRPC_CALL_COMPLETE &&
648 	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
649 		_debug("+++ ABORTING STATE %d +++\n", call->state);
650 		call->state = RXRPC_CALL_LOCALLY_ABORTED;
651 		call->local_abort = RX_CALL_DEAD;
652 		set_bit(RXRPC_CALL_EV_ABORT, &call->events);
653 		rxrpc_queue_call(call);
654 	}
655 	write_unlock(&call->state_lock);
656 	write_unlock_bh(&conn->lock);
657 
658 	/* clean up the Rx queue */
659 	if (!skb_queue_empty(&call->rx_queue) ||
660 	    !skb_queue_empty(&call->rx_oos_queue)) {
661 		struct rxrpc_skb_priv *sp;
662 		struct sk_buff *skb;
663 
664 		_debug("purge Rx queues");
665 
666 		spin_lock_bh(&call->lock);
667 		while ((skb = skb_dequeue(&call->rx_queue)) ||
668 		       (skb = skb_dequeue(&call->rx_oos_queue))) {
669 			sp = rxrpc_skb(skb);
670 			if (sp->call) {
671 				ASSERTCMP(sp->call, ==, call);
672 				rxrpc_put_call(call);
673 				sp->call = NULL;
674 			}
675 			skb->destructor = NULL;
676 			spin_unlock_bh(&call->lock);
677 
678 			_debug("- zap %s %%%u #%u",
679 			       rxrpc_pkts[sp->hdr.type],
680 			       sp->hdr.serial, sp->hdr.seq);
681 			rxrpc_free_skb(skb);
682 			spin_lock_bh(&call->lock);
683 		}
684 		spin_unlock_bh(&call->lock);
685 
686 		ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
687 	}
688 
689 	del_timer_sync(&call->resend_timer);
690 	del_timer_sync(&call->ack_timer);
691 	del_timer_sync(&call->lifetimer);
692 	call->deadspan.expires = jiffies + rxrpc_dead_call_expiry;
693 	add_timer(&call->deadspan);
694 
695 	_leave("");
696 }
697 
698 /*
699  * handle a dead call being ready for reaping
700  */
701 static void rxrpc_dead_call_expired(unsigned long _call)
702 {
703 	struct rxrpc_call *call = (struct rxrpc_call *) _call;
704 
705 	_enter("{%d}", call->debug_id);
706 
707 	write_lock_bh(&call->state_lock);
708 	call->state = RXRPC_CALL_DEAD;
709 	write_unlock_bh(&call->state_lock);
710 	rxrpc_put_call(call);
711 }
712 
713 /*
714  * mark a call as to be released, aborting it if it's still in progress
715  * - called with softirqs disabled
716  */
717 static void rxrpc_mark_call_released(struct rxrpc_call *call)
718 {
719 	bool sched;
720 
721 	write_lock(&call->state_lock);
722 	if (call->state < RXRPC_CALL_DEAD) {
723 		sched = false;
724 		if (call->state < RXRPC_CALL_COMPLETE) {
725 			_debug("abort call %p", call);
726 			call->state = RXRPC_CALL_LOCALLY_ABORTED;
727 			call->local_abort = RX_CALL_DEAD;
728 			if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
729 				sched = true;
730 		}
731 		if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
732 			sched = true;
733 		if (sched)
734 			rxrpc_queue_call(call);
735 	}
736 	write_unlock(&call->state_lock);
737 }
738 
739 /*
740  * release all the calls associated with a socket
741  */
742 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
743 {
744 	struct rxrpc_call *call;
745 	struct rb_node *p;
746 
747 	_enter("%p", rx);
748 
749 	read_lock_bh(&rx->call_lock);
750 
751 	/* mark all the calls as no longer wanting incoming packets */
752 	for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
753 		call = rb_entry(p, struct rxrpc_call, sock_node);
754 		rxrpc_mark_call_released(call);
755 	}
756 
757 	/* kill the not-yet-accepted incoming calls */
758 	list_for_each_entry(call, &rx->secureq, accept_link) {
759 		rxrpc_mark_call_released(call);
760 	}
761 
762 	list_for_each_entry(call, &rx->acceptq, accept_link) {
763 		rxrpc_mark_call_released(call);
764 	}
765 
766 	read_unlock_bh(&rx->call_lock);
767 	_leave("");
768 }
769 
770 /*
771  * release a call
772  */
773 void __rxrpc_put_call(struct rxrpc_call *call)
774 {
775 	ASSERT(call != NULL);
776 
777 	_enter("%p{u=%d}", call, atomic_read(&call->usage));
778 
779 	ASSERTCMP(atomic_read(&call->usage), >, 0);
780 
781 	if (atomic_dec_and_test(&call->usage)) {
782 		_debug("call %d dead", call->debug_id);
783 		ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
784 		rxrpc_queue_work(&call->destroyer);
785 	}
786 	_leave("");
787 }
788 
789 /*
790  * clean up a call
791  */
792 static void rxrpc_cleanup_call(struct rxrpc_call *call)
793 {
794 	_net("DESTROY CALL %d", call->debug_id);
795 
796 	ASSERT(call->socket);
797 
798 	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
799 
800 	del_timer_sync(&call->lifetimer);
801 	del_timer_sync(&call->deadspan);
802 	del_timer_sync(&call->ack_timer);
803 	del_timer_sync(&call->resend_timer);
804 
805 	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
806 	ASSERTCMP(call->events, ==, 0);
807 	if (work_pending(&call->processor)) {
808 		_debug("defer destroy");
809 		rxrpc_queue_work(&call->destroyer);
810 		return;
811 	}
812 
813 	if (call->conn) {
814 		spin_lock(&call->conn->trans->peer->lock);
815 		hlist_del_init(&call->error_link);
816 		spin_unlock(&call->conn->trans->peer->lock);
817 
818 		write_lock_bh(&call->conn->lock);
819 		rb_erase(&call->conn_node, &call->conn->calls);
820 		write_unlock_bh(&call->conn->lock);
821 		rxrpc_put_connection(call->conn);
822 	}
823 
824 	/* Remove the call from the hash */
825 	rxrpc_call_hash_del(call);
826 
827 	if (call->acks_window) {
828 		_debug("kill Tx window %d",
829 		       CIRC_CNT(call->acks_head, call->acks_tail,
830 				call->acks_winsz));
831 		smp_mb();
832 		while (CIRC_CNT(call->acks_head, call->acks_tail,
833 				call->acks_winsz) > 0) {
834 			struct rxrpc_skb_priv *sp;
835 			unsigned long _skb;
836 
837 			_skb = call->acks_window[call->acks_tail] & ~1;
838 			sp = rxrpc_skb((struct sk_buff *)_skb);
839 			_debug("+++ clear Tx %u", sp->hdr.seq);
840 			rxrpc_free_skb((struct sk_buff *)_skb);
841 			call->acks_tail =
842 				(call->acks_tail + 1) & (call->acks_winsz - 1);
843 		}
844 
845 		kfree(call->acks_window);
846 	}
847 
848 	rxrpc_free_skb(call->tx_pending);
849 
850 	rxrpc_purge_queue(&call->rx_queue);
851 	ASSERT(skb_queue_empty(&call->rx_oos_queue));
852 	sock_put(&call->socket->sk);
853 	kmem_cache_free(rxrpc_call_jar, call);
854 }
855 
856 /*
857  * destroy a call
858  */
859 static void rxrpc_destroy_call(struct work_struct *work)
860 {
861 	struct rxrpc_call *call =
862 		container_of(work, struct rxrpc_call, destroyer);
863 
864 	_enter("%p{%d,%d,%p}",
865 	       call, atomic_read(&call->usage), call->channel, call->conn);
866 
867 	ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
868 
869 	write_lock_bh(&rxrpc_call_lock);
870 	list_del_init(&call->link);
871 	write_unlock_bh(&rxrpc_call_lock);
872 
873 	rxrpc_cleanup_call(call);
874 	_leave("");
875 }
876 
877 /*
878  * preemptively destroy all the call records from a transport endpoint rather
879  * than waiting for them to time out
880  */
881 void __exit rxrpc_destroy_all_calls(void)
882 {
883 	struct rxrpc_call *call;
884 
885 	_enter("");
886 	write_lock_bh(&rxrpc_call_lock);
887 
888 	while (!list_empty(&rxrpc_calls)) {
889 		call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
890 		_debug("Zapping call %p", call);
891 
892 		list_del_init(&call->link);
893 
894 		switch (atomic_read(&call->usage)) {
895 		case 0:
896 			ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
897 			break;
898 		case 1:
899 			if (del_timer_sync(&call->deadspan) != 0 &&
900 			    call->state != RXRPC_CALL_DEAD)
901 				rxrpc_dead_call_expired((unsigned long) call);
902 			if (call->state != RXRPC_CALL_DEAD)
903 				break;
904 		default:
905 			pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
906 			       call, atomic_read(&call->usage),
907 			       atomic_read(&call->ackr_not_idle),
908 			       rxrpc_call_states[call->state],
909 			       call->flags, call->events);
910 			if (!skb_queue_empty(&call->rx_queue))
911 				pr_err("Rx queue occupied\n");
912 			if (!skb_queue_empty(&call->rx_oos_queue))
913 				pr_err("OOS queue occupied\n");
914 			break;
915 		}
916 
917 		write_unlock_bh(&rxrpc_call_lock);
918 		cond_resched();
919 		write_lock_bh(&rxrpc_call_lock);
920 	}
921 
922 	write_unlock_bh(&rxrpc_call_lock);
923 	_leave("");
924 }
925 
926 /*
927  * handle call lifetime being exceeded
928  */
929 static void rxrpc_call_life_expired(unsigned long _call)
930 {
931 	struct rxrpc_call *call = (struct rxrpc_call *) _call;
932 
933 	if (call->state >= RXRPC_CALL_COMPLETE)
934 		return;
935 
936 	_enter("{%d}", call->debug_id);
937 	read_lock_bh(&call->state_lock);
938 	if (call->state < RXRPC_CALL_COMPLETE) {
939 		set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events);
940 		rxrpc_queue_call(call);
941 	}
942 	read_unlock_bh(&call->state_lock);
943 }
944 
945 /*
946  * handle resend timer expiry
947  * - may not take call->state_lock as this can deadlock against del_timer_sync()
948  */
949 static void rxrpc_resend_time_expired(unsigned long _call)
950 {
951 	struct rxrpc_call *call = (struct rxrpc_call *) _call;
952 
953 	_enter("{%d}", call->debug_id);
954 
955 	if (call->state >= RXRPC_CALL_COMPLETE)
956 		return;
957 
958 	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
959 	if (!test_and_set_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events))
960 		rxrpc_queue_call(call);
961 }
962 
963 /*
964  * handle ACK timer expiry
965  */
966 static void rxrpc_ack_time_expired(unsigned long _call)
967 {
968 	struct rxrpc_call *call = (struct rxrpc_call *) _call;
969 
970 	_enter("{%d}", call->debug_id);
971 
972 	if (call->state >= RXRPC_CALL_COMPLETE)
973 		return;
974 
975 	read_lock_bh(&call->state_lock);
976 	if (call->state < RXRPC_CALL_COMPLETE &&
977 	    !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
978 		rxrpc_queue_call(call);
979 	read_unlock_bh(&call->state_lock);
980 }
981