xref: /freebsd/sys/ofed/drivers/infiniband/ulp/sdp/sdp_zcopy.c (revision 51015e6d0f570239b0c2088dc6cf2b018928375d)
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
3  *
4  * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the
10  * OpenIB.org BSD license below:
11  *
12  *     Redistribution and use in source and binary forms, with or
13  *     without modification, are permitted provided that the following
14  *     conditions are met:
15  *
16  *      - Redistributions of source code must retain the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer.
19  *
20  *      - Redistributions in binary form must reproduce the above
21  *        copyright notice, this list of conditions and the following
22  *        disclaimer in the documentation and/or other materials
23  *        provided with the distribution.
24  *
25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32  * SOFTWARE.
33  */
34 #include <linux/tcp.h>
35 #include <asm/ioctls.h>
36 #include <linux/workqueue.h>
37 #include <linux/net.h>
38 #include <linux/socket.h>
39 #include <net/protocol.h>
40 #include <net/inet_common.h>
41 #include <rdma/rdma_cm.h>
42 #include <rdma/ib_verbs.h>
43 #include <rdma/ib_fmr_pool.h>
44 #include <rdma/ib_umem.h>
45 #include <net/tcp.h> /* for memcpy_toiovec */
46 #include <asm/io.h>
47 #include <asm/uaccess.h>
48 #include <linux/delay.h>
49 #include "sdp.h"
50 
51 static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
52 {
53 	struct sdp_sock *ssk = sdp_sk(sk);
54 	struct mbuf *mb;
55 	int payload_len;
56 	struct page *payload_pg;
57 	int off, len;
58 	struct ib_umem_chunk *chunk;
59 
60 	WARN_ON(ssk->tx_sa);
61 
62 	BUG_ON(!tx_sa);
63 	BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
64 	BUG_ON(!tx_sa->umem);
65 	BUG_ON(!tx_sa->umem->chunk_list.next);
66 
67 	chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
68 	BUG_ON(!chunk->nmap);
69 
70 	off = tx_sa->umem->offset;
71 	len = tx_sa->umem->length;
72 
73 	tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
74 
75 	mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
76 	if (!mb) {
77 		return -ENOMEM;
78 	}
79 	sdp_dbg_data(sk, "sending SrcAvail\n");
80 
81 	TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is hanged on the mb
82 					 * but continue to live after mb is freed */
83 	ssk->tx_sa = tx_sa;
84 
85 	/* must have payload inlined in SrcAvail packet in combined mode */
86 	payload_len = MIN(tx_sa->umem->page_size - off, len);
87 	payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
88 	payload_pg  = sg_page(&chunk->page_list[0]);
89 	get_page(payload_pg);
90 
91 	sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
92 		off, payload_pg, payload_len);
93 
94 	mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
95 			payload_pg, off, payload_len);
96 
97 	mb->len             += payload_len;
98 	mb->data_len         = payload_len;
99 	mb->truesize        += payload_len;
100 //	sk->sk_wmem_queued   += payload_len;
101 //	sk->sk_forward_alloc -= payload_len;
102 
103 	mb_entail(sk, ssk, mb);
104 
105 	ssk->write_seq += payload_len;
106 	SDP_SKB_CB(mb)->end_seq += payload_len;
107 
108 	tx_sa->bytes_sent = tx_sa->umem->length;
109 	tx_sa->bytes_acked = payload_len;
110 
111 	/* TODO: pushing the mb into the tx_queue should be enough */
112 
113 	return 0;
114 }
115 
116 static int sdp_post_srcavail_cancel(struct socket *sk)
117 {
118 	struct sdp_sock *ssk = sdp_sk(sk);
119 	struct mbuf *mb;
120 
121 	sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");
122 
123 	mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
124 	mb_entail(sk, ssk, mb);
125 
126 	sdp_post_sends(ssk, 0);
127 
128 	schedule_delayed_work(&ssk->srcavail_cancel_work,
129 			SDP_SRCAVAIL_CANCEL_TIMEOUT);
130 
131 	return 0;
132 }
133 
134 void srcavail_cancel_timeout(struct work_struct *work)
135 {
136 	struct sdp_sock *ssk =
137 		container_of(work, struct sdp_sock, srcavail_cancel_work.work);
138 	struct socket *sk = ssk->socket;
139 
140 	lock_sock(sk);
141 
142 	sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout."
143 			" closing connection\n");
144 	sdp_set_error(sk, -ECONNRESET);
145 	wake_up(&ssk->wq);
146 
147 	release_sock(sk);
148 }
149 
150 static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
151 		int ignore_signals)
152 {
153 	struct socket *sk = ssk->socket;
154 	int err = 0;
155 	long vm_wait = 0;
156 	long current_timeo = *timeo_p;
157 	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
158 	DEFINE_WAIT(wait);
159 
160 	sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
161 	sdp_prf1(sk, NULL, "Going to sleep");
162 	while (ssk->qp_active) {
163 		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
164 
165 		if (unlikely(!*timeo_p)) {
166 			err = -ETIME;
167 			tx_sa->abort_flags |= TX_SA_TIMEDOUT;
168 			sdp_prf1(sk, NULL, "timeout");
169 			SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
170 			break;
171 		}
172 
173 		else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
174 			err = -EINVAL;
175 			sdp_dbg_data(sk, "acked bytes > sent bytes\n");
176 			tx_sa->abort_flags |= TX_SA_ERROR;
177 			break;
178 		}
179 
180 		if (tx_sa->abort_flags & TX_SA_SENDSM) {
181 			sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
182 			SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
183 			err = -EAGAIN;
184 			break ;
185 		}
186 
187 		if (!ignore_signals) {
188 			if (signal_pending(current)) {
189 				err = -EINTR;
190 				sdp_prf1(sk, NULL, "signalled");
191 				tx_sa->abort_flags |= TX_SA_INTRRUPTED;
192 				break;
193 			}
194 
195 			if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
196 				sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
197 				tx_sa->abort_flags |= TX_SA_CROSS_SEND;
198 				SDPSTATS_COUNTER_INC(zcopy_cross_send);
199 				err = -ETIME;
200 				break ;
201 			}
202 		}
203 
204 		posts_handler_put(ssk);
205 
206 		sk_wait_event(sk, &current_timeo,
207 				tx_sa->abort_flags &&
208 				ssk->rx_sa &&
209 				(tx_sa->bytes_acked < tx_sa->bytes_sent) &&
210 				vm_wait);
211 		sdp_dbg_data(ssk->socket, "woke up sleepers\n");
212 
213 		posts_handler_get(ssk);
214 
215 		if (tx_sa->bytes_acked == tx_sa->bytes_sent)
216 			break;
217 
218 		if (vm_wait) {
219 			vm_wait -= current_timeo;
220 			current_timeo = *timeo_p;
221 			if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
222 			    (current_timeo -= vm_wait) < 0)
223 				current_timeo = 0;
224 			vm_wait = 0;
225 		}
226 		*timeo_p = current_timeo;
227 	}
228 
229 	finish_wait(sk->sk_sleep, &wait);
230 
231 	sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
232 			tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);
233 
234 	if (!ssk->qp_active) {
235 		sdp_dbg(sk, "QP destroyed while waiting\n");
236 		return -EINVAL;
237 	}
238 	return err;
239 }
240 
241 static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
242 {
243 	struct socket *sk = ssk->socket;
244 	long timeo = HZ * 5; /* Timeout for RDMA read */
245 	DEFINE_WAIT(wait);
246 
247 	sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
248 	while (1) {
249 		prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);
250 
251 		if (!ssk->tx_ring.rdma_inflight->busy) {
252 			sdp_dbg_data(sk, "got rdma cqe\n");
253 			break;
254 		}
255 
256 		if (!ssk->qp_active) {
257 			sdp_dbg_data(sk, "QP destroyed\n");
258 			break;
259 		}
260 
261 		if (!timeo) {
262 			sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
263 			WARN_ON(1);
264 			break;
265 		}
266 
267 		posts_handler_put(ssk);
268 
269 		sdp_prf1(sk, NULL, "Going to sleep");
270 		sk_wait_event(sk, &timeo,
271 			!ssk->tx_ring.rdma_inflight->busy);
272 		sdp_prf1(sk, NULL, "Woke up");
273 		sdp_dbg_data(ssk->socket, "woke up sleepers\n");
274 
275 		posts_handler_get(ssk);
276 	}
277 
278 	finish_wait(sk->sk_sleep, &wait);
279 
280 	sdp_dbg_data(sk, "Finished waiting\n");
281 }
282 
283 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
284 		struct rx_srcavail_state *rx_sa)
285 {
286 	struct mbuf *mb;
287 	int copied = rx_sa->used - rx_sa->reported;
288 
289 	if (rx_sa->used <= rx_sa->reported)
290 		return 0;
291 
292 	mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);
293 
294 	rx_sa->reported += copied;
295 
296 	/* TODO: What if no tx_credits available? */
297 	sdp_post_send(ssk, mb);
298 
299 	return 0;
300 }
301 
302 int sdp_post_sendsm(struct socket *sk)
303 {
304 	struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);
305 
306 	sdp_post_send(sdp_sk(sk), mb);
307 
308 	return 0;
309 }
310 
311 static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
312 {
313 	sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
314 	while (len > 0) {
315 		if (iov->iov_len) {
316 			int copy = min_t(unsigned int, iov->iov_len, len);
317 			len -= copy;
318 			iov->iov_len -= copy;
319 			iov->iov_base += copy;
320 		}
321 		iov++;
322 	}
323 
324 	return 0;
325 }
326 
327 static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
328 {
329 	int bytes = 0;
330 
331 	while (sge_cnt > 0) {
332 		bytes += sge->length;
333 		sge++;
334 		sge_cnt--;
335 	}
336 
337 	return bytes;
338 }
339 void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
340 {
341 	struct socket *sk = ssk->socket;
342 	unsigned long flags;
343 
344 	spin_lock_irqsave(&ssk->tx_sa_lock, flags);
345 
346 	if (!ssk->tx_sa) {
347 		sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
348 		goto out;
349 	}
350 
351 	if (ssk->tx_sa->mseq > mseq_ack) {
352 		sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
353 			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
354 			mseq_ack, ssk->tx_sa->mseq);
355 		goto out;
356 	}
357 
358 	sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");
359 
360 	ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
361 	cancel_delayed_work(&ssk->srcavail_cancel_work);
362 
363 	wake_up(sk->sk_sleep);
364 	sdp_dbg_data(sk, "woke up sleepers\n");
365 
366 out:
367 	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
368 }
369 
370 void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
371 		u32 bytes_completed)
372 {
373 	struct socket *sk = ssk->socket;
374 	unsigned long flags;
375 
376 	sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
377 	sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
378 
379 	spin_lock_irqsave(&ssk->tx_sa_lock, flags);
380 
381 	BUG_ON(!ssk);
382 
383 	if (!ssk->tx_sa) {
384 		sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
385 		goto out;
386 	}
387 
388 	if (ssk->tx_sa->mseq > mseq_ack) {
389 		sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
390 			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
391 			mseq_ack, ssk->tx_sa->mseq);
392 		goto out;
393 	}
394 
395 	ssk->tx_sa->bytes_acked += bytes_completed;
396 
397 	wake_up(sk->sk_sleep);
398 	sdp_dbg_data(sk, "woke up sleepers\n");
399 
400 out:
401 	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
402 	return;
403 }
404 
405 static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
406 {
407 	unsigned long avail;
408 	unsigned long lock_limit;
409 
410 	if (capable(CAP_IPC_LOCK))
411 		return ULONG_MAX;
412 
413 	lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
414 	avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);
415 
416 	return avail - offset;
417 }
418 
419 static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
420 	struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
421 {
422 	struct ib_pool_fmr *fmr;
423 	struct ib_umem *umem;
424 	struct ib_device *dev;
425 	u64 *pages;
426 	struct ib_umem_chunk *chunk;
427 	int n, j, k;
428 	int rc = 0;
429 	unsigned long max_lockable_bytes;
430 
431 	if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
432 		sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
433 			len, SDP_MAX_RDMA_READ_LEN);
434 		len = SDP_MAX_RDMA_READ_LEN;
435 	}
436 
437 	max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
438 	if (unlikely(len > max_lockable_bytes)) {
439 		sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
440 			len, max_lockable_bytes);
441 		len = max_lockable_bytes;
442 	}
443 
444 	sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
445 			uaddr, len, max_lockable_bytes);
446 
447 	umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
448 		IB_ACCESS_REMOTE_WRITE, 0);
449 
450 	if (IS_ERR(umem)) {
451 		rc = PTR_ERR(umem);
452 		sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
453 		sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
454 				current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
455 				current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
456 				capable(CAP_IPC_LOCK));
457 		goto err_umem_get;
458 	}
459 
460 	sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
461 		umem->offset, umem->length);
462 
463 	pages = (u64 *) __get_free_page(GFP_KERNEL);
464 	if (!pages)
465 		goto err_pages_alloc;
466 
467 	n = 0;
468 
469 	dev = sdp_sk(sk)->ib_device;
470 	list_for_each_entry(chunk, &umem->chunk_list, list) {
471 		for (j = 0; j < chunk->nmap; ++j) {
472 			len = ib_sg_dma_len(dev,
473 					&chunk->page_list[j]) >> PAGE_SHIFT;
474 
475 			for (k = 0; k < len; ++k) {
476 				pages[n++] = ib_sg_dma_address(dev,
477 						&chunk->page_list[j]) +
478 					umem->page_size * k;
479 
480 			}
481 		}
482 	}
483 
484 	fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
485 	if (IS_ERR(fmr)) {
486 		sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
487 		goto err_fmr_alloc;
488 	}
489 
490 	free_page((unsigned long) pages);
491 
492 	*_umem = umem;
493 	*_fmr = fmr;
494 
495 	return 0;
496 
497 err_fmr_alloc:
498 	free_page((unsigned long) pages);
499 
500 err_pages_alloc:
501 	ib_umem_release(umem);
502 
503 err_umem_get:
504 
505 	return rc;
506 }
507 
508 void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
509 {
510 	if (!sdp_sk(sk)->qp_active)
511 		return;
512 
513 	ib_fmr_pool_unmap(*_fmr);
514 	*_fmr = NULL;
515 
516 	ib_umem_release(*_umem);
517 	*_umem = NULL;
518 }
519 
520 static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
521 {
522 	struct sdp_sock *ssk = sdp_sk(sk);
523 	struct ib_send_wr *bad_wr;
524 	struct ib_send_wr wr = { NULL };
525 	struct ib_sge sge;
526 
527 	wr.opcode = IB_WR_RDMA_READ;
528 	wr.next = NULL;
529 	wr.wr_id = SDP_OP_RDMA;
530 	wr.wr.rdma.rkey = rx_sa->rkey;
531 	wr.send_flags = 0;
532 
533 	ssk->tx_ring.rdma_inflight = rx_sa;
534 
535 	sge.addr = rx_sa->umem->offset;
536 	sge.length = rx_sa->umem->length;
537 	sge.lkey = rx_sa->fmr->fmr->lkey;
538 
539 	wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
540 	wr.num_sge = 1;
541 	wr.sg_list = &sge;
542 	rx_sa->busy++;
543 
544 	wr.send_flags = IB_SEND_SIGNALED;
545 
546 	return ib_post_send(ssk->qp, &wr, &bad_wr);
547 }
548 
549 int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
550 		unsigned long *used)
551 {
552 	struct sdp_sock *ssk = sdp_sk(sk);
553 	struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
554 	int got_srcavail_cancel;
555 	int rc = 0;
556 	int len = *used;
557 	int copied;
558 
559 	sdp_dbg_data(ssk->socket, "preparing RDMA read."
560 		" len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);
561 
562 	sock_hold(sk, SOCK_REF_RDMA_RD);
563 
564 	if (len > rx_sa->len) {
565 		sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
566 		WARN_ON(1);
567 		len = rx_sa->len;
568 	}
569 
570 	rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
571 	if (rc) {
572 		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
573 		goto err_alloc_fmr;
574 	}
575 
576 	rc = sdp_post_rdma_read(sk, rx_sa);
577 	if (unlikely(rc)) {
578 		sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
579 		sdp_set_error(ssk->socket, -ECONNRESET);
580 		wake_up(&ssk->wq);
581 		goto err_post_send;
582 	}
583 
584 	sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);
585 
586 	got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
587 
588 	sdp_arm_tx_cq(sk);
589 
590 	sdp_wait_rdma_wr_finished(ssk);
591 
592 	sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
593 	if (!ssk->qp_active) {
594 		sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
595 		rc = -EPIPE;
596 		goto err_post_send;
597 	}
598 
599 	copied = rx_sa->umem->length;
600 
601 	sdp_update_iov_used(sk, iov, copied);
602 	rx_sa->used += copied;
603 	atomic_add(copied, &ssk->rcv_nxt);
604 	*used = copied;
605 
606 	ssk->tx_ring.rdma_inflight = NULL;
607 
608 err_post_send:
609 	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
610 
611 err_alloc_fmr:
612 	if (rc && ssk->qp_active) {
613 		sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
614 		rx_sa->flags |= RX_SA_ABORTED;
615 	}
616 
617 	sock_put(sk, SOCK_REF_RDMA_RD);
618 
619 	return rc;
620 }
621 
622 static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
623 {
624 	struct sdp_sock *ssk = sdp_sk(sk);
625 	int ret = 0;
626 	int credits_needed = 1;
627 
628 	sdp_dbg_data(sk, "Wait for mem\n");
629 
630 	set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
631 
632 	SDPSTATS_COUNTER_INC(send_wait_for_mem);
633 
634 	sdp_do_posts(ssk);
635 
636 	sdp_xmit_poll(ssk, 1);
637 
638 	ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);
639 
640 	return ret;
641 }
642 
643 static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
644 		struct iovec *iov, long *timeo)
645 {
646 	struct sdp_sock *ssk = sdp_sk(sk);
647 	int rc = 0;
648 	unsigned long lock_flags;
649 
650 	rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
651 			&tx_sa->fmr, &tx_sa->umem);
652 	if (rc) {
653 		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
654 		goto err_alloc_fmr;
655 	}
656 
657 	if (tx_slots_free(ssk) == 0) {
658 		rc = wait_for_sndbuf(sk, timeo);
659 		if (rc) {
660 			sdp_warn(sk, "Couldn't get send buffer\n");
661 			goto err_no_tx_slots;
662 		}
663 	}
664 
665 	rc = sdp_post_srcavail(sk, tx_sa);
666 	if (rc) {
667 		sdp_dbg(sk, "Error posting SrcAvail\n");
668 		goto err_abort_send;
669 	}
670 
671 	rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
672 	if (unlikely(rc)) {
673 		enum tx_sa_flag f = tx_sa->abort_flags;
674 
675 		if (f & TX_SA_SENDSM) {
676 			sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
677 		} else if (f & TX_SA_ERROR) {
678 			sdp_dbg_data(sk, "SrcAvail error completion\n");
679 			sdp_reset(sk);
680 			SDPSTATS_COUNTER_INC(zcopy_tx_error);
681 		} else if (ssk->qp_active) {
682 			sdp_post_srcavail_cancel(sk);
683 
684 			/* Wait for RdmaRdCompl/SendSM to
685 			 * finish the transaction */
686 			*timeo = 2 * HZ;
687 			sdp_dbg_data(sk, "Waiting for SendSM\n");
688 			sdp_wait_rdmardcompl(ssk, timeo, 1);
689 			sdp_dbg_data(sk, "finished waiting\n");
690 
691 			cancel_delayed_work(&ssk->srcavail_cancel_work);
692 		} else {
693 			sdp_dbg_data(sk, "QP was destroyed while waiting\n");
694 		}
695 	} else {
696 		sdp_dbg_data(sk, "got RdmaRdCompl\n");
697 	}
698 
699 	spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
700 	ssk->tx_sa = NULL;
701 	spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
702 
703 err_abort_send:
704 	sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
705 
706 err_no_tx_slots:
707 	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
708 
709 err_alloc_fmr:
710 	return rc;
711 }
712 
713 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
714 {
715 	struct sdp_sock *ssk = sdp_sk(sk);
716 	int rc = 0;
717 	long timeo;
718 	struct tx_srcavail_state *tx_sa;
719 	int offset;
720 	size_t bytes_to_copy = 0;
721 	int copied = 0;
722 
723 	sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
724 			iov->iov_base, iov->iov_len);
725 	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
726 	if (ssk->rx_sa) {
727 		sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
728 		return 0;
729 	}
730 
731 	sock_hold(ssk->socket, SOCK_REF_ZCOPY);
732 
733 	SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);
734 
735 	timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
736 
737 	/* Ok commence sending. */
738 	offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
739 
740 	tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
741 	if (!tx_sa) {
742 		sdp_warn(sk, "Error allocating zcopy context\n");
743 		rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
744 		goto err_alloc_tx_sa;
745 	}
746 
747 	bytes_to_copy = iov->iov_len;
748 	do {
749 		tx_sa_reset(tx_sa);
750 
751 		rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);
752 
753 		if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
754 			sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
755 				iov->iov_len);
756 			break;
757 		}
758 	} while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);
759 
760 	kfree(tx_sa);
761 err_alloc_tx_sa:
762 	copied = bytes_to_copy - iov->iov_len;
763 
764 	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);
765 
766 	sock_put(ssk->socket, SOCK_REF_ZCOPY);
767 
768 	if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
769 		return rc;
770 
771 	return copied;
772 }
773 
774 void sdp_abort_srcavail(struct socket *sk)
775 {
776 	struct sdp_sock *ssk = sdp_sk(sk);
777 	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
778 	unsigned long flags;
779 
780 	if (!tx_sa)
781 		return;
782 
783 	cancel_delayed_work(&ssk->srcavail_cancel_work);
784 	flush_scheduled_work();
785 
786 	spin_lock_irqsave(&ssk->tx_sa_lock, flags);
787 
788 	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
789 
790 	ssk->tx_sa = NULL;
791 
792 	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
793 }
794 
795 void sdp_abort_rdma_read(struct socket *sk)
796 {
797 	struct sdp_sock *ssk = sdp_sk(sk);
798 	struct rx_srcavail_state *rx_sa = ssk->rx_sa;
799 
800 	if (!rx_sa)
801 		return;
802 
803 	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
804 
805 	ssk->rx_sa = NULL;
806 }
807