xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rdsv3/ib_recv.c (revision 134379c07d59b848341b71d3c4819af39ad347cc)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 /*
26  * Copyright (c) 2006 Oracle.  All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *      - Redistributions of source code must retain the above
39  *        copyright notice, this list of conditions and the following
40  *        disclaimer.
41  *
42  *      - Redistributions in binary form must reproduce the above
43  *        copyright notice, this list of conditions and the following
44  *        disclaimer in the documentation and/or other materials
45  *        provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 #include <sys/types.h>
58 #include <sys/kmem.h>
59 #include <sys/cpuvar.h>
60 #include <sys/rds.h>
61 
62 #include <sys/ib/clients/rdsv3/rdsv3.h>
63 #include <sys/ib/clients/rdsv3/ib.h>
64 #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
65 
66 static struct kmem_cache *rdsv3_ib_incoming_slab;
67 static struct kmem_cache *rdsv3_ib_frag_slab;
68 static atomic_t	rdsv3_ib_allocation = ATOMIC_INIT(0);
69 
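/*
 * Release the PAGE_SIZE buffer backing a receive fragment and clear
 * f_page; rdsv3_ib_frag_free() asserts this has been done before the
 * fragment itself goes back to its kmem cache.
 */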
70 static void
71 rdsv3_ib_frag_drop_page(struct rdsv3_page_frag *frag)
72 {
73 	RDSV3_DPRINTF5("rdsv3_ib_frag_drop_page",
74 	    "frag %p page %p offset %d", frag, frag->f_page, frag->f_offset);
75 	kmem_free(frag->f_page, PAGE_SIZE);
76 	frag->f_page = NULL;
77 }
78 
79 static void
80 rdsv3_ib_frag_free(struct rdsv3_page_frag *frag)
81 {
82 	RDSV3_DPRINTF5("rdsv3_ib_frag_free", "frag %p page %p",
83 	    frag, frag->f_page);
84 	ASSERT(frag->f_page == NULL);
85 	kmem_cache_free(rdsv3_ib_frag_slab, frag);
86 }
87 
88 /*
89  * We map a page at a time.  Its fragments are posted in order.  This
90  * is called in fragment order as the fragments get receive completion events.
91  * Only the last frag in the page performs the unmapping.
92  *
93  * It's OK for ring cleanup to call this in whatever order it likes because
94  * DMA is not in flight and so we can unmap while other ring entries still
95  * hold page references in their frags.
96  */
97 static void
98 rdsv3_ib_recv_unmap_page(struct rdsv3_ib_connection *ic,
99     struct rdsv3_ib_recv_work *recv)
100 {
101 	struct rdsv3_page_frag *frag = recv->r_frag;
102 
103 #if 0
104 	RDSV3_DPRINTF5("rdsv3_ib_recv_unmap_page",
105 	    "recv %p frag %p page %p\n", recv, frag, frag->f_page);
106 #endif
107 	if (frag->f_mapped) {
108 		(void) ibt_unmap_mem_iov(
109 		    ib_get_ibt_hca_hdl(ic->i_cm_id->device), frag->f_mapped);
110 		frag->f_mapped = 0;
111 	}
112 }
113 
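/*
 * One-time setup of the receive ring: reset each work entry, record its
 * ring index as the work request id and point the first (header) SGE at
 * that entry's slot in the pre-registered i_recv_hdrs array.  The data
 * SGE is filled in later by rdsv3_ib_recv_refill_one().
 */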
114 void
115 rdsv3_ib_recv_init_ring(struct rdsv3_ib_connection *ic)
116 {
117 	struct rdsv3_ib_recv_work *recv;
118 	struct rdsv3_header *hdrp;
119 	uint32_t i;
120 
121 	RDSV3_DPRINTF4("rdsv3_ib_recv_init_ring", "ic: %p", ic);
122 
123 	hdrp = ic->i_recv_hdrs;
124 	for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
125 		recv->r_ibinc = NULL;
126 		recv->r_frag = NULL;
127 
128 		recv->r_wr.recv.wr_id = i;
129 
130 		/* initialize the hdr sgl permanently */
131 		recv->r_sge[0].ds_va = (ib_vaddr_t)(uintptr_t)hdrp++;
132 		recv->r_sge[0].ds_len = sizeof (struct rdsv3_header);
133 		recv->r_sge[0].ds_key = ic->i_mr->lkey;
134 	}
135 }
136 
137 static void
138 rdsv3_ib_recv_clear_one(struct rdsv3_ib_connection *ic,
139     struct rdsv3_ib_recv_work *recv)
140 {
141 	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_one", "ic: %p, recv: %p",
142 	    ic, recv);
143 
144 	if (recv->r_ibinc) {
145 		rdsv3_inc_put(&recv->r_ibinc->ii_inc);
146 		recv->r_ibinc = NULL;
147 	}
148 	if (recv->r_frag) {
149 		rdsv3_ib_recv_unmap_page(ic, recv);
150 		if (recv->r_frag->f_page)
151 			rdsv3_ib_frag_drop_page(recv->r_frag);
152 		rdsv3_ib_frag_free(recv->r_frag);
153 		recv->r_frag = NULL;
154 	}
155 
156 	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_one", "Return: ic: %p, recv: %p",
157 	    ic, recv);
158 }
159 
160 void
161 rdsv3_ib_recv_clear_ring(struct rdsv3_ib_connection *ic)
162 {
163 	uint32_t i;
164 
165 	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_ring", "ic: %p", ic);
166 
167 	for (i = 0; i < ic->i_recv_ring.w_nr; i++)
168 		rdsv3_ib_recv_clear_one(ic, &ic->i_recvs[i]);
169 
170 	if (ic->i_frag.f_page)
171 		rdsv3_ib_frag_drop_page(&ic->i_frag);
172 }
173 
174 extern int atomic_add_unless(atomic_t *, uint_t, ulong_t);
175 
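/*
 * Make a single ring entry ready to post: allocate the incoming-message
 * descriptor and fragment if the entry doesn't already have them, carve
 * the next RDSV3_FRAG_SIZE piece out of the connection's current data
 * page (allocating a fresh PAGE_SIZE page once the old one is used up),
 * and map that piece with ibt_map_mem_iov() into r_sge[1] behind the
 * permanently initialized header SGE.  Returns 0 on success and nonzero
 * if an allocation or the mapping fails.
 */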
176 static int
177 rdsv3_ib_recv_refill_one(struct rdsv3_connection *conn,
178     struct rdsv3_ib_recv_work *recv,
179     int kptr_gfp, int page_gfp)
180 {
181 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
182 	ibt_mi_hdl_t mi_hdl;
183 	ibt_iov_attr_t iov_attr;
184 	ibt_iov_t iov_arr[1];
185 	int ret = -ENOMEM;
186 
187 	RDSV3_DPRINTF5("rdsv3_ib_recv_refill_one", "conn: %p, recv: %p",
188 	    conn, recv);
189 
190 	if (recv->r_ibinc == NULL) {
191 		if (!atomic_add_unless(&rdsv3_ib_allocation, 1,
192 		    rdsv3_ib_sysctl_max_recv_allocation)) {
193 			rdsv3_ib_stats_inc(s_ib_rx_alloc_limit);
194 			goto out;
195 		}
196 		recv->r_ibinc = kmem_cache_alloc(rdsv3_ib_incoming_slab,
197 		    kptr_gfp);
198 		if (recv->r_ibinc == NULL) {
199 			atomic_add_32(&rdsv3_ib_allocation, -1);
200 			goto out;
201 		}
202 		list_create(&recv->r_ibinc->ii_frags,
203 		    sizeof (struct rdsv3_page_frag),
204 		    offsetof(struct rdsv3_page_frag, f_item));
205 		rdsv3_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
206 	}
207 
208 	if (recv->r_frag == NULL) {
209 		recv->r_frag = kmem_cache_alloc(rdsv3_ib_frag_slab, kptr_gfp);
210 		if (recv->r_frag == NULL)
211 			goto out;
212 		list_link_init(&recv->r_frag->f_item);
213 		recv->r_frag->f_page = NULL;
214 	}
215 
216 	if (ic->i_frag.f_page == NULL) {
217 		ic->i_frag.f_page = kmem_alloc(PAGE_SIZE, page_gfp);
218 		if (ic->i_frag.f_page == NULL)
219 			goto out;
220 		ic->i_frag.f_offset = 0;
221 	}
222 
223 	iov_attr.iov_as = NULL;
224 	iov_attr.iov = &iov_arr[0];
225 	iov_attr.iov_buf = NULL;
226 	iov_attr.iov_list_len = 1;
227 	iov_attr.iov_wr_nds = 1;
228 	iov_attr.iov_lso_hdr_sz = 0;
229 	iov_attr.iov_flags = IBT_IOV_SLEEP | IBT_IOV_RECV;
230 
231 	/* Data */
232 	iov_arr[0].iov_addr = ic->i_frag.f_page + ic->i_frag.f_offset;
233 	iov_arr[0].iov_len = RDSV3_FRAG_SIZE;
234 
235 	/*
236 	 * The header comes from a pre-registered buffer, so don't map it.
237 	 * Map only the data here and quietly put the header sgl back in
238 	 * place after the call.
239 	 */
240 	recv->r_wr.recv.wr_sgl = &recv->r_sge[1];
241 	recv->r_wr.recv.wr_nds = 1;
242 
243 	ret = ibt_map_mem_iov(ib_get_ibt_hca_hdl(ic->i_cm_id->device),
244 	    &iov_attr, &recv->r_wr, &mi_hdl);
245 	if (ret != IBT_SUCCESS) {
246 		RDSV3_DPRINTF2("rdsv3_ib_recv_refill_one",
247 		    "ibt_map_mem_iov failed: %d", ret);
248 		goto out;
249 	}
250 
251 	/* stick in the header */
252 	recv->r_wr.recv.wr_sgl = &recv->r_sge[0];
253 	recv->r_wr.recv.wr_nds = RDSV3_IB_RECV_SGE;
254 
255 	/*
256 	 * The mapping set up above is torn down by
257 	 * rdsv3_ib_recv_unmap_page(), either when the completion for this
258 	 * recv is polled or at connection shutdown.
259 	 */
260 	recv->r_frag->f_page = ic->i_frag.f_page;
261 	recv->r_frag->f_offset = ic->i_frag.f_offset;
262 	recv->r_frag->f_mapped = mi_hdl;
263 
264 	if (ic->i_frag.f_offset < RDSV3_PAGE_LAST_OFF) {
265 		ic->i_frag.f_offset += RDSV3_FRAG_SIZE;
266 	} else {
267 		ic->i_frag.f_page = NULL;
268 		ic->i_frag.f_offset = 0;
269 	}
270 
271 	ret = 0;
272 
273 	RDSV3_DPRINTF5("rdsv3_ib_recv_refill_one", "Return: conn: %p, recv: %p",
274 	    conn, recv);
275 out:
276 	return (ret);
277 }
278 
279 /*
280  * This tries to allocate and post unused work requests after making sure that
281  * they have all the allocations they need to queue received fragments into
282  * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
283  * pairs don't go unmatched.
284  *
285  * -1 is returned if posting fails due to temporary resource exhaustion.
286  */
287 int
288 rdsv3_ib_recv_refill(struct rdsv3_connection *conn, int kptr_gfp,
289     int page_gfp, int prefill)
290 {
291 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
292 	struct rdsv3_ib_recv_work *recv;
293 	unsigned int succ_wr;
294 	unsigned int posted = 0;
295 	int ret = 0;
296 	uint32_t pos;
297 
298 	RDSV3_DPRINTF4("rdsv3_ib_recv_refill", "conn: %p, prefill: %d",
299 	    conn, prefill);
300 
301 	while ((prefill || rdsv3_conn_up(conn)) &&
302 	    rdsv3_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
303 		if (pos >= ic->i_recv_ring.w_nr) {
304 			RDSV3_DPRINTF2("rdsv3_ib_recv_refill",
305 			    "Argh - ring alloc returned pos=%u",
306 			    pos);
307 			ret = -EINVAL;
308 			break;
309 		}
310 
311 		recv = &ic->i_recvs[pos];
312 		ret = rdsv3_ib_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
313 		if (ret) {
314 			ret = -1;
315 			break;
316 		}
317 
318 		/* XXX when can this fail? */
319 		ret = ibt_post_recv(ib_get_ibt_channel_hdl(ic->i_cm_id),
320 		    &recv->r_wr.recv, 1, &succ_wr);
321 		RDSV3_DPRINTF5("rdsv3_ib_recv_refill",
322 		    "recv %p ibinc %p frag %p ret %d\n", recv,
323 		    recv->r_ibinc, recv->r_frag, ret);
324 		if (ret) {
325 			RDSV3_DPRINTF2("rdsv3_ib_recv_refill",
326 			    "recv post on %u.%u.%u.%u returned %d, "
327 			    "disconnecting and reconnecting\n",
328 			    NIPQUAD(conn->c_faddr), ret);
329 			rdsv3_conn_drop(conn);
330 			ret = -1;
331 			break;
332 		}
333 
334 		posted++;
335 	}
336 
337 	/* We're doing flow control - update the window. */
338 	if (ic->i_flowctl && posted)
339 		rdsv3_ib_advertise_credits(conn, posted);
340 
341 	if (ret)
342 		rdsv3_ib_ring_unalloc(&ic->i_recv_ring, 1);
343 
344 	RDSV3_DPRINTF4("rdsv3_ib_recv_refill", "Return: conn: %p, posted: %d",
345 	    conn, posted);
346 	return (ret);
347 }
348 
349 void
350 rdsv3_ib_inc_purge(struct rdsv3_incoming *inc)
351 {
352 	struct rdsv3_ib_incoming *ibinc;
353 	struct rdsv3_page_frag *frag;
354 	struct rdsv3_page_frag *pos;
355 
356 	RDSV3_DPRINTF4("rdsv3_ib_inc_purge", "inc: %p", inc);
357 
358 	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);
359 	RDSV3_DPRINTF5("rdsv3_ib_inc_purge",
360 	    "purging ibinc %p inc %p\n", ibinc, inc);
361 
362 	RDSV3_FOR_EACH_LIST_NODE_SAFE(frag, pos, &ibinc->ii_frags, f_item) {
363 		list_remove_node(&frag->f_item);
364 		rdsv3_ib_frag_drop_page(frag);
365 		rdsv3_ib_frag_free(frag);
366 	}
367 
368 	RDSV3_DPRINTF4("rdsv3_ib_inc_purge", "Return: inc: %p", inc);
369 }
370 
371 void
372 rdsv3_ib_inc_free(struct rdsv3_incoming *inc)
373 {
374 	struct rdsv3_ib_incoming *ibinc;
375 
376 	RDSV3_DPRINTF4("rdsv3_ib_inc_free", "inc: %p", inc);
377 
378 	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);
379 
380 	rdsv3_ib_inc_purge(inc);
381 	RDSV3_DPRINTF5("rdsv3_ib_inc_free", "freeing ibinc %p inc %p",
382 	    ibinc, inc);
383 	ASSERT(list_is_empty(&ibinc->ii_frags));
384 	kmem_cache_free(rdsv3_ib_incoming_slab, ibinc);
385 	atomic_dec_uint(&rdsv3_ib_allocation);
386 
387 	RDSV3_DPRINTF4("rdsv3_ib_inc_free", "Return: inc: %p", inc);
388 }
389 
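/*
 * Copy message payload to the user's buffers: walk the incoming
 * message's fragment list and uiomove() up to min(size, h_len) bytes,
 * at most RDSV3_FRAG_SIZE at a time.  Returns the number of bytes
 * copied.
 */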
390 int
391 rdsv3_ib_inc_copy_to_user(struct rdsv3_incoming *inc, uio_t *uiop,
392     size_t size)
393 {
394 	struct rdsv3_ib_incoming *ibinc;
395 	struct rdsv3_page_frag *frag;
396 	unsigned long to_copy;
397 	unsigned long frag_off = 0;
398 	int copied = 0;
399 	int ret;
400 	uint32_t len;
401 
402 	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);
403 	frag = list_head(&ibinc->ii_frags);
404 	len = ntohl(inc->i_hdr.h_len);
405 
406 	RDSV3_DPRINTF4("rdsv3_ib_inc_copy_to_user", "inc: %p, size: %d len: %d",
407 	    inc, size, len);
408 
409 	while (copied < size && copied < len) {
410 		if (frag_off == RDSV3_FRAG_SIZE) {
411 			frag = list_next(&ibinc->ii_frags, frag);
412 			frag_off = 0;
413 		}
414 
415 		to_copy = min(len - copied, RDSV3_FRAG_SIZE - frag_off);
416 		to_copy = min(size - copied, to_copy);
417 
418 		RDSV3_DPRINTF5("rdsv3_ib_inc_copy_to_user",
419 		    "%lu bytes to user %p from frag [%p, %u] + %lu",
420 		    to_copy, uiop,
421 		    frag->f_page, frag->f_offset, frag_off);
422 
423 		ret = uiomove((caddr_t)(frag->f_page +
424 		    frag->f_offset + frag_off),
425 		    to_copy, UIO_READ, uiop);
426 		if (ret) {
427 			RDSV3_DPRINTF2("rdsv3_ib_inc_copy_to_user",
428 			    "uiomove (%d) returned: %d", to_copy, ret);
429 			break;
430 		}
431 
432 		frag_off += to_copy;
433 		copied += to_copy;
434 	}
435 
436 	RDSV3_DPRINTF4("rdsv3_ib_inc_copy_to_user",
437 	    "Return: inc: %p, copied: %d", inc, copied);
438 
439 	return (copied);
440 }
441 
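/*
 * Build the long-lived ACK work request described in the ACK comment
 * block that follows this function: a single SGE covering the
 * registered i_ack header buffer, sent signalled and solicited with the
 * reserved RDSV3_IB_ACK_WR_ID so send completions can tell it apart
 * from ring entries.
 */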
442 /* ic starts out kmem_zalloc()ed */
443 void
444 rdsv3_ib_recv_init_ack(struct rdsv3_ib_connection *ic)
445 {
446 	ibt_send_wr_t *wr = &ic->i_ack_wr;
447 	ibt_wr_ds_t *sge = &ic->i_ack_sge;
448 
449 	RDSV3_DPRINTF4("rdsv3_ib_recv_init_ack", "ic: %p", ic);
450 
451 	sge->ds_va = ic->i_ack_dma;
452 	sge->ds_len = sizeof (struct rdsv3_header);
453 	sge->ds_key = ic->i_mr->lkey;
454 
455 	wr->wr_sgl = sge;
456 	wr->wr_nds = 1;
457 	wr->wr_opcode = IBT_WRC_SEND;
458 	wr->wr_id = RDSV3_IB_ACK_WR_ID;
459 	wr->wr_flags = IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
460 }
461 
462 /*
463  * You'd think that with reliable IB connections you wouldn't need to ack
464  * messages that have been received.  The problem is that IB hardware generates
465  * an ack message before it has DMAed the message into memory.  This creates a
466  * potential for message loss if the HCA is disabled for any reason between
467  * the time it sends the ack and the time the message is DMAed and processed.
468  * This is only a potential issue if another HCA is available for fail-over.
469  *
470  * When the remote host receives our ack they'll free the sent message from
471  * their send queue.  To decrease the latency of this we always send an ack
472  * immediately after we've received messages.
473  *
474  * For simplicity, we only have one ack in flight at a time.  This puts
475  * pressure on senders to have deep enough send queues to absorb the latency of
476  * a single ack frame being in flight.  This might not be good enough.
477  *
478  * This is implemented by having a long-lived send_wr and sge which point to a
479  * statically allocated ack frame.  This ack wr does not fall under the ring
480  * accounting that the tx and rx wrs do.  The QP attribute specifically makes
481  * room for it beyond the ring size.  Send completion notices its special
482  * wr_id and avoids working with the ring in that case.
483  */
484 static void
485 rdsv3_ib_set_ack(struct rdsv3_ib_connection *ic, uint64_t seq,
486     int ack_required)
487 {
488 	RDSV3_DPRINTF4("rdsv3_ib_set_ack", "ic: %p, seq: %lld ack: %d",
489 	    ic, seq, ack_required);
490 
491 	mutex_enter(&ic->i_ack_lock);
492 	ic->i_ack_next = seq;
493 	if (ack_required)
494 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
495 	mutex_exit(&ic->i_ack_lock);
496 }
497 
498 static uint64_t
499 rdsv3_ib_get_ack(struct rdsv3_ib_connection *ic)
500 {
501 	uint64_t seq;
502 
503 	RDSV3_DPRINTF4("rdsv3_ib_get_ack", "ic: %p", ic);
504 
505 	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
506 
507 	mutex_enter(&ic->i_ack_lock);
508 	seq = ic->i_ack_next;
509 	mutex_exit(&ic->i_ack_lock);
510 
511 	return (seq);
512 }
513 
514 static void
515 rdsv3_ib_send_ack(struct rdsv3_ib_connection *ic, unsigned int adv_credits)
516 {
517 	struct rdsv3_header *hdr = ic->i_ack;
518 	uint64_t seq;
519 	int ret;
520 
521 	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "ic: %p adv_credits: %d",
522 	    ic, adv_credits);
523 
524 	seq = rdsv3_ib_get_ack(ic);
525 
526 	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "send_ack: ic %p ack %llu",
527 	    ic, (unsigned long long) seq);
528 	rdsv3_message_populate_header(hdr, 0, 0, 0);
529 	hdr->h_ack = htonll(seq);
530 	hdr->h_credit = adv_credits;
531 	rdsv3_message_make_checksum(hdr);
532 	ic->i_ack_queued = jiffies;
533 
534 	ret = ibt_post_send(RDSV3_QP2CHANHDL(ic->i_cm_id->qp), &ic->i_ack_wr, 1,
535 	    NULL);
536 	if (ret) {
537 		/*
538 		 * Failed to send. Release the WR, and
539 		 * force another ACK.
540 		 */
541 		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
542 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
543 		rdsv3_ib_stats_inc(s_ib_ack_send_failure);
544 		RDSV3_DPRINTF2("rdsv3_ib_send_ack", "sending ack failed\n");
545 		rdsv3_conn_drop(ic->conn);
546 	} else {
547 		rdsv3_ib_stats_inc(s_ib_ack_sent);
548 	}
549 	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "Return: ic: %p adv_credits: %d",
550 	    ic, adv_credits);
551 }
552 
553 /*
554  * There are 3 ways of getting acknowledgements to the peer:
555  *  1.	We call rdsv3_ib_attempt_ack from the recv completion handler
556  *	to send an ACK-only frame.
557  *	However, there can be only one such frame in the send queue
558  *	at any time, so we may have to postpone it.
559  *  2.	When another (data) packet is transmitted while there's
560  *	an ACK in the queue, we piggyback the ACK sequence number
561  *	on the data packet.
562  *  3.	If the ACK WR is done sending, we get called from the
563  *	send queue completion handler, and check whether there's
564  *	another ACK pending (postponed because the WR was on the
565  *	queue). If so, we transmit it.
566  *
567  * We maintain 2 variables:
568  *  -	i_ack_flags, which keeps track of whether the ACK WR
569  *	is currently in the send queue or not (IB_ACK_IN_FLIGHT)
570  *  -	i_ack_next, which is the last sequence number we received
571  *
572  * Potentially, send queue and receive queue handlers can run concurrently.
573  * It would be nice to not have to use a spinlock to synchronize things,
574  * but the one problem that rules this out is that 64bit updates are
575  * not atomic on all platforms. Things would be a lot simpler if
576  * we had atomic64 or maybe cmpxchg64 everywhere.
577  *
578  * Reconnecting complicates this picture just slightly. When we
579  * reconnect, we may be seeing duplicate packets. The peer
580  * is retransmitting them, because it hasn't seen an ACK for
581  * them. It is important that we ACK these.
582  *
583  * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
584  * this flag set *MUST* be acknowledged immediately.
585  */
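
/*
 * A rough sketch of how the two flags interact (illustrative only; see
 * rdsv3_ib_attempt_ack(), rdsv3_ib_send_ack() and
 * rdsv3_ib_ack_send_complete() below for the authoritative logic):
 *
 *	recv path:	rdsv3_ib_set_ack() records i_ack_next and sets
 *			IB_ACK_REQUESTED
 *	attempt_ack:	if IB_ACK_REQUESTED is set, IB_ACK_IN_FLIGHT was
 *			clear and a send credit is available, clear
 *			IB_ACK_REQUESTED and post the ACK WR; otherwise
 *			the ack is postponed
 *	completion:	clear IB_ACK_IN_FLIGHT and run attempt_ack again
 *			in case another ack was requested in the meantime
 */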
586 
587 /*
588  * When we get here, we're called from the recv queue handler.
589  * Check whether we ought to transmit an ACK.
590  */
591 void
592 rdsv3_ib_attempt_ack(struct rdsv3_ib_connection *ic)
593 {
594 	unsigned int adv_credits;
595 
596 	RDSV3_DPRINTF4("rdsv3_ib_attempt_ack", "ic: %p", ic);
597 
598 	if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
599 		return;
600 
601 	if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
602 		rdsv3_ib_stats_inc(s_ib_ack_send_delayed);
603 		return;
604 	}
605 
606 	/* Can we get a send credit? */
607 	if (!rdsv3_ib_send_grab_credits(ic, 1, &adv_credits, 0)) {
608 		rdsv3_ib_stats_inc(s_ib_tx_throttle);
609 		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
610 		return;
611 	}
612 
613 	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
614 	rdsv3_ib_send_ack(ic, adv_credits);
615 
616 	RDSV3_DPRINTF4("rdsv3_ib_attempt_ack", "Return: ic: %p", ic);
617 }
618 
619 /*
620  * We get here from the send completion handler, when the
621  * adapter tells us the ACK frame was sent.
622  */
623 void
624 rdsv3_ib_ack_send_complete(struct rdsv3_ib_connection *ic)
625 {
626 	RDSV3_DPRINTF4("rdsv3_ib_ack_send_complete", "ic: %p", ic);
627 	clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
628 	rdsv3_ib_attempt_ack(ic);
629 }
630 
631 /*
632  * This is called by the regular xmit code when it wants to piggyback
633  * an ACK on an outgoing frame.
634  */
635 uint64_t
636 rdsv3_ib_piggyb_ack(struct rdsv3_ib_connection *ic)
637 {
638 	RDSV3_DPRINTF4("rdsv3_ib_piggyb_ack", "ic: %p", ic);
639 	if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags)) {
640 		rdsv3_ib_stats_inc(s_ib_ack_send_piggybacked);
641 	}
642 	return (rdsv3_ib_get_ack(ic));
643 }
644 
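/*
 * Locate the RDS header for a completed recv.  For protocol 3.0 and
 * later (or whenever the payload fills a full RDSV3_FRAG_SIZE) the
 * header is in this entry's slot of the pre-registered i_recv_hdrs
 * array and that slot is returned; for anything older NULL is returned
 * and the caller drops the connection.
 */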
645 static struct rdsv3_header *
646 rdsv3_ib_get_header(struct rdsv3_connection *conn,
647     struct rdsv3_ib_recv_work *recv,
648     uint32_t data_len)
649 {
650 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
651 	void *hdr_buff = &ic->i_recv_hdrs[recv - ic->i_recvs];
652 
653 	RDSV3_DPRINTF4("rdsv3_ib_get_header", "conn: %p, recv: %p len: %d",
654 	    conn, recv, data_len);
655 
656 	/*
657 	 * Support header at the front (RDS 3.1+) as well as header-at-end.
658 	 *
659 	 * Cases:
660 	 * 1) header all in header buff (great!)
661 	 * 2) header all in data page (copy all to header buff)
662 	 * 3) header split across hdr buf + data page
663 	 *    (move bit in hdr buff to end before copying other bit from
664 	 *    data page)
665 	 */
666 	if (conn->c_version > RDS_PROTOCOL_3_0 || data_len == RDSV3_FRAG_SIZE)
667 		return (hdr_buff);
668 	/*
669 	 * XXX - Need to discuss the support for version < RDS_PROTOCOL_3_1.
670 	 */
671 	if (conn->c_version == RDS_PROTOCOL_3_0)
672 		return (hdr_buff);
673 
674 	/* version < RDS_PROTOCOL_3_0 */
675 	RDSV3_DPRINTF2("rdsv3_ib_get_header",
676 	    "NULL header (version: 0x%x, data_len: %d)", conn->c_version,
677 	    data_len);
678 	return (NULL);
679 }
680 
681 /*
682  * It's kind of lame that we're copying from the posted receive pages into
683  * long-lived bitmaps.  We could have posted the bitmaps and had the peer
684  * RDMA-write into them.  But receiving new congestion bitmaps should be a
685  * *rare* event, so hopefully we won't need to invest that complexity in
686  * making it more efficient.  By copying we can share a simpler core with
687  * TCP, which has to copy.
688  */
689 static void
690 rdsv3_ib_cong_recv(struct rdsv3_connection *conn,
691     struct rdsv3_ib_incoming *ibinc)
692 {
693 	struct rdsv3_cong_map *map;
694 	unsigned int map_off;
695 	unsigned int map_page;
696 	struct rdsv3_page_frag *frag;
697 	unsigned long frag_off;
698 	unsigned long to_copy;
699 	unsigned long copied;
700 	uint64_t uncongested = 0;
701 	caddr_t addr;
702 
703 	RDSV3_DPRINTF4("rdsv3_ib_cong_recv", "conn: %p, ibinc: %p",
704 	    conn, ibinc);
705 
706 	/* catch completely corrupt packets */
707 	if (ntohl(ibinc->ii_inc.i_hdr.h_len) != RDSV3_CONG_MAP_BYTES)
708 		return;
709 
710 	map = conn->c_fcong;
711 	map_page = 0;
712 	map_off = 0;
713 
714 	frag = list_head(&ibinc->ii_frags);
715 	frag_off = 0;
716 
717 	copied = 0;
718 
719 	while (copied < RDSV3_CONG_MAP_BYTES) {
720 		uint64_t *src, *dst;
721 		unsigned int k;
722 
723 		to_copy = min(RDSV3_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
724 		ASSERT(!(to_copy & 7)); /* Must be 64bit aligned. */
725 
726 		addr = frag->f_page + frag->f_offset;
727 
728 		src = (uint64_t *)(addr + frag_off);
729 		dst = (uint64_t *)(map->m_page_addrs[map_page] + map_off);
730 		RDSV3_DPRINTF4("rdsv3_ib_cong_recv",
731 		    "src: %p dst: %p copied: %d", src, dst, copied);
732 		for (k = 0; k < to_copy; k += 8) {
733 			/*
734 			 * Record ports that became uncongested, i.e. bits set
735 			 * in the old map (*dst) and clear in the new (*src).
736 			 */
737 			uncongested |= ~(*src) & *dst;
738 			*dst++ = *src++;
739 		}
740 
741 		copied += to_copy;
742 		RDSV3_DPRINTF4("rdsv3_ib_cong_recv",
743 		    "src: %p dst: %p copied: %d", src, dst, copied);
744 
745 		map_off += to_copy;
746 		if (map_off == PAGE_SIZE) {
747 			map_off = 0;
748 			map_page++;
749 		}
750 
751 		frag_off += to_copy;
752 		if (frag_off == RDSV3_FRAG_SIZE) {
753 			frag = list_next(&ibinc->ii_frags, frag);
754 			frag_off = 0;
755 		}
756 	}
757 
758 #if 0
759 XXX
760 	/* the congestion map is in little endian order */
761 	uncongested = le64_to_cpu(uncongested);
762 #endif
763 
764 	rdsv3_cong_map_updated(map, uncongested);
765 
766 	RDSV3_DPRINTF4("rdsv3_ib_cong_recv", "Return: conn: %p, ibinc: %p",
767 	    conn, ibinc);
768 }
769 
770 /*
771  * Rings are posted with all the allocations they'll need to queue the
772  * incoming message to the receiving socket so this can't fail.
773  * All fragments start with a header, so we can make sure we're not receiving
774  * garbage, and we can tell a small 8 byte fragment from an ACK frame.
775  */
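/*
 * Scratch state accumulated while draining the receive CQ; it is acted
 * upon by rdsv3_ib_recv_tasklet_fn() once polling is done.
 */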
776 struct rdsv3_ib_ack_state {
777 	uint64_t		ack_next;
778 	uint64_t		ack_recv;
779 	unsigned int	ack_required:1;
780 	unsigned int	ack_next_valid:1;
781 	unsigned int	ack_recv_valid:1;
782 };
783 
784 static void
785 rdsv3_ib_process_recv(struct rdsv3_connection *conn,
786     struct rdsv3_ib_recv_work *recv, uint32_t data_len,
787     struct rdsv3_ib_ack_state *state)
788 {
789 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
790 	struct rdsv3_ib_incoming *ibinc = ic->i_ibinc;
791 	struct rdsv3_header *ihdr, *hdr;
792 
793 	/* XXX shut down the connection if port 0,0 are seen? */
794 
795 	RDSV3_DPRINTF5("rdsv3_ib_process_recv",
796 	    "ic %p ibinc %p recv %p byte len %u", ic, ibinc, recv, data_len);
797 
798 	if (data_len < sizeof (struct rdsv3_header)) {
799 		RDSV3_DPRINTF2("rdsv3_ib_process_recv",
800 		    "incoming message from %u.%u.%u.%u didn't include a "
801 		    "header, disconnecting and reconnecting",
802 		    NIPQUAD(conn->c_faddr));
803 		rdsv3_conn_drop(conn);
804 		return;
805 	}
806 	data_len -= sizeof (struct rdsv3_header);
807 
808 	if ((ihdr = rdsv3_ib_get_header(conn, recv, data_len)) == NULL) {
809 		RDSV3_DPRINTF2("rdsv3_ib_process_recv", "incoming message "
810 		    "from %u.%u.%u.%u didn't have a proper version (0x%x) or "
811 		    "data_len (0x%x), disconnecting and "
812 		    "reconnecting",
813 		    NIPQUAD(conn->c_faddr), conn->c_version, data_len);
814 		rdsv3_conn_drop(conn);
815 		return;
816 	}
817 
818 	/* Validate the checksum. */
819 	if (!rdsv3_message_verify_checksum(ihdr)) {
820 		RDSV3_DPRINTF2("rdsv3_ib_process_recv", "incoming message "
821 		    "from %u.%u.%u.%u has corrupted header - "
822 		    "forcing a reconnect",
823 		    NIPQUAD(conn->c_faddr));
824 		rdsv3_conn_drop(conn);
825 		rdsv3_stats_inc(s_recv_drop_bad_checksum);
826 		return;
827 	}
828 
829 	/* Process the ACK sequence which comes with every packet */
830 	state->ack_recv = ntohll(ihdr->h_ack);
831 	state->ack_recv_valid = 1;
832 
833 	/* Process the credits update if there was one */
834 	if (ihdr->h_credit)
835 		rdsv3_ib_send_add_credits(conn, ihdr->h_credit);
836 
837 	if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
838 		/*
839 		 * This is an ACK-only packet.  It gets special treatment
840 		 * here because, historically, ACKs were rather special
841 		 * beasts.
842 		 */
843 		rdsv3_ib_stats_inc(s_ib_ack_received);
844 
845 		 * Usually the frags make their way on to incs and are then
846 		 * freed as the inc is freed.  We don't go that route, so we
847 		 * have to drop the page ref ourselves.  We can't just leave
848 		 * the page on the recv because that confuses the dma mapping
849 		 * of pages and each recv's use of a partial page.  We can
850 		 * leave the frag, though; it will be reused.
854 		 * reused.
855 		 *
856 		 * FIXME: Fold this into the code path below.
857 		 */
858 		rdsv3_ib_frag_drop_page(recv->r_frag);
859 		return;
860 	}
861 
862 	/*
863 	 * If we don't already have an inc on the connection then this
864 	 * fragment has a header and starts a message.  Copy its header
865 	 * into the inc and save the inc so we can hang upcoming fragments
866 	 * off its list.
867 	 */
868 	if (ibinc == NULL) {
869 		ibinc = recv->r_ibinc;
870 		recv->r_ibinc = NULL;
871 		ic->i_ibinc = ibinc;
872 
873 		hdr = &ibinc->ii_inc.i_hdr;
874 		(void) memcpy(hdr, ihdr, sizeof (*hdr));
875 		ic->i_recv_data_rem = ntohl(hdr->h_len);
876 
877 		RDSV3_DPRINTF5("rdsv3_ib_process_recv",
878 		    "ic %p ibinc %p rem %u flag 0x%x", ic, ibinc,
879 		    ic->i_recv_data_rem, hdr->h_flags);
880 	} else {
881 		hdr = &ibinc->ii_inc.i_hdr;
882 		/*
883 		 * We can't just use memcmp here; fragments of a
884 		 * single message may carry different ACKs
885 		 */
886 		if (hdr->h_sequence != ihdr->h_sequence ||
887 		    hdr->h_len != ihdr->h_len ||
888 		    hdr->h_sport != ihdr->h_sport ||
889 		    hdr->h_dport != ihdr->h_dport) {
890 			RDSV3_DPRINTF2("rdsv3_ib_process_recv",
891 			    "fragment header mismatch; forcing reconnect");
892 			rdsv3_conn_drop(conn);
893 			return;
894 		}
895 	}
896 
897 	list_insert_tail(&ibinc->ii_frags, recv->r_frag);
898 	recv->r_frag = NULL;
899 
900 	if (ic->i_recv_data_rem > RDSV3_FRAG_SIZE)
901 		ic->i_recv_data_rem -= RDSV3_FRAG_SIZE;
902 	else {
903 		ic->i_recv_data_rem = 0;
904 		ic->i_ibinc = NULL;
905 
906 		if (ibinc->ii_inc.i_hdr.h_flags == RDSV3_FLAG_CONG_BITMAP)
907 			rdsv3_ib_cong_recv(conn, ibinc);
908 		else {
909 			rdsv3_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
910 			    &ibinc->ii_inc, KM_NOSLEEP);
911 			state->ack_next = ntohll(hdr->h_sequence);
912 			state->ack_next_valid = 1;
913 		}
914 
915 		/*
916 		 * Evaluate the ACK_REQUIRED flag *after* we received
917 		 * the complete frame, and after bumping the next_rx
918 		 * sequence.
919 		 */
920 		if (hdr->h_flags & RDSV3_FLAG_ACK_REQUIRED) {
921 			rdsv3_stats_inc(s_recv_ack_required);
922 			state->ack_required = 1;
923 		}
924 
925 		rdsv3_inc_put(&ibinc->ii_inc);
926 	}
927 
928 	RDSV3_DPRINTF4("rdsv3_ib_process_recv",
929 	    "Return: conn: %p recv: %p len: %d state: %p",
930 	    conn, recv, data_len, state);
931 }
932 
933 /*
934  * Plucking the oldest entry from the ring can be done concurrently with
935  * the thread refilling the ring.  Each ring operation is protected by
936  * spinlocks and the transient state of refilling doesn't change the
937  * recording of which entry is oldest.
938  *
939  * This relies on IB only calling one cq comp_handler for each cq so that
940  * there will only be one caller of rdsv3_recv_incoming() per RDS connection.
941  */
942 
943 void
944 rdsv3_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
945 {
946 	struct rdsv3_connection *conn = context;
947 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
948 
949 	RDSV3_DPRINTF4("rdsv3_ib_recv_cq_comp_handler",
950 	    "Enter(conn: %p cq: %p)", conn, cq);
951 
952 	rdsv3_ib_stats_inc(s_ib_rx_cq_call);
953 
954 	(void) ddi_taskq_dispatch(ic->i_recv_tasklet, rdsv3_ib_recv_tasklet_fn,
955 	    (void *)ic, DDI_SLEEP);
956 }
957 
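/*
 * Drain the receive CQ one completion at a time.  Each completion
 * corresponds to the oldest ring entry; unmap its data page, hand
 * successful completions to rdsv3_ib_process_recv() while the
 * connection is up or still connecting, and then release the ring slot.
 * Completion errors are expected while the QP drains at shutdown; if
 * the connection is still up they force a reconnect.
 */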
958 static inline void
959 rdsv3_poll_cq(struct rdsv3_ib_connection *ic, struct rdsv3_ib_ack_state *state)
960 {
961 	struct rdsv3_connection *conn = ic->conn;
962 	ibt_wc_t wc;
963 	struct rdsv3_ib_recv_work *recv;
964 	uint_t polled;
965 
966 	while (ibt_poll_cq(RDSV3_CQ2CQHDL(ic->i_recv_cq), &wc, 1, &polled) ==
967 	    IBT_SUCCESS) {
968 		RDSV3_DPRINTF5("rdsv3_ib_recv_cq_comp_handler",
969 		    "rwc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
970 		    (unsigned long long)wc.wc_id, wc.wc_status,
971 		    wc.wc_bytes_xfer, ntohl(wc.wc_immed_data));
972 		rdsv3_ib_stats_inc(s_ib_rx_cq_event);
973 
974 		recv = &ic->i_recvs[rdsv3_ib_ring_oldest(&ic->i_recv_ring)];
975 
976 		rdsv3_ib_recv_unmap_page(ic, recv);
977 
978 		/*
979 		 * Also process recvs in connecting state because it is possible
980 		 * to get a recv completion _before_ the rdmacm ESTABLISHED
981 		 * event is processed.
982 		 */
983 		if (rdsv3_conn_up(conn) || rdsv3_conn_connecting(conn)) {
984 			/*
985 			 * We expect errors as the qp is drained during
986 			 * shutdown
987 			 */
988 			if (wc.wc_status == IBT_WC_SUCCESS) {
989 				rdsv3_ib_process_recv(conn, recv,
990 				    wc.wc_bytes_xfer, state);
991 			} else {
992 				RDSV3_DPRINTF2("rdsv3_ib_recv_cq_comp_handler",
993 				    "recv completion on "
994 				    "%u.%u.%u.%u had status %u, "
995 				    "disconnecting and reconnecting\n",
996 				    NIPQUAD(conn->c_faddr),
997 				    wc.wc_status);
998 				rdsv3_conn_drop(conn);
999 			}
1000 		}
1001 
1002 		rdsv3_ib_ring_free(&ic->i_recv_ring, 1);
1003 	}
1004 }
1005 
1006 static processorid_t rdsv3_taskq_bind_cpuid = 0;
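/*
 * Taskq callback dispatched from the recv CQ handler.  It pins itself
 * to rdsv3_taskq_bind_cpuid, drains the CQ, re-arms the completion
 * notification and drains again, then flushes the collected ACK state
 * and, if the ring is running low, schedules the connection's recv
 * worker to refill it.
 */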
1007 void
1008 rdsv3_ib_recv_tasklet_fn(void *data)
1009 {
1010 	struct rdsv3_ib_connection *ic = (struct rdsv3_ib_connection *)data;
1011 	struct rdsv3_connection *conn = ic->conn;
1012 	struct rdsv3_ib_ack_state state = { 0, };
1013 	cpu_t   *cp;
1014 
1015 	RDSV3_DPRINTF4("rdsv3_ib_recv_tasklet_fn", "Enter: ic: %p", ic);
1016 
1017 	/* If not already bound to the chosen CPU, bind this thread to it */
1018 	if (ic->i_recv_tasklet_cpuid != rdsv3_taskq_bind_cpuid) {
1019 		cp = cpu[rdsv3_taskq_bind_cpuid];
1020 		mutex_enter(&cpu_lock);
1021 		if (cpu_is_online(cp)) {
1022 			if (ic->i_recv_tasklet_cpuid >= 0)
1023 				thread_affinity_clear(curthread);
1024 			thread_affinity_set(curthread, rdsv3_taskq_bind_cpuid);
1025 			ic->i_recv_tasklet_cpuid = rdsv3_taskq_bind_cpuid;
1026 		}
1027 		mutex_exit(&cpu_lock);
1028 	}
1029 
1030 	rdsv3_poll_cq(ic, &state);
1031 	(void) ibt_enable_cq_notify(RDSV3_CQ2CQHDL(ic->i_recv_cq),
1032 	    IBT_NEXT_SOLICITED);
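	/*
	 * Poll once more after re-arming notification so that a
	 * completion which slipped in between the first drain and
	 * ibt_enable_cq_notify() is not stranded until the next event.
	 */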
1033 	rdsv3_poll_cq(ic, &state);
1034 
1035 	if (state.ack_next_valid)
1036 		rdsv3_ib_set_ack(ic, state.ack_next, state.ack_required);
1037 	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
1038 		rdsv3_send_drop_acked(conn, state.ack_recv, NULL);
1039 		ic->i_ack_recv = state.ack_recv;
1040 	}
1041 	if (rdsv3_conn_up(conn))
1042 		rdsv3_ib_attempt_ack(ic);
1043 
1044 	/*
1045 	 * If we ever end up with a really empty receive ring, we're
1046 	 * in deep trouble, as the sender will definitely see RNR
1047 	 * timeouts.
1048 	 */
1049 	if (rdsv3_ib_ring_empty(&ic->i_recv_ring))
1050 		rdsv3_ib_stats_inc(s_ib_rx_ring_empty);
1051 
1052 	/*
1053 	 * If the ring is running low, then schedule the thread to refill.
1054 	 */
1055 	if (rdsv3_ib_ring_low(&ic->i_recv_ring) &&
1056 	    (rdsv3_conn_up(conn) || rdsv3_conn_connecting(conn)))
1057 		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_recv_w, 0);
1058 
1059 	RDSV3_DPRINTF4("rdsv3_ib_recv_tasklet_fn", "Return: ic: %p", ic);
1060 }
1061 
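/*
 * Non-interrupt refill path (the connection's recv worker): top up the
 * receive ring under i_recv_mutex and push out any pending ACK.
 */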
1062 int
1063 rdsv3_ib_recv(struct rdsv3_connection *conn)
1064 {
1065 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
1066 	int ret = 0;
1067 
1068 	RDSV3_DPRINTF4("rdsv3_ib_recv", "conn %p\n", conn);
1069 
1070 	/*
1071 	 * If we get a temporary posting failure in this context then
1072 	 * we're really low and we want the caller to back off for a bit.
1073 	 */
1074 	mutex_enter(&ic->i_recv_mutex);
1075 	if (rdsv3_ib_recv_refill(conn, KM_NOSLEEP, 0, 0))
1076 		ret = -ENOMEM;
1077 	else
1078 		rdsv3_ib_stats_inc(s_ib_rx_refill_from_thread);
1079 	mutex_exit(&ic->i_recv_mutex);
1080 
1081 	if (rdsv3_conn_up(conn))
1082 		rdsv3_ib_attempt_ack(ic);
1083 
1084 	RDSV3_DPRINTF4("rdsv3_ib_recv", "Return: conn: %p", conn);
1085 
1086 	return (ret);
1087 }
1088 
1089 uint_t	MaxRecvMemory = 128 * 1024 * 1024;
1090 
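/*
 * Module setup: derive the global receive allocation cap from
 * MaxRecvMemory and create the kmem caches for incoming-message
 * descriptors and receive fragments.  Returns 0 on success or -ENOMEM
 * if either cache cannot be created.
 */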
1091 int
1092 rdsv3_ib_recv_init(void)
1093 {
1094 	int ret = -ENOMEM;
1095 
1096 	RDSV3_DPRINTF4("rdsv3_ib_recv_init", "Enter");
1097 
1098 	/* XXX - hard code it to 128 MB */
1099 	rdsv3_ib_sysctl_max_recv_allocation = MaxRecvMemory / RDSV3_FRAG_SIZE;
1100 
1101 	rdsv3_ib_incoming_slab = kmem_cache_create("rdsv3_ib_incoming",
1102 	    sizeof (struct rdsv3_ib_incoming), 0, NULL, NULL, NULL,
1103 	    NULL, NULL, 0);
1104 	if (rdsv3_ib_incoming_slab == NULL)
1105 		goto out;
1106 
1107 	rdsv3_ib_frag_slab = kmem_cache_create("rdsv3_ib_frag",
1108 	    sizeof (struct rdsv3_page_frag),
1109 	    0, NULL, NULL, NULL, NULL, NULL, 0);
1110 	if (rdsv3_ib_frag_slab == NULL)
1111 		kmem_cache_destroy(rdsv3_ib_incoming_slab);
1112 	else
1113 		ret = 0;
1114 
1115 	RDSV3_DPRINTF4("rdsv3_ib_recv_init", "Return");
1116 out:
1117 	return (ret);
1118 }
1119 
1120 void
1121 rdsv3_ib_recv_exit(void)
1122 {
1123 	RDSV3_DPRINTF4("rdsv3_ib_recv_exit", "Enter");
1124 	kmem_cache_destroy(rdsv3_ib_incoming_slab);
1125 	kmem_cache_destroy(rdsv3_ib_frag_slab);
1126 	RDSV3_DPRINTF4("rdsv3_ib_recv_exit", "Return");
1127 }
1128