xref: /linux/net/sunrpc/xprtrdma/verbs.c (revision 42fda66387daa53538ae13a2c858396aaf037158)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/pci.h>	/* for Tavor hack below */
51 
52 #include "xprt_rdma.h"
53 
54 /*
55  * Globals/Macros
56  */
57 
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY	RPCDBG_TRANS
60 #endif
61 
62 /*
63  * internal functions
64  */
65 
66 /*
67  * handle replies in tasklet context, using a single, global list
68  * rdma tasklet function -- just turn around and call the func
69  * for all replies on the list
70  */
71 
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74 
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78 	struct rpcrdma_rep *rep;
79 	void (*func)(struct rpcrdma_rep *);
80 	unsigned long flags;
81 
82 	data = data;
83 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84 	while (!list_empty(&rpcrdma_tasklets_g)) {
85 		rep = list_entry(rpcrdma_tasklets_g.next,
86 				 struct rpcrdma_rep, rr_list);
87 		list_del(&rep->rr_list);
88 		func = rep->rr_func;
89 		rep->rr_func = NULL;
90 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91 
92 		if (func)
93 			func(rep);
94 		else
95 			rpcrdma_recv_buffer_put(rep);
96 
97 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98 	}
99 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101 
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103 
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107 	unsigned long flags;
108 
109 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110 	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112 	tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114 
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118 	struct rpcrdma_ep *ep = context;
119 
120 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121 		__func__, event->event, event->device->name, context);
122 	if (ep->rep_connected == 1) {
123 		ep->rep_connected = -EIO;
124 		ep->rep_func(ep);
125 		wake_up_all(&ep->rep_connect_wait);
126 	}
127 }
128 
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132 	struct rpcrdma_ep *ep = context;
133 
134 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135 		__func__, event->event, event->device->name, context);
136 	if (ep->rep_connected == 1) {
137 		ep->rep_connected = -EIO;
138 		ep->rep_func(ep);
139 		wake_up_all(&ep->rep_connect_wait);
140 	}
141 }
142 
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146 	struct rpcrdma_rep *rep =
147 			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148 
149 	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
151 
152 	if (!rep) /* send or bind completion that we don't care about */
153 		return;
154 
155 	if (IB_WC_SUCCESS != wc->status) {
156 		dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157 			__func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158 			 wc->status);
159 		rep->rr_len = ~0U;
160 		rpcrdma_schedule_tasklet(rep);
161 		return;
162 	}
163 
164 	switch (wc->opcode) {
165 	case IB_WC_RECV:
166 		rep->rr_len = wc->byte_len;
167 		ib_dma_sync_single_for_cpu(
168 			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170 		/* Keep (only) the most recent credits, after check validity */
171 		if (rep->rr_len >= 16) {
172 			struct rpcrdma_msg *p =
173 					(struct rpcrdma_msg *) rep->rr_base;
174 			unsigned int credits = ntohl(p->rm_credit);
175 			if (credits == 0) {
176 				dprintk("RPC:       %s: server"
177 					" dropped credits to 0!\n", __func__);
178 				/* don't deadlock */
179 				credits = 1;
180 			} else if (credits > rep->rr_buffer->rb_max_requests) {
181 				dprintk("RPC:       %s: server"
182 					" over-crediting: %d (%d)\n",
183 					__func__, credits,
184 					rep->rr_buffer->rb_max_requests);
185 				credits = rep->rr_buffer->rb_max_requests;
186 			}
187 			atomic_set(&rep->rr_buffer->rb_credits, credits);
188 		}
189 		/* fall through */
190 	case IB_WC_BIND_MW:
191 		rpcrdma_schedule_tasklet(rep);
192 		break;
193 	default:
194 		dprintk("RPC:       %s: unexpected WC event %X\n",
195 			__func__, wc->opcode);
196 		break;
197 	}
198 }
199 
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203 	struct ib_wc wc;
204 	int rc;
205 
206 	for (;;) {
207 		rc = ib_poll_cq(cq, 1, &wc);
208 		if (rc < 0) {
209 			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210 				__func__, rc);
211 			return rc;
212 		}
213 		if (rc == 0)
214 			break;
215 
216 		rpcrdma_event_process(&wc);
217 	}
218 
219 	return 0;
220 }
221 
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant but processes single events in order to maintain
227  * ordering of receives to keep server credits.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240 	int rc;
241 
242 	rc = rpcrdma_cq_poll(cq);
243 	if (rc)
244 		return;
245 
246 	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247 	if (rc) {
248 		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249 			__func__, rc);
250 		return;
251 	}
252 
253 	rpcrdma_cq_poll(cq);
254 }
255 
256 #ifdef RPC_DEBUG
257 static const char * const conn[] = {
258 	"address resolved",
259 	"address error",
260 	"route resolved",
261 	"route error",
262 	"connect request",
263 	"connect response",
264 	"connect error",
265 	"unreachable",
266 	"rejected",
267 	"established",
268 	"disconnected",
269 	"device removal"
270 };
271 #endif
272 
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275 {
276 	struct rpcrdma_xprt *xprt = id->context;
277 	struct rpcrdma_ia *ia = &xprt->rx_ia;
278 	struct rpcrdma_ep *ep = &xprt->rx_ep;
279 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280 	struct ib_qp_attr attr;
281 	struct ib_qp_init_attr iattr;
282 	int connstate = 0;
283 
284 	switch (event->event) {
285 	case RDMA_CM_EVENT_ADDR_RESOLVED:
286 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
287 		complete(&ia->ri_done);
288 		break;
289 	case RDMA_CM_EVENT_ADDR_ERROR:
290 		ia->ri_async_rc = -EHOSTUNREACH;
291 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
292 			__func__, ep);
293 		complete(&ia->ri_done);
294 		break;
295 	case RDMA_CM_EVENT_ROUTE_ERROR:
296 		ia->ri_async_rc = -ENETUNREACH;
297 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
298 			__func__, ep);
299 		complete(&ia->ri_done);
300 		break;
301 	case RDMA_CM_EVENT_ESTABLISHED:
302 		connstate = 1;
303 		ib_query_qp(ia->ri_id->qp, &attr,
304 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
305 			&iattr);
306 		dprintk("RPC:       %s: %d responder resources"
307 			" (%d initiator)\n",
308 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
309 		goto connected;
310 	case RDMA_CM_EVENT_CONNECT_ERROR:
311 		connstate = -ENOTCONN;
312 		goto connected;
313 	case RDMA_CM_EVENT_UNREACHABLE:
314 		connstate = -ENETDOWN;
315 		goto connected;
316 	case RDMA_CM_EVENT_REJECTED:
317 		connstate = -ECONNREFUSED;
318 		goto connected;
319 	case RDMA_CM_EVENT_DISCONNECTED:
320 		connstate = -ECONNABORTED;
321 		goto connected;
322 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
323 		connstate = -ENODEV;
324 connected:
325 		dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
326 			" (ep 0x%p event 0x%x)\n",
327 			__func__,
328 			(event->event <= 11) ? conn[event->event] :
329 						"unknown connection error",
330 			NIPQUAD(addr->sin_addr.s_addr),
331 			ntohs(addr->sin_port),
332 			ep, event->event);
333 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
334 		dprintk("RPC:       %s: %sconnected\n",
335 					__func__, connstate > 0 ? "" : "dis");
336 		ep->rep_connected = connstate;
337 		ep->rep_func(ep);
338 		wake_up_all(&ep->rep_connect_wait);
339 		break;
340 	default:
341 		ia->ri_async_rc = -EINVAL;
342 		dprintk("RPC:       %s: unexpected CM event %X\n",
343 			__func__, event->event);
344 		complete(&ia->ri_done);
345 		break;
346 	}
347 
348 	return 0;
349 }
350 
351 static struct rdma_cm_id *
352 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
353 			struct rpcrdma_ia *ia, struct sockaddr *addr)
354 {
355 	struct rdma_cm_id *id;
356 	int rc;
357 
358 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
359 	if (IS_ERR(id)) {
360 		rc = PTR_ERR(id);
361 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
362 			__func__, rc);
363 		return id;
364 	}
365 
366 	ia->ri_async_rc = 0;
367 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
368 	if (rc) {
369 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
370 			__func__, rc);
371 		goto out;
372 	}
373 	wait_for_completion(&ia->ri_done);
374 	rc = ia->ri_async_rc;
375 	if (rc)
376 		goto out;
377 
378 	ia->ri_async_rc = 0;
379 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
380 	if (rc) {
381 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
382 			__func__, rc);
383 		goto out;
384 	}
385 	wait_for_completion(&ia->ri_done);
386 	rc = ia->ri_async_rc;
387 	if (rc)
388 		goto out;
389 
390 	return id;
391 
392 out:
393 	rdma_destroy_id(id);
394 	return ERR_PTR(rc);
395 }
396 
397 /*
398  * Drain any cq, prior to teardown.
399  */
400 static void
401 rpcrdma_clean_cq(struct ib_cq *cq)
402 {
403 	struct ib_wc wc;
404 	int count = 0;
405 
406 	while (1 == ib_poll_cq(cq, 1, &wc))
407 		++count;
408 
409 	if (count)
410 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
411 			__func__, count, wc.opcode);
412 }
413 
414 /*
415  * Exported functions.
416  */
417 
418 /*
419  * Open and initialize an Interface Adapter.
420  *  o initializes fields of struct rpcrdma_ia, including
421  *    interface and provider attributes and protection zone.
422  */
423 int
424 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
425 {
426 	int rc;
427 	struct rpcrdma_ia *ia = &xprt->rx_ia;
428 
429 	init_completion(&ia->ri_done);
430 
431 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
432 	if (IS_ERR(ia->ri_id)) {
433 		rc = PTR_ERR(ia->ri_id);
434 		goto out1;
435 	}
436 
437 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
438 	if (IS_ERR(ia->ri_pd)) {
439 		rc = PTR_ERR(ia->ri_pd);
440 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
441 			__func__, rc);
442 		goto out2;
443 	}
444 
445 	/*
446 	 * Optionally obtain an underlying physical identity mapping in
447 	 * order to do a memory window-based bind. This base registration
448 	 * is protected from remote access - that is enabled only by binding
449 	 * for the specific bytes targeted during each RPC operation, and
450 	 * revoked after the corresponding completion similar to a storage
451 	 * adapter.
452 	 */
453 	if (memreg > RPCRDMA_REGISTER) {
454 		int mem_priv = IB_ACCESS_LOCAL_WRITE;
455 		switch (memreg) {
456 #if RPCRDMA_PERSISTENT_REGISTRATION
457 		case RPCRDMA_ALLPHYSICAL:
458 			mem_priv |= IB_ACCESS_REMOTE_WRITE;
459 			mem_priv |= IB_ACCESS_REMOTE_READ;
460 			break;
461 #endif
462 		case RPCRDMA_MEMWINDOWS_ASYNC:
463 		case RPCRDMA_MEMWINDOWS:
464 			mem_priv |= IB_ACCESS_MW_BIND;
465 			break;
466 		default:
467 			break;
468 		}
469 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
470 		if (IS_ERR(ia->ri_bind_mem)) {
471 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
472 				"phys register failed with %lX\n\t"
473 				"Will continue with degraded performance\n",
474 				__func__, PTR_ERR(ia->ri_bind_mem));
475 			memreg = RPCRDMA_REGISTER;
476 			ia->ri_bind_mem = NULL;
477 		}
478 	}
479 
480 	/* Else will do memory reg/dereg for each chunk */
481 	ia->ri_memreg_strategy = memreg;
482 
483 	return 0;
484 out2:
485 	rdma_destroy_id(ia->ri_id);
486 out1:
487 	return rc;
488 }
489 
490 /*
491  * Clean up/close an IA.
492  *   o if event handles and PD have been initialized, free them.
493  *   o close the IA
494  */
495 void
496 rpcrdma_ia_close(struct rpcrdma_ia *ia)
497 {
498 	int rc;
499 
500 	dprintk("RPC:       %s: entering\n", __func__);
501 	if (ia->ri_bind_mem != NULL) {
502 		rc = ib_dereg_mr(ia->ri_bind_mem);
503 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
504 			__func__, rc);
505 	}
506 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
507 		rdma_destroy_qp(ia->ri_id);
508 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
509 		rc = ib_dealloc_pd(ia->ri_pd);
510 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
511 			__func__, rc);
512 	}
513 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
514 		rdma_destroy_id(ia->ri_id);
515 }
516 
517 /*
518  * Create unconnected endpoint.
519  */
520 int
521 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
522 				struct rpcrdma_create_data_internal *cdata)
523 {
524 	struct ib_device_attr devattr;
525 	int rc;
526 
527 	rc = ib_query_device(ia->ri_id->device, &devattr);
528 	if (rc) {
529 		dprintk("RPC:       %s: ib_query_device failed %d\n",
530 			__func__, rc);
531 		return rc;
532 	}
533 
534 	/* check provider's send/recv wr limits */
535 	if (cdata->max_requests > devattr.max_qp_wr)
536 		cdata->max_requests = devattr.max_qp_wr;
537 
538 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
539 	ep->rep_attr.qp_context = ep;
540 	/* send_cq and recv_cq initialized below */
541 	ep->rep_attr.srq = NULL;
542 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
543 	switch (ia->ri_memreg_strategy) {
544 	case RPCRDMA_MEMWINDOWS_ASYNC:
545 	case RPCRDMA_MEMWINDOWS:
546 		/* Add room for mw_binds+unbinds - overkill! */
547 		ep->rep_attr.cap.max_send_wr++;
548 		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
549 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
550 			return -EINVAL;
551 		break;
552 	default:
553 		break;
554 	}
555 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
556 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
557 	ep->rep_attr.cap.max_recv_sge = 1;
558 	ep->rep_attr.cap.max_inline_data = 0;
559 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
560 	ep->rep_attr.qp_type = IB_QPT_RC;
561 	ep->rep_attr.port_num = ~0;
562 
563 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
564 		"iovs: send %d recv %d\n",
565 		__func__,
566 		ep->rep_attr.cap.max_send_wr,
567 		ep->rep_attr.cap.max_recv_wr,
568 		ep->rep_attr.cap.max_send_sge,
569 		ep->rep_attr.cap.max_recv_sge);
570 
571 	/* set trigger for requesting send completion */
572 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
573 	switch (ia->ri_memreg_strategy) {
574 	case RPCRDMA_MEMWINDOWS_ASYNC:
575 	case RPCRDMA_MEMWINDOWS:
576 		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
577 		break;
578 	default:
579 		break;
580 	}
581 	if (ep->rep_cqinit <= 2)
582 		ep->rep_cqinit = 0;
583 	INIT_CQCOUNT(ep);
584 	ep->rep_ia = ia;
585 	init_waitqueue_head(&ep->rep_connect_wait);
586 
587 	/*
588 	 * Create a single cq for receive dto and mw_bind (only ever
589 	 * care about unbind, really). Send completions are suppressed.
590 	 * Use single threaded tasklet upcalls to maintain ordering.
591 	 */
592 	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
593 				  rpcrdma_cq_async_error_upcall, NULL,
594 				  ep->rep_attr.cap.max_recv_wr +
595 				  ep->rep_attr.cap.max_send_wr + 1, 0);
596 	if (IS_ERR(ep->rep_cq)) {
597 		rc = PTR_ERR(ep->rep_cq);
598 		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
599 			__func__, rc);
600 		goto out1;
601 	}
602 
603 	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
604 	if (rc) {
605 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
606 			__func__, rc);
607 		goto out2;
608 	}
609 
610 	ep->rep_attr.send_cq = ep->rep_cq;
611 	ep->rep_attr.recv_cq = ep->rep_cq;
612 
613 	/* Initialize cma parameters */
614 
615 	/* RPC/RDMA does not use private data */
616 	ep->rep_remote_cma.private_data = NULL;
617 	ep->rep_remote_cma.private_data_len = 0;
618 
619 	/* Client offers RDMA Read but does not initiate */
620 	switch (ia->ri_memreg_strategy) {
621 	case RPCRDMA_BOUNCEBUFFERS:
622 		ep->rep_remote_cma.responder_resources = 0;
623 		break;
624 	case RPCRDMA_MTHCAFMR:
625 	case RPCRDMA_REGISTER:
626 		ep->rep_remote_cma.responder_resources = cdata->max_requests *
627 				(RPCRDMA_MAX_DATA_SEGS / 8);
628 		break;
629 	case RPCRDMA_MEMWINDOWS:
630 	case RPCRDMA_MEMWINDOWS_ASYNC:
631 #if RPCRDMA_PERSISTENT_REGISTRATION
632 	case RPCRDMA_ALLPHYSICAL:
633 #endif
634 		ep->rep_remote_cma.responder_resources = cdata->max_requests *
635 				(RPCRDMA_MAX_DATA_SEGS / 2);
636 		break;
637 	default:
638 		break;
639 	}
640 	if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
641 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
642 	ep->rep_remote_cma.initiator_depth = 0;
643 
644 	ep->rep_remote_cma.retry_count = 7;
645 	ep->rep_remote_cma.flow_control = 0;
646 	ep->rep_remote_cma.rnr_retry_count = 0;
647 
648 	return 0;
649 
650 out2:
651 	if (ib_destroy_cq(ep->rep_cq))
652 		;
653 out1:
654 	return rc;
655 }
656 
657 /*
658  * rpcrdma_ep_destroy
659  *
660  * Disconnect and destroy endpoint. After this, the only
661  * valid operations on the ep are to free it (if dynamically
662  * allocated) or re-create it.
663  *
664  * The caller's error handling must be sure to not leak the endpoint
665  * if this function fails.
666  */
667 int
668 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
669 {
670 	int rc;
671 
672 	dprintk("RPC:       %s: entering, connected is %d\n",
673 		__func__, ep->rep_connected);
674 
675 	if (ia->ri_id->qp) {
676 		rc = rpcrdma_ep_disconnect(ep, ia);
677 		if (rc)
678 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
679 				" returned %i\n", __func__, rc);
680 	}
681 
682 	ep->rep_func = NULL;
683 
684 	/* padding - could be done in rpcrdma_buffer_destroy... */
685 	if (ep->rep_pad_mr) {
686 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
687 		ep->rep_pad_mr = NULL;
688 	}
689 
690 	if (ia->ri_id->qp) {
691 		rdma_destroy_qp(ia->ri_id);
692 		ia->ri_id->qp = NULL;
693 	}
694 
695 	rpcrdma_clean_cq(ep->rep_cq);
696 	rc = ib_destroy_cq(ep->rep_cq);
697 	if (rc)
698 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
699 			__func__, rc);
700 
701 	return rc;
702 }
703 
704 /*
705  * Connect unconnected endpoint.
706  */
707 int
708 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
709 {
710 	struct rdma_cm_id *id;
711 	int rc = 0;
712 	int retry_count = 0;
713 	int reconnect = (ep->rep_connected != 0);
714 
715 	if (reconnect) {
716 		struct rpcrdma_xprt *xprt;
717 retry:
718 		rc = rpcrdma_ep_disconnect(ep, ia);
719 		if (rc && rc != -ENOTCONN)
720 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
721 				" status %i\n", __func__, rc);
722 		rpcrdma_clean_cq(ep->rep_cq);
723 
724 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
725 		id = rpcrdma_create_id(xprt, ia,
726 				(struct sockaddr *)&xprt->rx_data.addr);
727 		if (IS_ERR(id)) {
728 			rc = PTR_ERR(id);
729 			goto out;
730 		}
731 		/* TEMP TEMP TEMP - fail if new device:
732 		 * Deregister/remarshal *all* requests!
733 		 * Close and recreate adapter, pd, etc!
734 		 * Re-determine all attributes still sane!
735 		 * More stuff I haven't thought of!
736 		 * Rrrgh!
737 		 */
738 		if (ia->ri_id->device != id->device) {
739 			printk("RPC:       %s: can't reconnect on "
740 				"different device!\n", __func__);
741 			rdma_destroy_id(id);
742 			rc = -ENETDOWN;
743 			goto out;
744 		}
745 		/* END TEMP */
746 		rdma_destroy_id(ia->ri_id);
747 		ia->ri_id = id;
748 	}
749 
750 	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
751 	if (rc) {
752 		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
753 			__func__, rc);
754 		goto out;
755 	}
756 
757 /* XXX Tavor device performs badly with 2K MTU! */
758 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
759 	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
760 	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
761 	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
762 	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
763 		struct ib_qp_attr attr = {
764 			.path_mtu = IB_MTU_1024
765 		};
766 		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
767 	}
768 }
769 
770 	/* Theoretically a client initiator_depth > 0 is not needed,
771 	 * but many peers fail to complete the connection unless they
772 	 * == responder_resources! */
773 	if (ep->rep_remote_cma.initiator_depth !=
774 				ep->rep_remote_cma.responder_resources)
775 		ep->rep_remote_cma.initiator_depth =
776 			ep->rep_remote_cma.responder_resources;
777 
778 	ep->rep_connected = 0;
779 
780 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
781 	if (rc) {
782 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
783 				__func__, rc);
784 		goto out;
785 	}
786 
787 	if (reconnect)
788 		return 0;
789 
790 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
791 
792 	/*
793 	 * Check state. A non-peer reject indicates no listener
794 	 * (ECONNREFUSED), which may be a transient state. All
795 	 * others indicate a transport condition which has already
796 	 * undergone a best-effort.
797 	 */
798 	if (ep->rep_connected == -ECONNREFUSED
799 	    && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
800 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
801 		goto retry;
802 	}
803 	if (ep->rep_connected <= 0) {
804 		/* Sometimes, the only way to reliably connect to remote
805 		 * CMs is to use same nonzero values for ORD and IRD. */
806 		ep->rep_remote_cma.initiator_depth =
807 					ep->rep_remote_cma.responder_resources;
808 		if (ep->rep_remote_cma.initiator_depth == 0)
809 			++ep->rep_remote_cma.initiator_depth;
810 		if (ep->rep_remote_cma.responder_resources == 0)
811 			++ep->rep_remote_cma.responder_resources;
812 		if (retry_count++ == 0)
813 			goto retry;
814 		rc = ep->rep_connected;
815 	} else {
816 		dprintk("RPC:       %s: connected\n", __func__);
817 	}
818 
819 out:
820 	if (rc)
821 		ep->rep_connected = rc;
822 	return rc;
823 }
824 
825 /*
826  * rpcrdma_ep_disconnect
827  *
828  * This is separate from destroy to facilitate the ability
829  * to reconnect without recreating the endpoint.
830  *
831  * This call is not reentrant, and must not be made in parallel
832  * on the same endpoint.
833  */
834 int
835 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
836 {
837 	int rc;
838 
839 	rpcrdma_clean_cq(ep->rep_cq);
840 	rc = rdma_disconnect(ia->ri_id);
841 	if (!rc) {
842 		/* returns without wait if not connected */
843 		wait_event_interruptible(ep->rep_connect_wait,
844 							ep->rep_connected != 1);
845 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
846 			(ep->rep_connected == 1) ? "still " : "dis");
847 	} else {
848 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
849 		ep->rep_connected = rc;
850 	}
851 	return rc;
852 }
853 
854 /*
855  * Initialize buffer memory
856  */
857 int
858 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
859 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
860 {
861 	char *p;
862 	size_t len;
863 	int i, rc;
864 
865 	buf->rb_max_requests = cdata->max_requests;
866 	spin_lock_init(&buf->rb_lock);
867 	atomic_set(&buf->rb_credits, 1);
868 
869 	/* Need to allocate:
870 	 *   1.  arrays for send and recv pointers
871 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
872 	 *   3.  array of struct rpcrdma_rep for replies
873 	 *   4.  padding, if any
874 	 *   5.  mw's, if any
875 	 * Send/recv buffers in req/rep need to be registered
876 	 */
877 
878 	len = buf->rb_max_requests *
879 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
880 	len += cdata->padding;
881 	switch (ia->ri_memreg_strategy) {
882 	case RPCRDMA_MTHCAFMR:
883 		/* TBD we are perhaps overallocating here */
884 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
885 				sizeof(struct rpcrdma_mw);
886 		break;
887 	case RPCRDMA_MEMWINDOWS_ASYNC:
888 	case RPCRDMA_MEMWINDOWS:
889 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
890 				sizeof(struct rpcrdma_mw);
891 		break;
892 	default:
893 		break;
894 	}
895 
896 	/* allocate 1, 4 and 5 in one shot */
897 	p = kzalloc(len, GFP_KERNEL);
898 	if (p == NULL) {
899 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
900 			__func__, len);
901 		rc = -ENOMEM;
902 		goto out;
903 	}
904 	buf->rb_pool = p;	/* for freeing it later */
905 
906 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
907 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
908 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
909 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
910 
911 	/*
912 	 * Register the zeroed pad buffer, if any.
913 	 */
914 	if (cdata->padding) {
915 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
916 					    &ep->rep_pad_mr, &ep->rep_pad);
917 		if (rc)
918 			goto out;
919 	}
920 	p += cdata->padding;
921 
922 	/*
923 	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
924 	 * We "cycle" the mw's in order to minimize rkey reuse,
925 	 * and also reduce unbind-to-bind collision.
926 	 */
927 	INIT_LIST_HEAD(&buf->rb_mws);
928 	switch (ia->ri_memreg_strategy) {
929 	case RPCRDMA_MTHCAFMR:
930 		{
931 		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
932 		struct ib_fmr_attr fa = {
933 			RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT
934 		};
935 		/* TBD we are perhaps overallocating here */
936 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
937 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
938 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
939 				&fa);
940 			if (IS_ERR(r->r.fmr)) {
941 				rc = PTR_ERR(r->r.fmr);
942 				dprintk("RPC:       %s: ib_alloc_fmr"
943 					" failed %i\n", __func__, rc);
944 				goto out;
945 			}
946 			list_add(&r->mw_list, &buf->rb_mws);
947 			++r;
948 		}
949 		}
950 		break;
951 	case RPCRDMA_MEMWINDOWS_ASYNC:
952 	case RPCRDMA_MEMWINDOWS:
953 		{
954 		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
955 		/* Allocate one extra request's worth, for full cycling */
956 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
957 			r->r.mw = ib_alloc_mw(ia->ri_pd);
958 			if (IS_ERR(r->r.mw)) {
959 				rc = PTR_ERR(r->r.mw);
960 				dprintk("RPC:       %s: ib_alloc_mw"
961 					" failed %i\n", __func__, rc);
962 				goto out;
963 			}
964 			list_add(&r->mw_list, &buf->rb_mws);
965 			++r;
966 		}
967 		}
968 		break;
969 	default:
970 		break;
971 	}
972 
973 	/*
974 	 * Allocate/init the request/reply buffers. Doing this
975 	 * using kmalloc for now -- one for each buf.
976 	 */
977 	for (i = 0; i < buf->rb_max_requests; i++) {
978 		struct rpcrdma_req *req;
979 		struct rpcrdma_rep *rep;
980 
981 		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
982 		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
983 		/* Typical ~2400b, so rounding up saves work later */
984 		if (len < 4096)
985 			len = 4096;
986 		req = kmalloc(len, GFP_KERNEL);
987 		if (req == NULL) {
988 			dprintk("RPC:       %s: request buffer %d alloc"
989 				" failed\n", __func__, i);
990 			rc = -ENOMEM;
991 			goto out;
992 		}
993 		memset(req, 0, sizeof(struct rpcrdma_req));
994 		buf->rb_send_bufs[i] = req;
995 		buf->rb_send_bufs[i]->rl_buffer = buf;
996 
997 		rc = rpcrdma_register_internal(ia, req->rl_base,
998 				len - offsetof(struct rpcrdma_req, rl_base),
999 				&buf->rb_send_bufs[i]->rl_handle,
1000 				&buf->rb_send_bufs[i]->rl_iov);
1001 		if (rc)
1002 			goto out;
1003 
1004 		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1005 
1006 		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1007 		rep = kmalloc(len, GFP_KERNEL);
1008 		if (rep == NULL) {
1009 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1010 				__func__, i);
1011 			rc = -ENOMEM;
1012 			goto out;
1013 		}
1014 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1015 		buf->rb_recv_bufs[i] = rep;
1016 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1017 		init_waitqueue_head(&rep->rr_unbind);
1018 
1019 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1020 				len - offsetof(struct rpcrdma_rep, rr_base),
1021 				&buf->rb_recv_bufs[i]->rr_handle,
1022 				&buf->rb_recv_bufs[i]->rr_iov);
1023 		if (rc)
1024 			goto out;
1025 
1026 	}
1027 	dprintk("RPC:       %s: max_requests %d\n",
1028 		__func__, buf->rb_max_requests);
1029 	/* done */
1030 	return 0;
1031 out:
1032 	rpcrdma_buffer_destroy(buf);
1033 	return rc;
1034 }
1035 
1036 /*
1037  * Unregister and destroy buffer memory. Need to deal with
1038  * partial initialization, so it's callable from failed create.
1039  * Must be called before destroying endpoint, as registrations
1040  * reference it.
1041  */
1042 void
1043 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1044 {
1045 	int rc, i;
1046 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1047 
1048 	/* clean up in reverse order from create
1049 	 *   1.  recv mr memory (mr free, then kfree)
1050 	 *   1a. bind mw memory
1051 	 *   2.  send mr memory (mr free, then kfree)
1052 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1053 	 *   4.  arrays
1054 	 */
1055 	dprintk("RPC:       %s: entering\n", __func__);
1056 
1057 	for (i = 0; i < buf->rb_max_requests; i++) {
1058 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1059 			rpcrdma_deregister_internal(ia,
1060 					buf->rb_recv_bufs[i]->rr_handle,
1061 					&buf->rb_recv_bufs[i]->rr_iov);
1062 			kfree(buf->rb_recv_bufs[i]);
1063 		}
1064 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1065 			while (!list_empty(&buf->rb_mws)) {
1066 				struct rpcrdma_mw *r;
1067 				r = list_entry(buf->rb_mws.next,
1068 					struct rpcrdma_mw, mw_list);
1069 				list_del(&r->mw_list);
1070 				switch (ia->ri_memreg_strategy) {
1071 				case RPCRDMA_MTHCAFMR:
1072 					rc = ib_dealloc_fmr(r->r.fmr);
1073 					if (rc)
1074 						dprintk("RPC:       %s:"
1075 							" ib_dealloc_fmr"
1076 							" failed %i\n",
1077 							__func__, rc);
1078 					break;
1079 				case RPCRDMA_MEMWINDOWS_ASYNC:
1080 				case RPCRDMA_MEMWINDOWS:
1081 					rc = ib_dealloc_mw(r->r.mw);
1082 					if (rc)
1083 						dprintk("RPC:       %s:"
1084 							" ib_dealloc_mw"
1085 							" failed %i\n",
1086 							__func__, rc);
1087 					break;
1088 				default:
1089 					break;
1090 				}
1091 			}
1092 			rpcrdma_deregister_internal(ia,
1093 					buf->rb_send_bufs[i]->rl_handle,
1094 					&buf->rb_send_bufs[i]->rl_iov);
1095 			kfree(buf->rb_send_bufs[i]);
1096 		}
1097 	}
1098 
1099 	kfree(buf->rb_pool);
1100 }
1101 
1102 /*
1103  * Get a set of request/reply buffers.
1104  *
1105  * Reply buffer (if needed) is attached to send buffer upon return.
1106  * Rule:
1107  *    rb_send_index and rb_recv_index MUST always be pointing to the
1108  *    *next* available buffer (non-NULL). They are incremented after
1109  *    removing buffers, and decremented *before* returning them.
1110  */
1111 struct rpcrdma_req *
1112 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1113 {
1114 	struct rpcrdma_req *req;
1115 	unsigned long flags;
1116 
1117 	spin_lock_irqsave(&buffers->rb_lock, flags);
1118 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1119 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1120 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1121 		return ((struct rpcrdma_req *)NULL);
1122 	}
1123 
1124 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1125 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1126 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1127 			__func__,
1128 			buffers->rb_recv_index - buffers->rb_send_index);
1129 		req->rl_reply = NULL;
1130 	} else {
1131 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1132 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1133 	}
1134 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1135 	if (!list_empty(&buffers->rb_mws)) {
1136 		int i = RPCRDMA_MAX_SEGS - 1;
1137 		do {
1138 			struct rpcrdma_mw *r;
1139 			r = list_entry(buffers->rb_mws.next,
1140 					struct rpcrdma_mw, mw_list);
1141 			list_del(&r->mw_list);
1142 			req->rl_segments[i].mr_chunk.rl_mw = r;
1143 		} while (--i >= 0);
1144 	}
1145 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1146 	return req;
1147 }
1148 
1149 /*
1150  * Put request/reply buffers back into pool.
1151  * Pre-decrement counter/array index.
1152  */
1153 void
1154 rpcrdma_buffer_put(struct rpcrdma_req *req)
1155 {
1156 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1157 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1158 	int i;
1159 	unsigned long flags;
1160 
1161 	BUG_ON(req->rl_nchunks != 0);
1162 	spin_lock_irqsave(&buffers->rb_lock, flags);
1163 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1164 	req->rl_niovs = 0;
1165 	if (req->rl_reply) {
1166 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1167 		init_waitqueue_head(&req->rl_reply->rr_unbind);
1168 		req->rl_reply->rr_func = NULL;
1169 		req->rl_reply = NULL;
1170 	}
1171 	switch (ia->ri_memreg_strategy) {
1172 	case RPCRDMA_MTHCAFMR:
1173 	case RPCRDMA_MEMWINDOWS_ASYNC:
1174 	case RPCRDMA_MEMWINDOWS:
1175 		/*
1176 		 * Cycle mw's back in reverse order, and "spin" them.
1177 		 * This delays and scrambles reuse as much as possible.
1178 		 */
1179 		i = 1;
1180 		do {
1181 			struct rpcrdma_mw **mw;
1182 			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1183 			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1184 			*mw = NULL;
1185 		} while (++i < RPCRDMA_MAX_SEGS);
1186 		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1187 					&buffers->rb_mws);
1188 		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1189 		break;
1190 	default:
1191 		break;
1192 	}
1193 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1194 }
1195 
1196 /*
1197  * Recover reply buffers from pool.
1198  * This happens when recovering from error conditions.
1199  * Post-increment counter/array index.
1200  */
1201 void
1202 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1203 {
1204 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1205 	unsigned long flags;
1206 
1207 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1208 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1209 	spin_lock_irqsave(&buffers->rb_lock, flags);
1210 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1211 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1212 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1213 	}
1214 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1215 }
1216 
1217 /*
1218  * Put reply buffers back into pool when not attached to
1219  * request. This happens in error conditions, and when
1220  * aborting unbinds. Pre-decrement counter/array index.
1221  */
1222 void
1223 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1224 {
1225 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1226 	unsigned long flags;
1227 
1228 	rep->rr_func = NULL;
1229 	spin_lock_irqsave(&buffers->rb_lock, flags);
1230 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1231 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1232 }
1233 
1234 /*
1235  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1236  */
1237 
1238 int
1239 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1240 				struct ib_mr **mrp, struct ib_sge *iov)
1241 {
1242 	struct ib_phys_buf ipb;
1243 	struct ib_mr *mr;
1244 	int rc;
1245 
1246 	/*
1247 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1248 	 */
1249 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1250 			va, len, DMA_BIDIRECTIONAL);
1251 	iov->length = len;
1252 
1253 	if (ia->ri_bind_mem != NULL) {
1254 		*mrp = NULL;
1255 		iov->lkey = ia->ri_bind_mem->lkey;
1256 		return 0;
1257 	}
1258 
1259 	ipb.addr = iov->addr;
1260 	ipb.size = iov->length;
1261 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1262 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1263 
1264 	dprintk("RPC:       %s: phys convert: 0x%llx "
1265 			"registered 0x%llx length %d\n",
1266 			__func__, ipb.addr, iov->addr, len);
1267 
1268 	if (IS_ERR(mr)) {
1269 		*mrp = NULL;
1270 		rc = PTR_ERR(mr);
1271 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1272 	} else {
1273 		*mrp = mr;
1274 		iov->lkey = mr->lkey;
1275 		rc = 0;
1276 	}
1277 
1278 	return rc;
1279 }
1280 
1281 int
1282 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1283 				struct ib_mr *mr, struct ib_sge *iov)
1284 {
1285 	int rc;
1286 
1287 	ib_dma_unmap_single(ia->ri_id->device,
1288 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1289 
1290 	if (NULL == mr)
1291 		return 0;
1292 
1293 	rc = ib_dereg_mr(mr);
1294 	if (rc)
1295 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1296 	return rc;
1297 }
1298 
1299 /*
1300  * Wrappers for chunk registration, shared by read/write chunk code.
1301  */
1302 
1303 static void
1304 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1305 {
1306 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1307 	seg->mr_dmalen = seg->mr_len;
1308 	if (seg->mr_page)
1309 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1310 				seg->mr_page, offset_in_page(seg->mr_offset),
1311 				seg->mr_dmalen, seg->mr_dir);
1312 	else
1313 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1314 				seg->mr_offset,
1315 				seg->mr_dmalen, seg->mr_dir);
1316 }
1317 
1318 static void
1319 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1320 {
1321 	if (seg->mr_page)
1322 		ib_dma_unmap_page(ia->ri_id->device,
1323 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1324 	else
1325 		ib_dma_unmap_single(ia->ri_id->device,
1326 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1327 }
1328 
1329 int
1330 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1331 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1332 {
1333 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1334 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1335 				  IB_ACCESS_REMOTE_READ);
1336 	struct rpcrdma_mr_seg *seg1 = seg;
1337 	int i;
1338 	int rc = 0;
1339 
1340 	switch (ia->ri_memreg_strategy) {
1341 
1342 #if RPCRDMA_PERSISTENT_REGISTRATION
1343 	case RPCRDMA_ALLPHYSICAL:
1344 		rpcrdma_map_one(ia, seg, writing);
1345 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1346 		seg->mr_base = seg->mr_dma;
1347 		seg->mr_nsegs = 1;
1348 		nsegs = 1;
1349 		break;
1350 #endif
1351 
1352 	/* Registration using fast memory registration */
1353 	case RPCRDMA_MTHCAFMR:
1354 		{
1355 		u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1356 		int len, pageoff = offset_in_page(seg->mr_offset);
1357 		seg1->mr_offset -= pageoff;	/* start of page */
1358 		seg1->mr_len += pageoff;
1359 		len = -pageoff;
1360 		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1361 			nsegs = RPCRDMA_MAX_DATA_SEGS;
1362 		for (i = 0; i < nsegs;) {
1363 			rpcrdma_map_one(ia, seg, writing);
1364 			physaddrs[i] = seg->mr_dma;
1365 			len += seg->mr_len;
1366 			++seg;
1367 			++i;
1368 			/* Check for holes */
1369 			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1370 			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1371 				break;
1372 		}
1373 		nsegs = i;
1374 		rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1375 					physaddrs, nsegs, seg1->mr_dma);
1376 		if (rc) {
1377 			dprintk("RPC:       %s: failed ib_map_phys_fmr "
1378 				"%u@0x%llx+%i (%d)... status %i\n", __func__,
1379 				len, (unsigned long long)seg1->mr_dma,
1380 				pageoff, nsegs, rc);
1381 			while (nsegs--)
1382 				rpcrdma_unmap_one(ia, --seg);
1383 		} else {
1384 			seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1385 			seg1->mr_base = seg1->mr_dma + pageoff;
1386 			seg1->mr_nsegs = nsegs;
1387 			seg1->mr_len = len;
1388 		}
1389 		}
1390 		break;
1391 
1392 	/* Registration using memory windows */
1393 	case RPCRDMA_MEMWINDOWS_ASYNC:
1394 	case RPCRDMA_MEMWINDOWS:
1395 		{
1396 		struct ib_mw_bind param;
1397 		rpcrdma_map_one(ia, seg, writing);
1398 		param.mr = ia->ri_bind_mem;
1399 		param.wr_id = 0ULL;	/* no send cookie */
1400 		param.addr = seg->mr_dma;
1401 		param.length = seg->mr_len;
1402 		param.send_flags = 0;
1403 		param.mw_access_flags = mem_priv;
1404 
1405 		DECR_CQCOUNT(&r_xprt->rx_ep);
1406 		rc = ib_bind_mw(ia->ri_id->qp,
1407 					seg->mr_chunk.rl_mw->r.mw, &param);
1408 		if (rc) {
1409 			dprintk("RPC:       %s: failed ib_bind_mw "
1410 				"%u@0x%llx status %i\n",
1411 				__func__, seg->mr_len,
1412 				(unsigned long long)seg->mr_dma, rc);
1413 			rpcrdma_unmap_one(ia, seg);
1414 		} else {
1415 			seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1416 			seg->mr_base = param.addr;
1417 			seg->mr_nsegs = 1;
1418 			nsegs = 1;
1419 		}
1420 		}
1421 		break;
1422 
1423 	/* Default registration each time */
1424 	default:
1425 		{
1426 		struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1427 		int len = 0;
1428 		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1429 			nsegs = RPCRDMA_MAX_DATA_SEGS;
1430 		for (i = 0; i < nsegs;) {
1431 			rpcrdma_map_one(ia, seg, writing);
1432 			ipb[i].addr = seg->mr_dma;
1433 			ipb[i].size = seg->mr_len;
1434 			len += seg->mr_len;
1435 			++seg;
1436 			++i;
1437 			/* Check for holes */
1438 			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1439 			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1440 				break;
1441 		}
1442 		nsegs = i;
1443 		seg1->mr_base = seg1->mr_dma;
1444 		seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1445 					ipb, nsegs, mem_priv, &seg1->mr_base);
1446 		if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1447 			rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1448 			dprintk("RPC:       %s: failed ib_reg_phys_mr "
1449 				"%u@0x%llx (%d)... status %i\n",
1450 				__func__, len,
1451 				(unsigned long long)seg1->mr_dma, nsegs, rc);
1452 			while (nsegs--)
1453 				rpcrdma_unmap_one(ia, --seg);
1454 		} else {
1455 			seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1456 			seg1->mr_nsegs = nsegs;
1457 			seg1->mr_len = len;
1458 		}
1459 		}
1460 		break;
1461 	}
1462 	if (rc)
1463 		return -1;
1464 
1465 	return nsegs;
1466 }
1467 
1468 int
1469 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1470 		struct rpcrdma_xprt *r_xprt, void *r)
1471 {
1472 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1473 	struct rpcrdma_mr_seg *seg1 = seg;
1474 	int nsegs = seg->mr_nsegs, rc;
1475 
1476 	switch (ia->ri_memreg_strategy) {
1477 
1478 #if RPCRDMA_PERSISTENT_REGISTRATION
1479 	case RPCRDMA_ALLPHYSICAL:
1480 		BUG_ON(nsegs != 1);
1481 		rpcrdma_unmap_one(ia, seg);
1482 		rc = 0;
1483 		break;
1484 #endif
1485 
1486 	case RPCRDMA_MTHCAFMR:
1487 		{
1488 		LIST_HEAD(l);
1489 		list_add(&seg->mr_chunk.rl_mw->r.fmr->list, &l);
1490 		rc = ib_unmap_fmr(&l);
1491 		while (seg1->mr_nsegs--)
1492 			rpcrdma_unmap_one(ia, seg++);
1493 		}
1494 		if (rc)
1495 			dprintk("RPC:       %s: failed ib_unmap_fmr,"
1496 				" status %i\n", __func__, rc);
1497 		break;
1498 
1499 	case RPCRDMA_MEMWINDOWS_ASYNC:
1500 	case RPCRDMA_MEMWINDOWS:
1501 		{
1502 		struct ib_mw_bind param;
1503 		BUG_ON(nsegs != 1);
1504 		param.mr = ia->ri_bind_mem;
1505 		param.addr = 0ULL;	/* unbind */
1506 		param.length = 0;
1507 		param.mw_access_flags = 0;
1508 		if (r) {
1509 			param.wr_id = (u64) (unsigned long) r;
1510 			param.send_flags = IB_SEND_SIGNALED;
1511 			INIT_CQCOUNT(&r_xprt->rx_ep);
1512 		} else {
1513 			param.wr_id = 0ULL;
1514 			param.send_flags = 0;
1515 			DECR_CQCOUNT(&r_xprt->rx_ep);
1516 		}
1517 		rc = ib_bind_mw(ia->ri_id->qp,
1518 				seg->mr_chunk.rl_mw->r.mw, &param);
1519 		rpcrdma_unmap_one(ia, seg);
1520 		}
1521 		if (rc)
1522 			dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1523 				" status %i\n", __func__, rc);
1524 		else
1525 			r = NULL;	/* will upcall on completion */
1526 		break;
1527 
1528 	default:
1529 		rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1530 		seg1->mr_chunk.rl_mr = NULL;
1531 		while (seg1->mr_nsegs--)
1532 			rpcrdma_unmap_one(ia, seg++);
1533 		if (rc)
1534 			dprintk("RPC:       %s: failed ib_dereg_mr,"
1535 				" status %i\n", __func__, rc);
1536 		break;
1537 	}
1538 	if (r) {
1539 		struct rpcrdma_rep *rep = r;
1540 		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1541 		rep->rr_func = NULL;
1542 		func(rep);	/* dereg done, callback now */
1543 	}
1544 	return nsegs;
1545 }
1546 
1547 /*
1548  * Prepost any receive buffer, then post send.
1549  *
1550  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1551  */
1552 int
1553 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1554 		struct rpcrdma_ep *ep,
1555 		struct rpcrdma_req *req)
1556 {
1557 	struct ib_send_wr send_wr, *send_wr_fail;
1558 	struct rpcrdma_rep *rep = req->rl_reply;
1559 	int rc;
1560 
1561 	if (rep) {
1562 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1563 		if (rc)
1564 			goto out;
1565 		req->rl_reply = NULL;
1566 	}
1567 
1568 	send_wr.next = NULL;
1569 	send_wr.wr_id = 0ULL;	/* no send cookie */
1570 	send_wr.sg_list = req->rl_send_iov;
1571 	send_wr.num_sge = req->rl_niovs;
1572 	send_wr.opcode = IB_WR_SEND;
1573 	send_wr.imm_data = 0;
1574 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1575 		ib_dma_sync_single_for_device(ia->ri_id->device,
1576 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1577 			DMA_TO_DEVICE);
1578 	ib_dma_sync_single_for_device(ia->ri_id->device,
1579 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1580 		DMA_TO_DEVICE);
1581 	ib_dma_sync_single_for_device(ia->ri_id->device,
1582 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1583 		DMA_TO_DEVICE);
1584 
1585 	if (DECR_CQCOUNT(ep) > 0)
1586 		send_wr.send_flags = 0;
1587 	else { /* Provider must take a send completion every now and then */
1588 		INIT_CQCOUNT(ep);
1589 		send_wr.send_flags = IB_SEND_SIGNALED;
1590 	}
1591 
1592 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1593 	if (rc)
1594 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1595 			rc);
1596 out:
1597 	return rc;
1598 }
1599 
1600 /*
1601  * (Re)post a receive buffer.
1602  */
1603 int
1604 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1605 		     struct rpcrdma_ep *ep,
1606 		     struct rpcrdma_rep *rep)
1607 {
1608 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1609 	int rc;
1610 
1611 	recv_wr.next = NULL;
1612 	recv_wr.wr_id = (u64) (unsigned long) rep;
1613 	recv_wr.sg_list = &rep->rr_iov;
1614 	recv_wr.num_sge = 1;
1615 
1616 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1617 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1618 
1619 	DECR_CQCOUNT(ep);
1620 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1621 
1622 	if (rc)
1623 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1624 			rc);
1625 	return rc;
1626 }
1627