xref: /linux/net/sunrpc/xprtrdma/verbs.c (revision 3ce095c16263630dde46d6051854073edaacf3d7)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <linux/sunrpc/addr.h>
54 #include <asm/bitops.h>
55 
56 #include "xprt_rdma.h"
57 
58 /*
59  * Globals/Macros
60  */
61 
62 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
63 # define RPCDBG_FACILITY	RPCDBG_TRANS
64 #endif
65 
66 /*
67  * internal functions
68  */
69 
70 /*
71  * handle replies in tasklet context, using a single, global list
72  * rdma tasklet function -- just turn around and call the func
73  * for all replies on the list
74  */
75 
76 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77 static LIST_HEAD(rpcrdma_tasklets_g);
78 
79 static void
80 rpcrdma_run_tasklet(unsigned long data)
81 {
82 	struct rpcrdma_rep *rep;
83 	void (*func)(struct rpcrdma_rep *);
84 	unsigned long flags;
85 
86 	data = data;	/* the tasklet's data argument is unused; keep the no-op reference */
87 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88 	while (!list_empty(&rpcrdma_tasklets_g)) {
89 		rep = list_entry(rpcrdma_tasklets_g.next,
90 				 struct rpcrdma_rep, rr_list);
91 		list_del(&rep->rr_list);
92 		func = rep->rr_func;
93 		rep->rr_func = NULL;
94 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
95 
96 		if (func)
97 			func(rep);
98 		else
99 			rpcrdma_recv_buffer_put(rep);
100 
101 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
102 	}
103 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
104 }
105 
106 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
107 
108 static const char * const async_event[] = {
109 	"CQ error",
110 	"QP fatal error",
111 	"QP request error",
112 	"QP access error",
113 	"communication established",
114 	"send queue drained",
115 	"path migration successful",
116 	"path mig error",
117 	"device fatal error",
118 	"port active",
119 	"port error",
120 	"LID change",
121 	"P_key change",
122 	"SM change",
123 	"SRQ error",
124 	"SRQ limit reached",
125 	"last WQE reached",
126 	"client reregister",
127 	"GID change",
128 };
129 
130 #define ASYNC_MSG(status)					\
131 	((status) < ARRAY_SIZE(async_event) ?			\
132 		async_event[(status)] : "unknown async error")
133 
134 static void
135 rpcrdma_schedule_tasklet(struct list_head *sched_list)
136 {
137 	unsigned long flags;
138 
139 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
140 	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
141 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
142 	tasklet_schedule(&rpcrdma_tasklet_g);
143 }
144 
145 static void
146 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
147 {
148 	struct rpcrdma_ep *ep = context;
149 
150 	pr_err("RPC:       %s: %s on device %s ep %p\n",
151 	       __func__, ASYNC_MSG(event->event),
152 		event->device->name, context);
153 	if (ep->rep_connected == 1) {
154 		ep->rep_connected = -EIO;
155 		rpcrdma_conn_func(ep);
156 		wake_up_all(&ep->rep_connect_wait);
157 	}
158 }
159 
160 static void
161 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
162 {
163 	struct rpcrdma_ep *ep = context;
164 
165 	pr_err("RPC:       %s: %s on device %s ep %p\n",
166 	       __func__, ASYNC_MSG(event->event),
167 		event->device->name, context);
168 	if (ep->rep_connected == 1) {
169 		ep->rep_connected = -EIO;
170 		rpcrdma_conn_func(ep);
171 		wake_up_all(&ep->rep_connect_wait);
172 	}
173 }
174 
175 static const char * const wc_status[] = {
176 	"success",
177 	"local length error",
178 	"local QP operation error",
179 	"local EE context operation error",
180 	"local protection error",
181 	"WR flushed",
182 	"memory management operation error",
183 	"bad response error",
184 	"local access error",
185 	"remote invalid request error",
186 	"remote access error",
187 	"remote operation error",
188 	"transport retry counter exceeded",
189 	"RNR retry counter exceeded",
190 	"local RDD violation error",
191 	"remote invalid RD request",
192 	"operation aborted",
193 	"invalid EE context number",
194 	"invalid EE context state",
195 	"fatal error",
196 	"response timeout error",
197 	"general error",
198 };
199 
200 #define COMPLETION_MSG(status)					\
201 	((status) < ARRAY_SIZE(wc_status) ?			\
202 		wc_status[(status)] : "unexpected completion error")
203 
204 static void
205 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
206 {
207 	/* WARNING: Only wr_id and status are reliable at this point */
208 	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
209 		if (wc->status != IB_WC_SUCCESS &&
210 		    wc->status != IB_WC_WR_FLUSH_ERR)
211 			pr_err("RPC:       %s: SEND: %s\n",
212 			       __func__, COMPLETION_MSG(wc->status));
213 	} else {
214 		struct rpcrdma_mw *r;
215 
216 		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
217 		r->mw_sendcompletion(wc);
218 	}
219 }
220 
221 static int
222 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
223 {
224 	struct ib_wc *wcs;
225 	int budget, count, rc;
226 
227 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
228 	do {
229 		wcs = ep->rep_send_wcs;
230 
231 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
232 		if (rc <= 0)
233 			return rc;
234 
235 		count = rc;
236 		while (count-- > 0)
237 			rpcrdma_sendcq_process_wc(wcs++);
238 	} while (rc == RPCRDMA_POLLSIZE && --budget);
239 	return 0;
240 }
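
/*
 * For illustration of the polling budget above (the constants live in
 * xprt_rdma.h; the values here are assumptions, not taken from this
 * source): if RPCRDMA_WC_BUDGET were 128 and RPCRDMA_POLLSIZE were 16,
 * budget would start at 8, so a single upcall drains at most 8 batches
 * of 16 send completions before returning and re-arming the CQ.
 */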
241 
242 /*
243  * Handle send, fast_reg_mr, and local_inv completions.
244  *
245  * Send events are typically suppressed and thus do not result
246  * in an upcall. Occasionally one is signaled, however. This
247  * prevents the provider's completion queue from wrapping and
248  * losing a completion.
249  */
250 static void
251 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
252 {
253 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
254 	int rc;
255 
256 	rc = rpcrdma_sendcq_poll(cq, ep);
257 	if (rc) {
258 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
259 			__func__, rc);
260 		return;
261 	}
262 
263 	rc = ib_req_notify_cq(cq,
264 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
265 	if (rc == 0)
266 		return;
267 	if (rc < 0) {
268 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
269 			__func__, rc);
270 		return;
271 	}
272 
273 	rpcrdma_sendcq_poll(cq, ep);
274 }
275 
276 static void
277 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
278 {
279 	struct rpcrdma_rep *rep =
280 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
281 
282 	/* WARNING: Only wr_id and status are reliable at this point */
283 	if (wc->status != IB_WC_SUCCESS)
284 		goto out_fail;
285 
286 	/* status == SUCCESS means all fields in wc are trustworthy */
287 	if (wc->opcode != IB_WC_RECV)
288 		return;
289 
290 	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
291 		__func__, rep, wc->byte_len);
292 
293 	rep->rr_len = wc->byte_len;
294 	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
295 				   rdmab_addr(rep->rr_rdmabuf),
296 				   rep->rr_len, DMA_FROM_DEVICE);
297 	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
298 
299 out_schedule:
300 	list_add_tail(&rep->rr_list, sched_list);
301 	return;
302 out_fail:
303 	if (wc->status != IB_WC_WR_FLUSH_ERR)
304 		pr_err("RPC:       %s: rep %p: %s\n",
305 		       __func__, rep, COMPLETION_MSG(wc->status));
306 	rep->rr_len = ~0U;
307 	goto out_schedule;
308 }
309 
310 static int
311 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
312 {
313 	struct list_head sched_list;
314 	struct ib_wc *wcs;
315 	int budget, count, rc;
316 
317 	INIT_LIST_HEAD(&sched_list);
318 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
319 	do {
320 		wcs = ep->rep_recv_wcs;
321 
322 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
323 		if (rc <= 0)
324 			goto out_schedule;
325 
326 		count = rc;
327 		while (count-- > 0)
328 			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
329 	} while (rc == RPCRDMA_POLLSIZE && --budget);
330 	rc = 0;
331 
332 out_schedule:
333 	rpcrdma_schedule_tasklet(&sched_list);
334 	return rc;
335 }
336 
337 /*
338  * Handle receive completions.
339  *
340  * It is reentrant but processes single events in order to maintain
341  * the ordering of receives, which keeps server credit accounting correct.
342  *
343  * It is the responsibility of the scheduled tasklet to return
344  * recv buffers to the pool. NOTE: this affects synchronization of
345  * connection shutdown. That is, the structures required for
346  * the completion of the reply handler must remain intact until
347  * all memory has been reclaimed.
348  */
349 static void
350 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
351 {
352 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
353 	int rc;
354 
355 	rc = rpcrdma_recvcq_poll(cq, ep);
356 	if (rc) {
357 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
358 			__func__, rc);
359 		return;
360 	}
361 
362 	rc = ib_req_notify_cq(cq,
363 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
364 	if (rc == 0)
365 		return;
366 	if (rc < 0) {
367 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
368 			__func__, rc);
369 		return;
370 	}
371 
372 	rpcrdma_recvcq_poll(cq, ep);
373 }
374 
375 static void
376 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
377 {
378 	struct ib_wc wc;
379 	LIST_HEAD(sched_list);
380 
381 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
382 		rpcrdma_recvcq_process_wc(&wc, &sched_list);
383 	if (!list_empty(&sched_list))
384 		rpcrdma_schedule_tasklet(&sched_list);
385 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
386 		rpcrdma_sendcq_process_wc(&wc);
387 }
388 
389 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
390 static const char * const conn[] = {
391 	"address resolved",
392 	"address error",
393 	"route resolved",
394 	"route error",
395 	"connect request",
396 	"connect response",
397 	"connect error",
398 	"unreachable",
399 	"rejected",
400 	"established",
401 	"disconnected",
402 	"device removal",
403 	"multicast join",
404 	"multicast error",
405 	"address change",
406 	"timewait exit",
407 };
408 
409 #define CONNECTION_MSG(status)						\
410 	((status) < ARRAY_SIZE(conn) ?					\
411 		conn[(status)] : "unrecognized connection error")
412 #endif
413 
414 static int
415 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
416 {
417 	struct rpcrdma_xprt *xprt = id->context;
418 	struct rpcrdma_ia *ia = &xprt->rx_ia;
419 	struct rpcrdma_ep *ep = &xprt->rx_ep;
420 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
421 	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
422 #endif
423 	struct ib_qp_attr *attr = &ia->ri_qp_attr;
424 	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
425 	int connstate = 0;
426 
427 	switch (event->event) {
428 	case RDMA_CM_EVENT_ADDR_RESOLVED:
429 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
430 		ia->ri_async_rc = 0;
431 		complete(&ia->ri_done);
432 		break;
433 	case RDMA_CM_EVENT_ADDR_ERROR:
434 		ia->ri_async_rc = -EHOSTUNREACH;
435 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
436 			__func__, ep);
437 		complete(&ia->ri_done);
438 		break;
439 	case RDMA_CM_EVENT_ROUTE_ERROR:
440 		ia->ri_async_rc = -ENETUNREACH;
441 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
442 			__func__, ep);
443 		complete(&ia->ri_done);
444 		break;
445 	case RDMA_CM_EVENT_ESTABLISHED:
446 		connstate = 1;
447 		ib_query_qp(ia->ri_id->qp, attr,
448 			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
449 			    iattr);
450 		dprintk("RPC:       %s: %d responder resources"
451 			" (%d initiator)\n",
452 			__func__, attr->max_dest_rd_atomic,
453 			attr->max_rd_atomic);
454 		goto connected;
455 	case RDMA_CM_EVENT_CONNECT_ERROR:
456 		connstate = -ENOTCONN;
457 		goto connected;
458 	case RDMA_CM_EVENT_UNREACHABLE:
459 		connstate = -ENETDOWN;
460 		goto connected;
461 	case RDMA_CM_EVENT_REJECTED:
462 		connstate = -ECONNREFUSED;
463 		goto connected;
464 	case RDMA_CM_EVENT_DISCONNECTED:
465 		connstate = -ECONNABORTED;
466 		goto connected;
467 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
468 		connstate = -ENODEV;
469 connected:
470 		dprintk("RPC:       %s: %sconnected\n",
471 					__func__, connstate > 0 ? "" : "dis");
472 		ep->rep_connected = connstate;
473 		rpcrdma_conn_func(ep);
474 		wake_up_all(&ep->rep_connect_wait);
475 		/*FALLTHROUGH*/
476 	default:
477 		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
478 			__func__, sap, rpc_get_port(sap), ep,
479 			CONNECTION_MSG(event->event));
480 		break;
481 	}
482 
483 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
484 	if (connstate == 1) {
485 		int ird = attr->max_dest_rd_atomic;
486 		int tird = ep->rep_remote_cma.responder_resources;
487 
488 		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
489 			sap, rpc_get_port(sap),
490 			ia->ri_id->device->name,
491 			ia->ri_ops->ro_displayname,
492 			xprt->rx_buf.rb_max_requests,
493 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
494 	} else if (connstate < 0) {
495 		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
496 			sap, rpc_get_port(sap), connstate);
497 	}
498 #endif
499 
500 	return 0;
501 }
502 
503 static struct rdma_cm_id *
504 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
505 			struct rpcrdma_ia *ia, struct sockaddr *addr)
506 {
507 	struct rdma_cm_id *id;
508 	int rc;
509 
510 	init_completion(&ia->ri_done);
511 
512 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
513 	if (IS_ERR(id)) {
514 		rc = PTR_ERR(id);
515 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
516 			__func__, rc);
517 		return id;
518 	}
519 
520 	ia->ri_async_rc = -ETIMEDOUT;
521 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
522 	if (rc) {
523 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
524 			__func__, rc);
525 		goto out;
526 	}
527 	wait_for_completion_interruptible_timeout(&ia->ri_done,
528 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
529 	rc = ia->ri_async_rc;
530 	if (rc)
531 		goto out;
532 
533 	ia->ri_async_rc = -ETIMEDOUT;
534 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
535 	if (rc) {
536 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
537 			__func__, rc);
538 		goto out;
539 	}
540 	wait_for_completion_interruptible_timeout(&ia->ri_done,
541 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
542 	rc = ia->ri_async_rc;
543 	if (rc)
544 		goto out;
545 
546 	return id;
547 
548 out:
549 	rdma_destroy_id(id);
550 	return ERR_PTR(rc);
551 }
552 
553 /*
554  * Drain any cq, prior to teardown.
555  */
556 static void
557 rpcrdma_clean_cq(struct ib_cq *cq)
558 {
559 	struct ib_wc wc;
560 	int count = 0;
561 
562 	while (1 == ib_poll_cq(cq, 1, &wc))
563 		++count;
564 
565 	if (count)
566 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
567 			__func__, count, wc.opcode);
568 }
569 
570 /*
571  * Exported functions.
572  */
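
/*
 * A minimal setup/teardown sketch, illustrative only, assuming the
 * caller owns a struct rpcrdma_xprt *r_xprt and a struct sockaddr *sap
 * (memreg_strategy below is a placeholder for the caller's chosen
 * registration mode, e.g. RPCRDMA_FRMR):
 *
 *	rc = rpcrdma_ia_open(r_xprt, sap, memreg_strategy);
 *	if (rc == 0)
 *		rc = rpcrdma_ep_create(&r_xprt->rx_ep, &r_xprt->rx_ia,
 *				       &r_xprt->rx_data);
 *	if (rc == 0)
 *		rc = rpcrdma_buffer_create(r_xprt);
 *	if (rc == 0)
 *		rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
 *
 * RPCs then flow via rpcrdma_buffer_get() and rpcrdma_ep_post();
 * teardown proceeds in roughly the reverse order: rpcrdma_ep_disconnect(),
 * rpcrdma_buffer_destroy(), rpcrdma_ep_destroy(), rpcrdma_ia_close().
 */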
573 
574 /*
575  * Open and initialize an Interface Adapter.
576  *  o initializes fields of struct rpcrdma_ia, including
577  *    interface and provider attributes and protection zone.
578  */
579 int
580 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
581 {
582 	int rc, mem_priv;
583 	struct rpcrdma_ia *ia = &xprt->rx_ia;
584 	struct ib_device_attr *devattr = &ia->ri_devattr;
585 
586 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
587 	if (IS_ERR(ia->ri_id)) {
588 		rc = PTR_ERR(ia->ri_id);
589 		goto out1;
590 	}
591 
592 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
593 	if (IS_ERR(ia->ri_pd)) {
594 		rc = PTR_ERR(ia->ri_pd);
595 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
596 			__func__, rc);
597 		goto out2;
598 	}
599 
600 	rc = ib_query_device(ia->ri_id->device, devattr);
601 	if (rc) {
602 		dprintk("RPC:       %s: ib_query_device failed %d\n",
603 			__func__, rc);
604 		goto out3;
605 	}
606 
607 	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
608 		ia->ri_have_dma_lkey = 1;
609 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
610 	}
611 
612 	if (memreg == RPCRDMA_FRMR) {
613 		/* Requires both frmr reg and local dma lkey */
614 		if (((devattr->device_cap_flags &
615 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
616 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
617 		      (devattr->max_fast_reg_page_list_len == 0)) {
618 			dprintk("RPC:       %s: FRMR registration "
619 				"not supported by HCA\n", __func__);
620 			memreg = RPCRDMA_MTHCAFMR;
621 		}
622 	}
623 	if (memreg == RPCRDMA_MTHCAFMR) {
624 		if (!ia->ri_id->device->alloc_fmr) {
625 			dprintk("RPC:       %s: MTHCAFMR registration "
626 				"not supported by HCA\n", __func__);
627 			memreg = RPCRDMA_ALLPHYSICAL;
628 		}
629 	}
630 
631 	/*
632 	 * Optionally obtain an underlying physical identity mapping in
633 	 * order to do a memory window-based bind. This base registration
634 	 * is protected from remote access - that is enabled only by binding
635 	 * for the specific bytes targeted during each RPC operation, and
636 	 * revoked after the corresponding completion similar to a storage
637 	 * adapter.
638 	 */
639 	switch (memreg) {
640 	case RPCRDMA_FRMR:
641 		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
642 		break;
643 	case RPCRDMA_ALLPHYSICAL:
644 		ia->ri_ops = &rpcrdma_physical_memreg_ops;
645 		mem_priv = IB_ACCESS_LOCAL_WRITE |
646 				IB_ACCESS_REMOTE_WRITE |
647 				IB_ACCESS_REMOTE_READ;
648 		goto register_setup;
649 	case RPCRDMA_MTHCAFMR:
650 		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
651 		if (ia->ri_have_dma_lkey)
652 			break;
653 		mem_priv = IB_ACCESS_LOCAL_WRITE;
654 	register_setup:
655 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
656 		if (IS_ERR(ia->ri_bind_mem)) {
657 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
658 				"phys register failed with %lX\n",
659 				__func__, PTR_ERR(ia->ri_bind_mem));
660 			rc = -ENOMEM;
661 			goto out3;
662 		}
663 		break;
664 	default:
665 		printk(KERN_ERR "RPC: Unsupported memory "
666 				"registration mode: %d\n", memreg);
667 		rc = -ENOMEM;
668 		goto out3;
669 	}
670 	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
671 		__func__, ia->ri_ops->ro_displayname);
672 
673 	/* Else will do memory reg/dereg for each chunk */
674 	ia->ri_memreg_strategy = memreg;
675 
676 	rwlock_init(&ia->ri_qplock);
677 	return 0;
678 
679 out3:
680 	ib_dealloc_pd(ia->ri_pd);
681 	ia->ri_pd = NULL;
682 out2:
683 	rdma_destroy_id(ia->ri_id);
684 	ia->ri_id = NULL;
685 out1:
686 	return rc;
687 }
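
/*
 * Note on the fallback behavior above (descriptive of the code, not a
 * new mechanism): a caller may request RPCRDMA_FRMR unconditionally;
 * rpcrdma_ia_open() silently downgrades the mode when the HCA lacks
 * support:
 *
 *	RPCRDMA_FRMR  ->  RPCRDMA_MTHCAFMR  ->  RPCRDMA_ALLPHYSICAL
 *
 * The strategy actually selected is recorded in ia->ri_memreg_strategy
 * and reported through ia->ri_ops->ro_displayname.
 */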
688 
689 /*
690  * Clean up/close an IA.
691  *   o if event handles and PD have been initialized, free them.
692  *   o close the IA
693  */
694 void
695 rpcrdma_ia_close(struct rpcrdma_ia *ia)
696 {
697 	int rc;
698 
699 	dprintk("RPC:       %s: entering\n", __func__);
700 	if (ia->ri_bind_mem != NULL) {
701 		rc = ib_dereg_mr(ia->ri_bind_mem);
702 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
703 			__func__, rc);
704 	}
705 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
706 		if (ia->ri_id->qp)
707 			rdma_destroy_qp(ia->ri_id);
708 		rdma_destroy_id(ia->ri_id);
709 		ia->ri_id = NULL;
710 	}
711 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
712 		rc = ib_dealloc_pd(ia->ri_pd);
713 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
714 			__func__, rc);
715 	}
716 }
717 
718 /*
719  * Create unconnected endpoint.
720  */
721 int
722 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
723 				struct rpcrdma_create_data_internal *cdata)
724 {
725 	struct ib_device_attr *devattr = &ia->ri_devattr;
726 	struct ib_cq *sendcq, *recvcq;
727 	int rc, err;
728 
729 	/* check provider's send/recv wr limits */
730 	if (cdata->max_requests > devattr->max_qp_wr)
731 		cdata->max_requests = devattr->max_qp_wr;
732 
733 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
734 	ep->rep_attr.qp_context = ep;
735 	ep->rep_attr.srq = NULL;
736 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
737 	rc = ia->ri_ops->ro_open(ia, ep, cdata);
738 	if (rc)
739 		return rc;
740 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
741 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
742 	ep->rep_attr.cap.max_recv_sge = 1;
743 	ep->rep_attr.cap.max_inline_data = 0;
744 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
745 	ep->rep_attr.qp_type = IB_QPT_RC;
746 	ep->rep_attr.port_num = ~0;
747 
748 	if (cdata->padding) {
749 		ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
750 						      GFP_KERNEL);
751 		if (IS_ERR(ep->rep_padbuf))
752 			return PTR_ERR(ep->rep_padbuf);
753 	} else
754 		ep->rep_padbuf = NULL;
755 
756 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
757 		"iovs: send %d recv %d\n",
758 		__func__,
759 		ep->rep_attr.cap.max_send_wr,
760 		ep->rep_attr.cap.max_recv_wr,
761 		ep->rep_attr.cap.max_send_sge,
762 		ep->rep_attr.cap.max_recv_sge);
763 
764 	/* set trigger for requesting send completion */
765 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
766 	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
767 		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
768 	else if (ep->rep_cqinit <= 2)
769 		ep->rep_cqinit = 0;
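	/*
	 * Worked example with assumed values (not taken from this
	 * source): with cap.max_send_wr == 128 the trigger starts at
	 * 63; if RPCRDMA_MAX_UNSIGNALED_SENDS were 32, it is clamped
	 * to 32, so rpcrdma_ep_post() requests a signaled completion
	 * on roughly every 32nd SEND via INIT_CQCOUNT()/DECR_CQCOUNT().
	 */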
770 	INIT_CQCOUNT(ep);
771 	init_waitqueue_head(&ep->rep_connect_wait);
772 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
773 
774 	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
775 				  rpcrdma_cq_async_error_upcall, ep,
776 				  ep->rep_attr.cap.max_send_wr + 1, 0);
777 	if (IS_ERR(sendcq)) {
778 		rc = PTR_ERR(sendcq);
779 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
780 			__func__, rc);
781 		goto out1;
782 	}
783 
784 	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
785 	if (rc) {
786 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
787 			__func__, rc);
788 		goto out2;
789 	}
790 
791 	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
792 				  rpcrdma_cq_async_error_upcall, ep,
793 				  ep->rep_attr.cap.max_recv_wr + 1, 0);
794 	if (IS_ERR(recvcq)) {
795 		rc = PTR_ERR(recvcq);
796 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
797 			__func__, rc);
798 		goto out2;
799 	}
800 
801 	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
802 	if (rc) {
803 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
804 			__func__, rc);
805 		ib_destroy_cq(recvcq);
806 		goto out2;
807 	}
808 
809 	ep->rep_attr.send_cq = sendcq;
810 	ep->rep_attr.recv_cq = recvcq;
811 
812 	/* Initialize cma parameters */
813 
814 	/* RPC/RDMA does not use private data */
815 	ep->rep_remote_cma.private_data = NULL;
816 	ep->rep_remote_cma.private_data_len = 0;
817 
818 	/* Client offers RDMA Read but does not initiate */
819 	ep->rep_remote_cma.initiator_depth = 0;
820 	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
821 		ep->rep_remote_cma.responder_resources = 32;
822 	else
823 		ep->rep_remote_cma.responder_resources =
824 						devattr->max_qp_rd_atom;
825 
826 	ep->rep_remote_cma.retry_count = 7;
827 	ep->rep_remote_cma.flow_control = 0;
828 	ep->rep_remote_cma.rnr_retry_count = 0;
829 
830 	return 0;
831 
832 out2:
833 	err = ib_destroy_cq(sendcq);
834 	if (err)
835 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
836 			__func__, err);
837 out1:
838 	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
839 	return rc;
840 }
841 
842 /*
843  * rpcrdma_ep_destroy
844  *
845  * Disconnect and destroy endpoint. After this, the only
846  * valid operations on the ep are to free it (if dynamically
847  * allocated) or re-create it.
848  */
849 void
850 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
851 {
852 	int rc;
853 
854 	dprintk("RPC:       %s: entering, connected is %d\n",
855 		__func__, ep->rep_connected);
856 
857 	cancel_delayed_work_sync(&ep->rep_connect_worker);
858 
859 	if (ia->ri_id->qp) {
860 		rpcrdma_ep_disconnect(ep, ia);
861 		rdma_destroy_qp(ia->ri_id);
862 		ia->ri_id->qp = NULL;
863 	}
864 
865 	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
866 
867 	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
868 	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
869 	if (rc)
870 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
871 			__func__, rc);
872 
873 	rpcrdma_clean_cq(ep->rep_attr.send_cq);
874 	rc = ib_destroy_cq(ep->rep_attr.send_cq);
875 	if (rc)
876 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
877 			__func__, rc);
878 }
879 
880 /*
881  * Connect unconnected endpoint.
882  */
883 int
884 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
885 {
886 	struct rdma_cm_id *id, *old;
887 	int rc = 0;
888 	int retry_count = 0;
889 
890 	if (ep->rep_connected != 0) {
891 		struct rpcrdma_xprt *xprt;
892 retry:
893 		dprintk("RPC:       %s: reconnecting...\n", __func__);
894 
895 		rpcrdma_ep_disconnect(ep, ia);
896 		rpcrdma_flush_cqs(ep);
897 
898 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
899 		ia->ri_ops->ro_reset(xprt);
900 
901 		id = rpcrdma_create_id(xprt, ia,
902 				(struct sockaddr *)&xprt->rx_data.addr);
903 		if (IS_ERR(id)) {
904 			rc = -EHOSTUNREACH;
905 			goto out;
906 		}
907 		/* TEMP TEMP TEMP - fail if new device:
908 		 * Deregister/remarshal *all* requests!
909 		 * Close and recreate adapter, pd, etc!
910 		 * Re-determine all attributes still sane!
911 		 * More stuff I haven't thought of!
912 		 * Rrrgh!
913 		 */
914 		if (ia->ri_id->device != id->device) {
915 			printk("RPC:       %s: can't reconnect on "
916 				"different device!\n", __func__);
917 			rdma_destroy_id(id);
918 			rc = -ENETUNREACH;
919 			goto out;
920 		}
921 		/* END TEMP */
922 		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
923 		if (rc) {
924 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
925 				__func__, rc);
926 			rdma_destroy_id(id);
927 			rc = -ENETUNREACH;
928 			goto out;
929 		}
930 
931 		write_lock(&ia->ri_qplock);
932 		old = ia->ri_id;
933 		ia->ri_id = id;
934 		write_unlock(&ia->ri_qplock);
935 
936 		rdma_destroy_qp(old);
937 		rdma_destroy_id(old);
938 	} else {
939 		dprintk("RPC:       %s: connecting...\n", __func__);
940 		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
941 		if (rc) {
942 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
943 				__func__, rc);
944 			/* do not update ep->rep_connected */
945 			return -ENETUNREACH;
946 		}
947 	}
948 
949 	ep->rep_connected = 0;
950 
951 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
952 	if (rc) {
953 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
954 				__func__, rc);
955 		goto out;
956 	}
957 
958 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
959 
960 	/*
961 	 * Check state. A non-peer reject indicates no listener
962 	 * (ECONNREFUSED), which may be a transient state. All
963 	 * others indicate a transport condition for which a best-effort
964 	 * recovery has already been attempted.
965 	 */
966 	if (ep->rep_connected == -ECONNREFUSED &&
967 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
968 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
969 		goto retry;
970 	}
971 	if (ep->rep_connected <= 0) {
972 		/* Sometimes, the only way to reliably connect to remote
973 		 * CMs is to use the same nonzero values for ORD and IRD. */
974 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
975 		    (ep->rep_remote_cma.responder_resources == 0 ||
976 		     ep->rep_remote_cma.initiator_depth !=
977 				ep->rep_remote_cma.responder_resources)) {
978 			if (ep->rep_remote_cma.responder_resources == 0)
979 				ep->rep_remote_cma.responder_resources = 1;
980 			ep->rep_remote_cma.initiator_depth =
981 				ep->rep_remote_cma.responder_resources;
982 			goto retry;
983 		}
984 		rc = ep->rep_connected;
985 	} else {
986 		dprintk("RPC:       %s: connected\n", __func__);
987 	}
988 
989 out:
990 	if (rc)
991 		ep->rep_connected = rc;
992 	return rc;
993 }
994 
995 /*
996  * rpcrdma_ep_disconnect
997  *
998  * This is separate from destroy to facilitate the ability
999  * to reconnect without recreating the endpoint.
1000  *
1001  * This call is not reentrant, and must not be made in parallel
1002  * on the same endpoint.
1003  */
1004 void
1005 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1006 {
1007 	int rc;
1008 
1009 	rpcrdma_flush_cqs(ep);
1010 	rc = rdma_disconnect(ia->ri_id);
1011 	if (!rc) {
1012 		/* returns without wait if not connected */
1013 		wait_event_interruptible(ep->rep_connect_wait,
1014 							ep->rep_connected != 1);
1015 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1016 			(ep->rep_connected == 1) ? "still " : "dis");
1017 	} else {
1018 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1019 		ep->rep_connected = rc;
1020 	}
1021 }
1022 
1023 static struct rpcrdma_req *
1024 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1025 {
1026 	struct rpcrdma_req *req;
1027 
1028 	req = kzalloc(sizeof(*req), GFP_KERNEL);
1029 	if (req == NULL)
1030 		return ERR_PTR(-ENOMEM);
1031 
1032 	req->rl_buffer = &r_xprt->rx_buf;
1033 	return req;
1034 }
1035 
1036 static struct rpcrdma_rep *
1037 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1038 {
1039 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1040 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1041 	struct rpcrdma_rep *rep;
1042 	int rc;
1043 
1044 	rc = -ENOMEM;
1045 	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1046 	if (rep == NULL)
1047 		goto out;
1048 
1049 	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1050 					       GFP_KERNEL);
1051 	if (IS_ERR(rep->rr_rdmabuf)) {
1052 		rc = PTR_ERR(rep->rr_rdmabuf);
1053 		goto out_free;
1054 	}
1055 
1056 	rep->rr_buffer = &r_xprt->rx_buf;
1057 	return rep;
1058 
1059 out_free:
1060 	kfree(rep);
1061 out:
1062 	return ERR_PTR(rc);
1063 }
1064 
1065 int
1066 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1067 {
1068 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1069 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1070 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1071 	char *p;
1072 	size_t len;
1073 	int i, rc;
1074 
1075 	buf->rb_max_requests = cdata->max_requests;
1076 	spin_lock_init(&buf->rb_lock);
1077 
1078 	/* Need to allocate:
1079 	 *   1.  arrays for send and recv pointers
1080 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
1081 	 *   3.  array of struct rpcrdma_rep for replies
1082 	 * Send/recv buffers in req/rep need to be registered
1083 	 */
1084 	len = buf->rb_max_requests *
1085 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1086 
1087 	p = kzalloc(len, GFP_KERNEL);
1088 	if (p == NULL) {
1089 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1090 			__func__, len);
1091 		rc = -ENOMEM;
1092 		goto out;
1093 	}
1094 	buf->rb_pool = p;	/* for freeing it later */
1095 
1096 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1097 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1098 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1099 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1100 
1101 	rc = ia->ri_ops->ro_init(r_xprt);
1102 	if (rc)
1103 		goto out;
1104 
1105 	for (i = 0; i < buf->rb_max_requests; i++) {
1106 		struct rpcrdma_req *req;
1107 		struct rpcrdma_rep *rep;
1108 
1109 		req = rpcrdma_create_req(r_xprt);
1110 		if (IS_ERR(req)) {
1111 			dprintk("RPC:       %s: request buffer %d alloc"
1112 				" failed\n", __func__, i);
1113 			rc = PTR_ERR(req);
1114 			goto out;
1115 		}
1116 		buf->rb_send_bufs[i] = req;
1117 
1118 		rep = rpcrdma_create_rep(r_xprt);
1119 		if (IS_ERR(rep)) {
1120 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1121 				__func__, i);
1122 			rc = PTR_ERR(rep);
1123 			goto out;
1124 		}
1125 		buf->rb_recv_bufs[i] = rep;
1126 	}
1127 
1128 	return 0;
1129 out:
1130 	rpcrdma_buffer_destroy(buf);
1131 	return rc;
1132 }
1133 
1134 static void
1135 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1136 {
1137 	if (!rep)
1138 		return;
1139 
1140 	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1141 	kfree(rep);
1142 }
1143 
1144 static void
1145 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1146 {
1147 	if (!req)
1148 		return;
1149 
1150 	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1151 	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1152 	kfree(req);
1153 }
1154 
1155 void
1156 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1157 {
1158 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1159 	int i;
1160 
1161 	/* clean up in reverse order from create
1162 	 *   1.  recv mr memory (mr free, then kfree)
1163 	 *   2.  send mr memory (mr free, then kfree)
1164 	 *   3.  MWs
1165 	 */
1166 	dprintk("RPC:       %s: entering\n", __func__);
1167 
1168 	for (i = 0; i < buf->rb_max_requests; i++) {
1169 		if (buf->rb_recv_bufs)
1170 			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1171 		if (buf->rb_send_bufs)
1172 			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1173 	}
1174 
1175 	ia->ri_ops->ro_destroy(buf);
1176 
1177 	kfree(buf->rb_pool);
1178 }
1179 
1180 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1181  * some req segments uninitialized.
1182  */
1183 static void
1184 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1185 {
1186 	if (*mw) {
1187 		list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1188 		*mw = NULL;
1189 	}
1190 }
1191 
1192 /* Cycle MWs back in reverse order, and "spin" them.
1193  * This delays and scrambles reuse as much as possible.
1194  */
1195 static void
1196 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1197 {
1198 	struct rpcrdma_mr_seg *seg = req->rl_segments;
1199 	struct rpcrdma_mr_seg *seg1 = seg;
1200 	int i;
1201 
1202 	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1203 		rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1204 	rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1205 }
1206 
1207 static void
1208 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1209 {
1210 	buf->rb_send_bufs[--buf->rb_send_index] = req;
1211 	req->rl_niovs = 0;
1212 	if (req->rl_reply) {
1213 		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1214 		req->rl_reply->rr_func = NULL;
1215 		req->rl_reply = NULL;
1216 	}
1217 }
1218 
1219 /* rpcrdma_unmap_one() was already done during deregistration.
1220  * Redo only the ib_post_send().
1221  */
1222 static void
1223 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1224 {
1225 	struct rpcrdma_xprt *r_xprt =
1226 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1227 	struct ib_send_wr invalidate_wr, *bad_wr;
1228 	int rc;
1229 
1230 	dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1231 
1232 	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1233 	r->r.frmr.fr_state = FRMR_IS_INVALID;
1234 
1235 	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1236 	invalidate_wr.wr_id = (unsigned long)(void *)r;
1237 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1238 	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1239 	DECR_CQCOUNT(&r_xprt->rx_ep);
1240 
1241 	dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1242 		__func__, r, r->r.frmr.fr_mr->rkey);
1243 
1244 	read_lock(&ia->ri_qplock);
1245 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1246 	read_unlock(&ia->ri_qplock);
1247 	if (rc) {
1248 		/* Force rpcrdma_buffer_get() to retry */
1249 		r->r.frmr.fr_state = FRMR_IS_STALE;
1250 		dprintk("RPC:       %s: ib_post_send failed, %i\n",
1251 			__func__, rc);
1252 	}
1253 }
1254 
1255 static void
1256 rpcrdma_retry_flushed_linv(struct list_head *stale,
1257 			   struct rpcrdma_buffer *buf)
1258 {
1259 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1260 	struct list_head *pos;
1261 	struct rpcrdma_mw *r;
1262 	unsigned long flags;
1263 
1264 	list_for_each(pos, stale) {
1265 		r = list_entry(pos, struct rpcrdma_mw, mw_list);
1266 		rpcrdma_retry_local_inv(r, ia);
1267 	}
1268 
1269 	spin_lock_irqsave(&buf->rb_lock, flags);
1270 	list_splice_tail(stale, &buf->rb_mws);
1271 	spin_unlock_irqrestore(&buf->rb_lock, flags);
1272 }
1273 
1274 static struct rpcrdma_req *
1275 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1276 			 struct list_head *stale)
1277 {
1278 	struct rpcrdma_mw *r;
1279 	int i;
1280 
1281 	i = RPCRDMA_MAX_SEGS - 1;
1282 	while (!list_empty(&buf->rb_mws)) {
1283 		r = list_entry(buf->rb_mws.next,
1284 			       struct rpcrdma_mw, mw_list);
1285 		list_del(&r->mw_list);
1286 		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1287 			list_add(&r->mw_list, stale);
1288 			continue;
1289 		}
1290 		req->rl_segments[i].rl_mw = r;
1291 		if (unlikely(i-- == 0))
1292 			return req;	/* Success */
1293 	}
1294 
1295 	/* Not enough entries on rb_mws for this req */
1296 	rpcrdma_buffer_put_sendbuf(req, buf);
1297 	rpcrdma_buffer_put_mrs(req, buf);
1298 	return NULL;
1299 }
1300 
1301 static struct rpcrdma_req *
1302 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1303 {
1304 	struct rpcrdma_mw *r;
1305 	int i;
1306 
1307 	i = RPCRDMA_MAX_SEGS - 1;
1308 	while (!list_empty(&buf->rb_mws)) {
1309 		r = list_entry(buf->rb_mws.next,
1310 			       struct rpcrdma_mw, mw_list);
1311 		list_del(&r->mw_list);
1312 		req->rl_segments[i].rl_mw = r;
1313 		if (unlikely(i-- == 0))
1314 			return req;	/* Success */
1315 	}
1316 
1317 	/* Not enough entries on rb_mws for this req */
1318 	rpcrdma_buffer_put_sendbuf(req, buf);
1319 	rpcrdma_buffer_put_mrs(req, buf);
1320 	return NULL;
1321 }
1322 
1323 /*
1324  * Get a set of request/reply buffers.
1325  *
1326  * Reply buffer (if needed) is attached to send buffer upon return.
1327  * Rule:
1328  *    rb_send_index and rb_recv_index MUST always be pointing to the
1329  *    *next* available buffer (non-NULL). They are incremented after
1330  *    removing buffers, and decremented *before* returning them.
1331  */
1332 struct rpcrdma_req *
1333 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1334 {
1335 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1336 	struct list_head stale;
1337 	struct rpcrdma_req *req;
1338 	unsigned long flags;
1339 
1340 	spin_lock_irqsave(&buffers->rb_lock, flags);
1341 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1342 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1343 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1344 		return NULL;
1345 	}
1346 
1347 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1348 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1349 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1350 			__func__,
1351 			buffers->rb_recv_index - buffers->rb_send_index);
1352 		req->rl_reply = NULL;
1353 	} else {
1354 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1355 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1356 	}
1357 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1358 
1359 	INIT_LIST_HEAD(&stale);
1360 	switch (ia->ri_memreg_strategy) {
1361 	case RPCRDMA_FRMR:
1362 		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1363 		break;
1364 	case RPCRDMA_MTHCAFMR:
1365 		req = rpcrdma_buffer_get_fmrs(req, buffers);
1366 		break;
1367 	default:
1368 		break;
1369 	}
1370 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1371 	if (!list_empty(&stale))
1372 		rpcrdma_retry_flushed_linv(&stale, buffers);
1373 	return req;
1374 }
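
/*
 * Illustrative pairing (a sketch, not a definitive caller): a request
 * context is typically obtained and released around a single RPC:
 *
 *	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 *	if (req == NULL)
 *		return NULL;	(pool exhausted; caller retries later)
 *	... marshal, post, and complete the RPC ...
 *	rpcrdma_buffer_put(req);
 */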
1375 
1376 /*
1377  * Put request/reply buffers back into pool.
1378  * Pre-decrement counter/array index.
1379  */
1380 void
1381 rpcrdma_buffer_put(struct rpcrdma_req *req)
1382 {
1383 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1384 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1385 	unsigned long flags;
1386 
1387 	spin_lock_irqsave(&buffers->rb_lock, flags);
1388 	rpcrdma_buffer_put_sendbuf(req, buffers);
1389 	switch (ia->ri_memreg_strategy) {
1390 	case RPCRDMA_FRMR:
1391 	case RPCRDMA_MTHCAFMR:
1392 		rpcrdma_buffer_put_mrs(req, buffers);
1393 		break;
1394 	default:
1395 		break;
1396 	}
1397 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1398 }
1399 
1400 /*
1401  * Recover reply buffers from pool.
1402  * This happens when recovering from error conditions.
1403  * Post-increment counter/array index.
1404  */
1405 void
1406 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1407 {
1408 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1409 	unsigned long flags;
1410 
1411 	spin_lock_irqsave(&buffers->rb_lock, flags);
1412 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1413 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1414 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1415 	}
1416 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1417 }
1418 
1419 /*
1420  * Put reply buffers back into pool when not attached to
1421  * request. This happens in error conditions.
1422  */
1423 void
1424 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1425 {
1426 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1427 	unsigned long flags;
1428 
1429 	rep->rr_func = NULL;
1430 	spin_lock_irqsave(&buffers->rb_lock, flags);
1431 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1432 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1433 }
1434 
1435 /*
1436  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1437  */
1438 
1439 void
1440 rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1441 {
1442 	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
1443 		seg->mr_offset,
1444 		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
1445 }
1446 
1447 static int
1448 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1449 				struct ib_mr **mrp, struct ib_sge *iov)
1450 {
1451 	struct ib_phys_buf ipb;
1452 	struct ib_mr *mr;
1453 	int rc;
1454 
1455 	/*
1456 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1457 	 */
1458 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1459 			va, len, DMA_BIDIRECTIONAL);
1460 	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1461 		return -ENOMEM;
1462 
1463 	iov->length = len;
1464 
1465 	if (ia->ri_have_dma_lkey) {
1466 		*mrp = NULL;
1467 		iov->lkey = ia->ri_dma_lkey;
1468 		return 0;
1469 	} else if (ia->ri_bind_mem != NULL) {
1470 		*mrp = NULL;
1471 		iov->lkey = ia->ri_bind_mem->lkey;
1472 		return 0;
1473 	}
1474 
1475 	ipb.addr = iov->addr;
1476 	ipb.size = iov->length;
1477 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1478 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1479 
1480 	dprintk("RPC:       %s: phys convert: 0x%llx "
1481 			"registered 0x%llx length %d\n",
1482 			__func__, (unsigned long long)ipb.addr,
1483 			(unsigned long long)iov->addr, len);
1484 
1485 	if (IS_ERR(mr)) {
1486 		*mrp = NULL;
1487 		rc = PTR_ERR(mr);
1488 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1489 	} else {
1490 		*mrp = mr;
1491 		iov->lkey = mr->lkey;
1492 		rc = 0;
1493 	}
1494 
1495 	return rc;
1496 }
1497 
1498 static int
1499 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1500 				struct ib_mr *mr, struct ib_sge *iov)
1501 {
1502 	int rc;
1503 
1504 	ib_dma_unmap_single(ia->ri_id->device,
1505 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1506 
1507 	if (NULL == mr)
1508 		return 0;
1509 
1510 	rc = ib_dereg_mr(mr);
1511 	if (rc)
1512 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1513 	return rc;
1514 }
1515 
1516 /**
1517  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1518  * @ia: controlling rpcrdma_ia
1519  * @size: size of buffer to be allocated, in bytes
1520  * @flags: GFP flags
1521  *
1522  * Returns pointer to private header of an area of internally
1523  * registered memory, or an ERR_PTR. The registered buffer follows
1524  * the end of the private header.
1525  *
1526  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1527  * receiving the payload of RDMA RECV operations. regbufs are not
1528  * used for RDMA READ/WRITE operations, thus are registered only for
1529  * LOCAL access.
1530  */
1531 struct rpcrdma_regbuf *
1532 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1533 {
1534 	struct rpcrdma_regbuf *rb;
1535 	int rc;
1536 
1537 	rc = -ENOMEM;
1538 	rb = kmalloc(sizeof(*rb) + size, flags);
1539 	if (rb == NULL)
1540 		goto out;
1541 
1542 	rb->rg_size = size;
1543 	rb->rg_owner = NULL;
1544 	rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1545 				       &rb->rg_mr, &rb->rg_iov);
1546 	if (rc)
1547 		goto out_free;
1548 
1549 	return rb;
1550 
1551 out_free:
1552 	kfree(rb);
1553 out:
1554 	return ERR_PTR(rc);
1555 }
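
/*
 * Usage sketch, mirroring the callers in this file:
 *
 *	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
 *	if (IS_ERR(rb))
 *		return PTR_ERR(rb);
 *	... the DMA address and length are then available via the
 *	    rdmab_addr() and rdmab_length() helpers ...
 *	rpcrdma_free_regbuf(ia, rb);
 */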
1556 
1557 /**
1558  * rpcrdma_free_regbuf - deregister and free registered buffer
1559  * @ia: controlling rpcrdma_ia
1560  * @rb: regbuf to be deregistered and freed
1561  */
1562 void
1563 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1564 {
1565 	if (rb) {
1566 		rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1567 		kfree(rb);
1568 	}
1569 }
1570 
1571 /*
1572  * Prepost any receive buffer, then post send.
1573  *
1574  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1575  */
1576 int
1577 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1578 		struct rpcrdma_ep *ep,
1579 		struct rpcrdma_req *req)
1580 {
1581 	struct ib_send_wr send_wr, *send_wr_fail;
1582 	struct rpcrdma_rep *rep = req->rl_reply;
1583 	int rc;
1584 
1585 	if (rep) {
1586 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1587 		if (rc)
1588 			goto out;
1589 		req->rl_reply = NULL;
1590 	}
1591 
1592 	send_wr.next = NULL;
1593 	send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
1594 	send_wr.sg_list = req->rl_send_iov;
1595 	send_wr.num_sge = req->rl_niovs;
1596 	send_wr.opcode = IB_WR_SEND;
1597 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1598 		ib_dma_sync_single_for_device(ia->ri_id->device,
1599 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1600 			DMA_TO_DEVICE);
1601 	ib_dma_sync_single_for_device(ia->ri_id->device,
1602 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1603 		DMA_TO_DEVICE);
1604 	ib_dma_sync_single_for_device(ia->ri_id->device,
1605 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1606 		DMA_TO_DEVICE);
1607 
1608 	if (DECR_CQCOUNT(ep) > 0)
1609 		send_wr.send_flags = 0;
1610 	else { /* Provider must take a send completion every now and then */
1611 		INIT_CQCOUNT(ep);
1612 		send_wr.send_flags = IB_SEND_SIGNALED;
1613 	}
1614 
1615 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1616 	if (rc)
1617 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1618 			rc);
1619 out:
1620 	return rc;
1621 }
1622 
1623 /*
1624  * (Re)post a receive buffer.
1625  */
1626 int
1627 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1628 		     struct rpcrdma_ep *ep,
1629 		     struct rpcrdma_rep *rep)
1630 {
1631 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1632 	int rc;
1633 
1634 	recv_wr.next = NULL;
1635 	recv_wr.wr_id = (u64) (unsigned long) rep;
1636 	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1637 	recv_wr.num_sge = 1;
1638 
1639 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1640 				   rdmab_addr(rep->rr_rdmabuf),
1641 				   rdmab_length(rep->rr_rdmabuf),
1642 				   DMA_BIDIRECTIONAL);
1643 
1644 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1645 
1646 	if (rc)
1647 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1648 			rc);
1649 	return rc;
1650 }
1651 
1652 /* How many chunk list items fit within our inline buffers?
1653  */
1654 unsigned int
1655 rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
1656 {
1657 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1658 	int bytes, segments;
1659 
1660 	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
1661 	bytes -= RPCRDMA_HDRLEN_MIN;
1662 	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
1663 		pr_warn("RPC:       %s: inline threshold too small\n",
1664 			__func__);
1665 		return 0;
1666 	}
1667 
1668 	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
1669 	dprintk("RPC:       %s: max chunk list size = %d segments\n",
1670 		__func__, segments);
1671 	return segments;
1672 }
1673
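
/*
 * Worked example with assumed values (none of these numbers come from
 * this file): if both inline thresholds are 1024 bytes,
 * RPCRDMA_HDRLEN_MIN is 28, and sizeof(struct rpcrdma_segment) is 16,
 * then bytes = 996, 996 / 16 = 62, fls(62) = 6, and the result is
 * 1 << 5 = 32 segments.
 */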