xref: /illumos-gate/usr/src/uts/common/io/ib/clients/rdsv3/send.c (revision 33efde4275d24731ef87927237b0ffb0630b6b2d)
1 /*
2  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
3  */
4 
5 /*
6  * This file contains code imported from the OFED rds source file send.c
7  * Oracle elects to have and use the contents of send.c under and governed
8  * by the OpenIB.org BSD license (see below for full license text). However,
9  * the following notice accompanied the original version of this file:
10  */
11 
12 /*
13  * Copyright (c) 2006 Oracle.  All rights reserved.
14  *
15  * This software is available to you under a choice of one of two
16  * licenses.  You may choose to be licensed under the terms of the GNU
17  * General Public License (GPL) Version 2, available from the file
18  * COPYING in the main directory of this source tree, or the
19  * OpenIB.org BSD license below:
20  *
21  *     Redistribution and use in source and binary forms, with or
22  *     without modification, are permitted provided that the following
23  *     conditions are met:
24  *
25  *      - Redistributions of source code must retain the above
26  *        copyright notice, this list of conditions and the following
27  *        disclaimer.
28  *
29  *      - Redistributions in binary form must reproduce the above
30  *        copyright notice, this list of conditions and the following
31  *        disclaimer in the documentation and/or other materials
32  *        provided with the distribution.
33  *
34  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
35  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
36  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
37  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
38  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
39  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
40  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
41  * SOFTWARE.
42  *
43  */
44 #include <sys/stropts.h>
45 #include <sys/systm.h>
46 
47 #include <sys/rds.h>
48 #include <sys/socket.h>
49 #include <sys/socketvar.h>
50 
51 #include <sys/ib/clients/rdsv3/rdsv3.h>
52 #include <sys/ib/clients/rdsv3/rdma.h>
53 #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
54 
55 /*
56  * When transmitting messages in rdsv3_send_xmit, we need to emerge from
57  * time to time and briefly release the CPU. Otherwise the softlock watchdog
58  * will kick our shin.
59  * Also, it seems fairer to not let one busy connection stall all the
60  * others.
61  *
62  * send_batch_count is the number of times we'll loop in send_xmit. Setting
63  * it to 0 will restore the old behavior (where we looped until we had
64  * drained the queue).
65  */
66 static int send_batch_count = 64;
67 
68 extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);
69 /*
70  * Reset the send state. Caller must hold c_send_lock when calling here.
71  */
72 void
73 rdsv3_send_reset(struct rdsv3_connection *conn)
74 {
75 	struct rdsv3_message *rm, *tmp;
76 	struct rdsv3_rdma_op *ro;
77 
78 	RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);
79 
80 	ASSERT(MUTEX_HELD(&conn->c_send_lock));
81 
82 	if (conn->c_xmit_rm) {
83 		rm = conn->c_xmit_rm;
84 		ro = rm->m_rdma_op;
85 		if (ro && ro->r_mapped) {
86 			RDSV3_DPRINTF2("rdsv3_send_reset",
87 			    "rm %p mflg 0x%x map %d mihdl %p sgl %p",
88 			    rm, rm->m_flags, ro->r_mapped,
89 			    ro->r_rdma_sg[0].mihdl,
90 			    ro->r_rdma_sg[0].swr.wr_sgl);
91 			rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
92 		}
93 		/*
94 		 * Tell the user the RDMA op is no longer mapped by the
95 		 * transport. This isn't entirely true (it's flushed out
96 		 * independently) but as the connection is down, there's
97 		 * no ongoing RDMA to/from that memory
98 		 */
99 		rdsv3_message_unmapped(conn->c_xmit_rm);
100 		rdsv3_message_put(conn->c_xmit_rm);
101 		conn->c_xmit_rm = NULL;
102 	}
103 
104 	conn->c_xmit_sg = 0;
105 	conn->c_xmit_hdr_off = 0;
106 	conn->c_xmit_data_off = 0;
107 	conn->c_xmit_rdma_sent = 0;
108 	conn->c_map_queued = 0;
109 
110 	conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
111 	conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;
112 
113 	/* Mark messages as retransmissions, and move them to the send q */
114 	mutex_enter(&conn->c_lock);
115 	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
116 		set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
117 		set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
118 		if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
119 			RDSV3_DPRINTF4("_send_reset",
120 			    "RT rm %p mflg 0x%x sgl %p",
121 			    rm, rm->m_flags,
122 			    rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
123 		}
124 	}
125 	list_move_tail(&conn->c_send_queue, &conn->c_retrans);
126 	mutex_exit(&conn->c_lock);
127 
128 	RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
129 }
130 
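/*
 * Usage sketch: rdsv3_send_reset() must run under c_send_lock, e.g. from a
 * connection shutdown/restart path.  The wrapper name example_conn_restart
 * below is hypothetical; the lock and the call itself are the ones defined
 * above.
 */
#if 0
static void
example_conn_restart(struct rdsv3_connection *conn)
{
	mutex_enter(&conn->c_send_lock);
	rdsv3_send_reset(conn);
	mutex_exit(&conn->c_send_lock);
}
#endif
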
131 /*
132  * We're making the conscious trade-off here to only send one message
133  * down the connection at a time.
134  *   Pro:
135  *      - tx queueing is a simple fifo list
136  *   	- reassembly is optional and easily done by transports per conn
137  *      - no per flow rx lookup at all, straight to the socket
138  *   	- less per-frag memory and wire overhead
139  *   Con:
140  *      - queued acks can be delayed behind large messages
141  *   Depends:
142  *      - small message latency is higher behind queued large messages
143  *      - large message latency isn't starved by intervening small sends
144  */
145 int
146 rdsv3_send_xmit(struct rdsv3_connection *conn)
147 {
148 	struct rdsv3_message *rm;
149 	unsigned int tmp;
150 	unsigned int send_quota = send_batch_count;
151 	struct rdsv3_scatterlist *sg;
152 	int ret = 0;
153 	int was_empty = 0;
154 	list_t to_be_dropped;
155 
156 	if (!rdsv3_conn_up(conn))
157 		goto out;
158 
159 	RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);
160 
161 	list_create(&to_be_dropped, sizeof (struct rdsv3_message),
162 	    offsetof(struct rdsv3_message, m_conn_item));
163 
164 	/*
165 	 * sendmsg calls here after having queued its message on the send
166 	 * queue.  We only have one task feeding the connection at a time.  If
167 	 * another thread is already feeding the queue then we back off.  This
168 	 * avoids blocking the caller and trading per-connection data between
169 	 * caches per message.
170 	 */
171 	if (!mutex_tryenter(&conn->c_send_lock)) {
172 		RDSV3_DPRINTF4("rdsv3_send_xmit",
173 		    "Another thread running(conn: %p)", conn);
174 		rdsv3_stats_inc(s_send_sem_contention);
175 		ret = -ENOMEM;
176 		goto out;
177 	}
178 	atomic_inc_32(&conn->c_senders);
179 
180 	if (conn->c_trans->xmit_prepare)
181 		conn->c_trans->xmit_prepare(conn);
182 
183 	/*
184 	 * spin trying to push headers and data down the connection until
185 	 * the connection doesn't make forward progress.
186 	 */
187 	while (--send_quota) {
188 		/*
189 		 * See if we need to send a congestion map update if we're
190 		 * between sending messages.  The send_sem protects our sole
191 		 * use of c_map_offset and _bytes.
192 		 * Note this is used only by transports that define a special
193 		 * xmit_cong_map function. For all others, we allocate
194 		 * a cong_map message and treat it just like any other send.
195 		 */
196 		if (conn->c_map_bytes) {
197 			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
198 			    conn->c_map_offset);
199 			if (ret <= 0)
200 				break;
201 
202 			conn->c_map_offset += ret;
203 			conn->c_map_bytes -= ret;
204 			if (conn->c_map_bytes)
205 				continue;
206 		}
207 
208 		/*
209 		 * If we're done sending the current message, clear the
210 		 * offset and S/G temporaries.
211 		 */
212 		rm = conn->c_xmit_rm;
213 		if (rm != NULL &&
214 		    conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
215 		    conn->c_xmit_sg == rm->m_nents) {
216 			conn->c_xmit_rm = NULL;
217 			conn->c_xmit_sg = 0;
218 			conn->c_xmit_hdr_off = 0;
219 			conn->c_xmit_data_off = 0;
220 			conn->c_xmit_rdma_sent = 0;
221 
222 			/* Release the reference to the previous message. */
223 			rdsv3_message_put(rm);
224 			rm = NULL;
225 		}
226 
227 		/* If we're asked to send a cong map update, do so. */
228 		if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
229 			if (conn->c_trans->xmit_cong_map != NULL) {
230 				conn->c_map_offset = 0;
231 				conn->c_map_bytes =
232 				    sizeof (struct rdsv3_header) +
233 				    RDSV3_CONG_MAP_BYTES;
234 				continue;
235 			}
236 
237 			rm = rdsv3_cong_update_alloc(conn);
238 			if (IS_ERR(rm)) {
239 				ret = PTR_ERR(rm);
240 				break;
241 			}
242 
243 			conn->c_xmit_rm = rm;
244 		}
245 
246 		/*
247 		 * Grab the next message from the send queue, if there is one.
248 		 *
249 		 * c_xmit_rm holds a ref while we're sending this message down
250 		 * the connection.  We can use this ref while holding the
251 		 * send_sem.. rdsv3_send_reset() is serialized with it.
252 		 */
253 		if (rm == NULL) {
254 			unsigned int len;
255 
256 			mutex_enter(&conn->c_lock);
257 
258 			if (!list_is_empty(&conn->c_send_queue)) {
259 				rm = list_remove_head(&conn->c_send_queue);
260 				rdsv3_message_addref(rm);
261 
262 				/*
263 				 * Move the message from the send queue to
264 				 * the retransmit
265 				 * list right away.
266 				 */
267 				list_insert_tail(&conn->c_retrans, rm);
268 			}
269 
270 			mutex_exit(&conn->c_lock);
271 
272 			if (rm == NULL) {
273 				was_empty = 1;
274 				break;
275 			}
276 
277 			/*
278 			 * Unfortunately, the way Infiniband deals with
279 			 * RDMA to a bad MR key is by moving the entire
280 			 * queue pair to error state. We could possibly
281 			 * recover from that, but right now we drop the
282 			 * connection.
283 			 * Therefore, we never retransmit messages with
284 			 * RDMA ops.
285 			 */
286 			if (rm->m_rdma_op &&
287 			    test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
288 				mutex_enter(&conn->c_lock);
289 				if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
290 				    &rm->m_flags)) {
291 					list_remove_node(&rm->m_conn_item);
292 					list_insert_tail(&to_be_dropped, rm);
				}
293 				mutex_exit(&conn->c_lock);
294 				rdsv3_message_put(rm);
295 				continue;
296 			}
297 
298 			/* Require an ACK every once in a while */
299 			len = ntohl(rm->m_inc.i_hdr.h_len);
300 			if (conn->c_unacked_packets == 0 ||
301 			    conn->c_unacked_bytes < len) {
302 				set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
303 
304 				conn->c_unacked_packets =
305 				    rdsv3_sysctl_max_unacked_packets;
306 				conn->c_unacked_bytes =
307 				    rdsv3_sysctl_max_unacked_bytes;
308 				rdsv3_stats_inc(s_send_ack_required);
309 			} else {
310 				conn->c_unacked_bytes -= len;
311 				conn->c_unacked_packets--;
312 			}
313 
314 			conn->c_xmit_rm = rm;
315 		}
316 
317 		/*
318 		 * Try and send an rdma message.  Let's see if we can
319 		 * keep this simple and require that the transport either
320 		 * send the whole rdma or none of it.
321 		 */
322 		if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
323 			ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
324 			if (ret)
325 				break;
326 			conn->c_xmit_rdma_sent = 1;
327 			/*
328 			 * The transport owns the mapped memory for now.
329 			 * You can't unmap it while it's on the send queue
330 			 */
331 			set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
332 		}
333 
334 		if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
335 		    conn->c_xmit_sg < rm->m_nents) {
336 			ret = conn->c_trans->xmit(conn, rm,
337 			    conn->c_xmit_hdr_off,
338 			    conn->c_xmit_sg,
339 			    conn->c_xmit_data_off);
340 			if (ret <= 0)
341 				break;
342 
343 			if (conn->c_xmit_hdr_off <
344 			    sizeof (struct rdsv3_header)) {
345 				tmp = min(ret,
346 				    sizeof (struct rdsv3_header) -
347 				    conn->c_xmit_hdr_off);
348 				conn->c_xmit_hdr_off += tmp;
349 				ret -= tmp;
350 			}
351 
352 			sg = &rm->m_sg[conn->c_xmit_sg];
353 			while (ret) {
354 				tmp = min(ret, rdsv3_sg_len(sg) -
355 				    conn->c_xmit_data_off);
356 				conn->c_xmit_data_off += tmp;
357 				ret -= tmp;
358 				if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) {
359 					conn->c_xmit_data_off = 0;
360 					sg++;
361 					conn->c_xmit_sg++;
362 					ASSERT(!(ret != 0 &&
363 					    conn->c_xmit_sg == rm->m_nents));
364 				}
365 			}
366 		}
367 	}
368 
369 	/* Nuke any messages we decided not to retransmit. */
370 	if (!list_is_empty(&to_be_dropped))
371 		rdsv3_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
372 
373 	if (conn->c_trans->xmit_complete)
374 		conn->c_trans->xmit_complete(conn);
375 
376 	/*
377 	 * We might be racing with another sender who queued a message but
378 	 * backed off on noticing that we held the c_send_lock.  If we check
379 	 * for queued messages after dropping the sem then either we'll
380 	 * see the queued message or the queuer will get the sem.  If we
381 	 * notice the queued message then we trigger an immediate retry.
382 	 *
383 	 * We need to be careful only to do this when we stopped processing
384 	 * the send queue because it was empty.  It's the only way we
385 	 * stop processing the loop when the transport hasn't taken
386 	 * responsibility for forward progress.
387 	 */
388 	mutex_exit(&conn->c_send_lock);
389 
390 	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
391 		/*
392 		 * We exhausted the send quota, but there's work left to
393 		 * do. Return and (re-)schedule the send worker.
394 		 */
395 		ret = -EAGAIN;
396 	}
397 
398 	atomic_dec_32(&conn->c_senders);
399 
400 	if (ret == 0 && was_empty) {
401 		/*
402 		 * A simple bit test would be way faster than taking the
403 		 * spin lock
404 		 */
405 		mutex_enter(&conn->c_lock);
406 		if (!list_is_empty(&conn->c_send_queue)) {
407 			rdsv3_stats_inc(s_send_sem_queue_raced);
408 			ret = -EAGAIN;
409 		}
410 		mutex_exit(&conn->c_lock);
411 	}
412 
413 out:
414 	RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
415 	    conn, ret);
416 	return (ret);
417 }
418 
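/*
 * Illustration: callers such as rdsv3_send_worker() are expected to treat
 * -EAGAIN from rdsv3_send_xmit() as "work is still queued, call again".
 * A minimal sketch of that pattern; example_send_retry is a hypothetical
 * name, while rdsv3_queue_delayed_work(), rdsv3_wq and c_send_w are the
 * names used elsewhere in this file.
 */
#if 0
static void
example_send_retry(struct rdsv3_connection *conn)
{
	int ret = rdsv3_send_xmit(conn);

	/* quota ran out, or a racing sender queued another message */
	if (ret == -EAGAIN)
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);
}
#endif
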
419 static void
420 rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
421 {
422 	uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);
423 
424 	ASSERT(mutex_owned(&rs->rs_lock));
425 
426 	ASSERT(rs->rs_snd_bytes >= len);
427 	rs->rs_snd_bytes -= len;
428 
429 	if (rs->rs_snd_bytes == 0)
430 		rdsv3_stats_inc(s_send_queue_empty);
431 }
432 
433 static inline int
434 rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
435     is_acked_func is_acked)
436 {
437 	if (is_acked)
438 		return (is_acked(rm, ack));
439 	return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
440 }
441 
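/*
 * A transport may substitute its own notion of "acked" by passing an
 * is_acked callback to rdsv3_send_drop_acked() instead of relying on the
 * header sequence test above.  A minimal sketch, assuming the transport
 * stamps a private m_ack_seq on each message once it is on the wire (the
 * flag and field names follow the tcp_is_acked case described later in
 * this file; the function itself is hypothetical):
 */
#if 0
static int
example_is_acked(struct rdsv3_message *rm, uint64_t ack)
{
	/* not yet assigned a transport ack sequence: can't be acked */
	if (!test_bit(RDSV3_MSG_HAS_ACK_SEQ, &rm->m_flags))
		return (0);
	return (rm->m_ack_seq <= ack);
}
#endif
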
442 /*
443  * Returns true if there are no messages on the send and retransmit queues
444  * which have a sequence number greater than or equal to the given sequence
445  * number.
446  */
447 int
448 rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq)
449 {
450 	struct rdsv3_message *rm;
451 	int ret = 1;
452 
453 	RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn);
454 
455 	mutex_enter(&conn->c_lock);
456 
457 	/* XXX - original code spits out warning */
458 	rm = list_head(&conn->c_retrans);
459 	if (rm != NULL && ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
460 		ret = 0;
461 
462 	/* XXX - original code spits out warning */
463 	rm = list_head(&conn->c_send_queue);
464 	if (rm != NULL && ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
465 		ret = 0;
466 
467 	mutex_exit(&conn->c_lock);
468 
469 	RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn);
470 
471 	return (ret);
472 }
473 
474 /*
475  * This is pretty similar to what happens below in the ACK
476  * handling code - except that we call here as soon as we get
477  * the IB send completion on the RDMA op and the accompanying
478  * message.
479  */
480 void
481 rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status)
482 {
483 	struct rdsv3_sock *rs = NULL;
484 	struct rdsv3_rdma_op *ro;
485 	struct rdsv3_notifier *notifier;
486 
487 	RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm);
488 
489 	mutex_enter(&rm->m_rs_lock);
490 
491 	ro = rm->m_rdma_op;
492 	if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) &&
493 	    ro && ro->r_notify && ro->r_notifier) {
494 		notifier = ro->r_notifier;
495 		rs = rm->m_rs;
496 		rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
497 
498 		notifier->n_status = status;
499 		mutex_enter(&rs->rs_lock);
500 		list_insert_tail(&rs->rs_notify_queue, notifier);
501 		mutex_exit(&rs->rs_lock);
502 		ro->r_notifier = NULL;
503 	}
504 
505 	mutex_exit(&rm->m_rs_lock);
506 
507 	if (rs) {
508 		struct rsock *sk = rdsv3_rs_to_sk(rs);
509 		int error;
510 
511 		rdsv3_wake_sk_sleep(rs);
512 
513 		/* wake up anyone waiting in poll */
514 		sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL,
515 		    0, 0, &error, NULL);
516 		if (error != 0) {
517 			RDSV3_DPRINTF2("rdsv3_recv_incoming",
518 			    "su_recv returned: %d", error);
519 		}
520 
521 		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
522 	}
523 
524 	RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm);
525 }
526 
527 /*
528  * This is the same as rdsv3_rdma_send_complete except we
529  * don't do any locking - we have all the ingredients (message,
530  * socket, socket lock) and can just move the notifier.
531  */
532 static inline void
533 __rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm,
534     int status)
535 {
536 	struct rdsv3_rdma_op *ro;
537 	void *ic;
538 
539 	RDSV3_DPRINTF4("__rdsv3_rdma_send_complete",
540 	    "Enter(rs: %p, rm: %p)", rs, rm);
541 
542 	ro = rm->m_rdma_op;
543 	if (ro && ro->r_notify && ro->r_notifier) {
544 		ro->r_notifier->n_status = status;
545 		list_insert_tail(&rs->rs_notify_queue, ro->r_notifier);
546 		ro->r_notifier = NULL;
547 	}
548 
549 	/* No need to wake the app - caller does this */
550 }
551 
552 /*
553  * This is called from the IB send completion when we detect
554  * a RDMA operation that failed with remote access error.
555  * So speed is not an issue here.
556  */
557 struct rdsv3_message *
558 rdsv3_send_get_message(struct rdsv3_connection *conn,
559     struct rdsv3_rdma_op *op)
560 {
561 	struct rdsv3_message *rm, *tmp, *found = NULL;
562 
563 	RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn);
564 
565 	mutex_enter(&conn->c_lock);
566 
567 	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
568 		if (rm->m_rdma_op == op) {
569 			atomic_inc_32(&rm->m_refcount);
570 			found = rm;
571 			goto out;
572 		}
573 	}
574 
575 	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue,
576 	    m_conn_item) {
577 		if (rm->m_rdma_op == op) {
578 			atomic_inc_32(&rm->m_refcount);
579 			found = rm;
580 			break;
581 		}
582 	}
583 
584 out:
585 	mutex_exit(&conn->c_lock);
586 
587 	return (found);
588 }
589 
590 /*
591  * This removes messages from the socket's list if they're on it.  The list
592  * argument must be private to the caller; we must be able to modify it
593  * without locks.  The messages must have a reference held for their
594  * position on the list.  This function will drop that reference after
595  * removing the messages from the 'messages' list regardless of whether
596  * it found the messages on the socket list or not.
597  */
598 void
599 rdsv3_send_remove_from_sock(struct list *messages, int status)
600 {
601 	struct rdsv3_sock *rs = NULL;
602 	struct rdsv3_message *rm;
603 
604 	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");
605 
606 	while (!list_is_empty(messages)) {
607 		int was_on_sock = 0;
608 		rm = list_remove_head(messages);
609 
610 		/*
611 		 * If we see this flag cleared then we're *sure* that someone
612 		 * else beat us to removing it from the sock.  If we race
613 		 * with their flag update we'll get the lock and then really
614 		 * see that the flag has been cleared.
615 		 *
616 		 * The message spinlock makes sure nobody clears rm->m_rs
617 		 * while we're messing with it. It does not prevent the
618 		 * message from being removed from the socket, though.
619 		 */
620 		mutex_enter(&rm->m_rs_lock);
621 		if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
622 			goto unlock_and_drop;
623 
624 		if (rs != rm->m_rs) {
625 			if (rs) {
626 				rdsv3_wake_sk_sleep(rs);
627 				rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
628 			}
629 			rs = rm->m_rs;
630 			rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
631 		}
632 
633 		mutex_enter(&rs->rs_lock);
634 		if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
635 			struct rdsv3_rdma_op *ro = rm->m_rdma_op;
636 			struct rdsv3_notifier *notifier;
637 
638 			list_remove_node(&rm->m_sock_item);
639 			rdsv3_send_sndbuf_remove(rs, rm);
640 			if (ro && ro->r_notifier &&
641 			    (status || ro->r_notify)) {
642 				notifier = ro->r_notifier;
643 				list_insert_tail(&rs->rs_notify_queue,
644 				    notifier);
645 				if (!notifier->n_status)
646 					notifier->n_status = status;
647 				rm->m_rdma_op->r_notifier = NULL;
648 			}
649 			was_on_sock = 1;
650 			rm->m_rs = NULL;
651 		}
652 		mutex_exit(&rs->rs_lock);
653 
654 unlock_and_drop:
655 		mutex_exit(&rm->m_rs_lock);
656 		rdsv3_message_put(rm);
657 		if (was_on_sock)
658 			rdsv3_message_put(rm);
659 	}
660 
661 	if (rs) {
662 		rdsv3_wake_sk_sleep(rs);
663 		rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
664 	}
665 
666 	RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
667 }
668 
669 /*
670  * Transports call here when they've determined that the receiver queued
671  * messages up to, and including, the given sequence number.  Messages are
672  * moved to the retrans queue when rdsv3_send_xmit picks them off the send
673  * queue. This means that in the TCP case, the message may not have been
674  * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
675  * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
676  *
677  * XXX It's not clear to me how this is safely serialized with socket
678  * destruction.  Maybe it should bail if it sees SOCK_DEAD.
679  */
680 void
681 rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
682     is_acked_func is_acked)
683 {
684 	struct rdsv3_message *rm, *tmp;
685 	list_t list;
686 
687 	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);
688 
689 	list_create(&list, sizeof (struct rdsv3_message),
690 	    offsetof(struct rdsv3_message, m_conn_item));
691 
692 	mutex_enter(&conn->c_lock);
693 
694 	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
695 		if (!rdsv3_send_is_acked(rm, ack, is_acked))
696 			break;
697 
698 		list_remove_node(&rm->m_conn_item);
699 		list_insert_tail(&list, rm);
700 		clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
701 	}
702 
703 #if 0
704 XXX
705 	/* order flag updates with spin locks */
706 	if (!list_is_empty(&list))
707 		smp_mb__after_clear_bit();
708 #endif
709 
710 	mutex_exit(&conn->c_lock);
711 
712 	/* now remove the messages from the sock list as needed */
713 	rdsv3_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
714 
715 	RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
716 }
717 
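/*
 * Sketch of the calling convention described above, as seen from a
 * transport's ack-processing path (the surrounding function is
 * hypothetical): on learning that the peer has queued everything up to and
 * including 'ack', drop those messages using the default sequence test.
 */
#if 0
static void
example_process_ack(struct rdsv3_connection *conn, uint64_t ack)
{
	rdsv3_send_drop_acked(conn, ack, NULL);
}
#endif
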
718 void
719 rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
720 {
721 	struct rdsv3_message *rm, *tmp;
722 	struct rdsv3_connection *conn;
723 	list_t list;
724 	int wake = 0;
725 
726 	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);
727 
728 	list_create(&list, sizeof (struct rdsv3_message),
729 	    offsetof(struct rdsv3_message, m_sock_item));
730 
731 	/* get all the messages we're dropping under the rs lock */
732 	mutex_enter(&rs->rs_lock);
733 
734 	RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
735 	    m_sock_item) {
736 		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
737 		    dest->sin_port != rm->m_inc.i_hdr.h_dport))
738 			continue;
739 		wake = 1;
740 		list_remove(&rs->rs_send_queue, rm);
741 		list_insert_tail(&list, rm);
742 		rdsv3_send_sndbuf_remove(rs, rm);
743 		clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
744 	}
745 
746 	mutex_exit(&rs->rs_lock);
747 
748 	conn = NULL;
749 
750 	/* now remove the messages from the conn list as needed */
751 	RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
752 		/*
753 		 * We do this here rather than in the loop above, so that
754 		 * we don't have to nest m_rs_lock under rs->rs_lock
755 		 */
756 		mutex_enter(&rm->m_rs_lock);
757 		/* If this is a RDMA operation, notify the app. */
758 		__rdsv3_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
759 		rm->m_rs = NULL;
760 		mutex_exit(&rm->m_rs_lock);
761 
762 		/*
763 		 * If we see this flag cleared then we're *sure* that someone
764 		 * else beat us to removing it from the conn.  If we race
765 		 * with their flag update we'll get the lock and then really
766 		 * see that the flag has been cleared.
767 		 */
768 		if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
769 			continue;
770 
771 		if (conn != rm->m_inc.i_conn) {
772 			if (conn)
773 				mutex_exit(&conn->c_lock);
774 			conn = rm->m_inc.i_conn;
775 			mutex_enter(&conn->c_lock);
776 		}
777 
778 		if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
779 			list_remove_node(&rm->m_conn_item);
780 			rdsv3_message_put(rm);
781 		}
782 	}
783 
784 	if (conn)
785 		mutex_exit(&conn->c_lock);
786 
787 	if (wake)
788 		rdsv3_wake_sk_sleep(rs);
789 
790 	while (!list_is_empty(&list)) {
791 		rm = list_remove_head(&list);
792 
793 		rdsv3_message_wait(rm);
794 		rdsv3_message_put(rm);
795 	}
796 
797 	RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
798 }
799 
800 /*
801  * we only want this to fire once so we use the caller's 'queued'.  It's
802  * possible that another thread can race with us and remove the
803  * message from the flow with RDSV3_CANCEL_SENT_TO.
804  */
805 static int
806 rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn,
807     struct rdsv3_message *rm, uint16_be_t sport,
808     uint16_be_t dport, int *queued)
809 {
810 	uint32_t len;
811 
812 	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm);
813 
814 	if (*queued)
815 		goto out;
816 
817 	len = ntohl(rm->m_inc.i_hdr.h_len);
818 
819 	/*
820 	 * this is the only place which holds both the socket's rs_lock
821 	 * and the connection's c_lock
822 	 */
823 	mutex_enter(&rs->rs_lock);
824 
825 	/*
826 	 * If there is a little space in sndbuf, we don't queue anything,
827 	 * and userspace gets -EAGAIN. But poll() indicates there's send
828 	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
829 	 * freed up by incoming acks. So we check the *old* value of
830 	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
831 	 * and poll() now knows no more data can be sent.
832 	 */
833 	if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) {
834 		rs->rs_snd_bytes += len;
835 
836 		/*
837 		 * let recv side know we are close to send space exhaustion.
838 		 * This is probably not the optimal way to do it, as this
839 		 * means we set the flag on *all* messages as soon as our
840 		 * throughput hits a certain threshold.
841 		 */
842 		if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2)
843 			set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
844 
845 		list_insert_tail(&rs->rs_send_queue, rm);
846 		set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
847 
848 		rdsv3_message_addref(rm);
849 		rm->m_rs = rs;
850 
851 		/*
852 		 * The code ordering is a little weird, but we're
853 		 * trying to minimize the time we hold c_lock
854 		 */
855 		rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport,
856 		    dport, 0);
857 		rm->m_inc.i_conn = conn;
858 		rdsv3_message_addref(rm);	/* XXX - called twice */
859 
860 		mutex_enter(&conn->c_lock);
861 		rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++);
862 		list_insert_tail(&conn->c_send_queue, rm);
863 		set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
864 		mutex_exit(&conn->c_lock);
865 
866 		RDSV3_DPRINTF5("rdsv3_send_queue_rm",
867 		    "queued msg %p len %d, rs %p bytes %d seq %llu",
868 		    rm, len, rs, rs->rs_snd_bytes,
869 		    (unsigned long long)ntohll(
870 		    rm->m_inc.i_hdr.h_sequence));
871 
872 		*queued = 1;
873 	}
874 
875 	mutex_exit(&rs->rs_lock);
876 
877 	RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs);
878 out:
879 	return (*queued);
880 }
881 
882 static int
883 rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm,
884     struct msghdr *msg, int *allocated_mr)
885 {
886 	struct cmsghdr *cmsg;
887 	int ret = 0;
888 
889 	RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs);
890 
891 	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
892 
893 		if (cmsg->cmsg_level != SOL_RDS)
894 			continue;
895 
896 		RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d",
897 		    cmsg, rm, cmsg->cmsg_type);
898 		/*
899 		 * As a side effect, RDMA_DEST and RDMA_MAP will set
900 		 * rm->m_rdma_cookie and rm->m_rdma_mr.
901 		 */
902 		switch (cmsg->cmsg_type) {
903 		case RDS_CMSG_RDMA_ARGS:
904 			ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg);
905 			break;
906 
907 		case RDS_CMSG_RDMA_DEST:
908 			ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg);
909 			break;
910 
911 		case RDS_CMSG_RDMA_MAP:
912 			ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg);
913 			if (ret)
914 				*allocated_mr = 1;
915 			break;
916 
917 		default:
918 			return (-EINVAL);
919 		}
920 
921 		if (ret)
922 			break;
923 	}
924 
925 	RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs);
926 
927 	return (ret);
928 }
929 
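/*
 * The control messages parsed above arrive as ancillary data on the
 * caller's msghdr.  A minimal sketch of how a caller might attach one; the
 * helper name and the opaque args buffer are placeholders, and the real
 * argument layouts for RDS_CMSG_RDMA_ARGS/_DEST/_MAP are defined in
 * <sys/rds.h>, not here.
 */
#if 0
static int
example_attach_rds_cmsg(struct msghdr *msg, void *cbuf, size_t cbuf_len,
    const void *args, socklen_t args_len)
{
	struct cmsghdr *cmsg;

	if (cbuf_len < CMSG_SPACE(args_len))
		return (-EINVAL);

	msg->msg_control = cbuf;
	msg->msg_controllen = CMSG_SPACE(args_len);

	cmsg = CMSG_FIRSTHDR(msg);
	cmsg->cmsg_level = SOL_RDS;		/* only SOL_RDS is parsed */
	cmsg->cmsg_type = RDS_CMSG_RDMA_ARGS;	/* or _DEST, _MAP */
	cmsg->cmsg_len = CMSG_LEN(args_len);
	bcopy(args, CMSG_DATA(cmsg), args_len);

	return (0);
}
#endif
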
930 extern unsigned long rdsv3_max_bcopy_size;
931 
932 int
933 rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg,
934     size_t payload_len)
935 {
936 	struct rsock *sk = rdsv3_rs_to_sk(rs);
937 	struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
938 	uint32_be_t daddr;
939 	uint16_be_t dport;
940 	struct rdsv3_message *rm = NULL;
941 	struct rdsv3_connection *conn;
942 	int ret = 0;
943 	int queued = 0, allocated_mr = 0;
944 	int nonblock = msg->msg_flags & MSG_DONTWAIT;
945 	long timeo = rdsv3_sndtimeo(sk, nonblock);
946 
947 	RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs);
948 
949 	if (msg->msg_namelen) {
950 		/* XXX fail non-unicast destination IPs? */
951 		if (msg->msg_namelen < sizeof (*usin) ||
952 		    usin->sin_family != AF_INET_OFFLOAD) {
953 			ret = -EINVAL;
954 			RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
955 			goto out;
956 		}
957 		daddr = usin->sin_addr.s_addr;
958 		dport = usin->sin_port;
959 	} else {
960 		/* We only care about consistency with ->connect() */
961 		mutex_enter(&sk->sk_lock);
962 		daddr = rs->rs_conn_addr;
963 		dport = rs->rs_conn_port;
964 		mutex_exit(&sk->sk_lock);
965 	}
966 
967 	/* racing with another thread binding seems ok here */
968 	if (daddr == 0 || rs->rs_bound_addr == 0) {
969 		ret = -ENOTCONN; /* XXX not a great errno */
970 		RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
971 		goto out;
972 	}
973 
974 	if (payload_len > rdsv3_max_bcopy_size) {
975 		RDSV3_DPRINTF2("rdsv3_sendmsg", "Message too large: %d",
976 		    payload_len);
977 		ret = -EMSGSIZE;
978 		goto out;
979 	}
980 
981 	rm = rdsv3_message_copy_from_user(uio, payload_len);
982 	if (IS_ERR(rm)) {
983 		ret = PTR_ERR(rm);
984 		RDSV3_DPRINTF2("rdsv3_sendmsg",
985 		    "rdsv3_message_copy_from_user failed %d", -ret);
986 		rm = NULL;
987 		goto out;
988 	}
989 
990 	rm->m_daddr = daddr;
991 
992 	/* Parse any control messages the user may have included. */
993 	ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr);
994 	if (ret) {
995 		RDSV3_DPRINTF2("rdsv3_sendmsg",
996 		    "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d",
997 		    rs, rm, msg, ret);
998 		goto out;
999 	}
1000 
1001 	/*
1002 	 * rdsv3_conn_create has a spinlock that runs with IRQ off.
1003 	 * Caching the conn in the socket helps a lot.
1004 	 */
1005 	mutex_enter(&rs->rs_conn_lock);
1006 	if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) {
1007 		conn = rs->rs_conn;
1008 	} else {
1009 		conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr,
1010 		    daddr, rs->rs_transport, KM_NOSLEEP);
1011 		if (IS_ERR(conn)) {
1012 			mutex_exit(&rs->rs_conn_lock);
1013 			ret = PTR_ERR(conn);
1014 			RDSV3_DPRINTF2("rdsv3_sendmsg",
1015 			    "rdsv3_conn_create_outgoing failed %d",
1016 			    -ret);
1017 			goto out;
1018 		}
1019 		rs->rs_conn = conn;
1020 	}
1021 	mutex_exit(&rs->rs_conn_lock);
1022 
1023 	if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
1024 	    conn->c_trans->xmit_rdma == NULL) {
1025 		RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p",
1026 		    rm->m_rdma_op, conn->c_trans->xmit_rdma);
1027 		ret = -EOPNOTSUPP;
1028 		goto out;
1029 	}
1030 
1031 	/*
1032 	 * If the connection is down, trigger a connect. We may
1033 	 * have scheduled a delayed reconnect however - in this case
1034 	 * we should not interfere.
1035 	 */
1036 	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
1037 	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
1038 		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);
1039 
1040 	ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
1041 	if (ret) {
1042 		mutex_enter(&rs->rs_congested_lock);
1043 		rs->rs_seen_congestion = 1;
1044 		cv_signal(&rs->rs_congested_cv);
1045 		mutex_exit(&rs->rs_congested_lock);
1046 
1047 		RDSV3_DPRINTF2("rdsv3_sendmsg",
1048 		    "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret);
1049 		goto out;
1050 	}
1051 
1052 	(void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport,
1053 	    &queued);
1054 	if (!queued) {
1055 		/* rdsv3_stats_inc(s_send_queue_full); */
1056 		/* XXX make sure this is reasonable */
1057 		if (payload_len > rdsv3_sk_sndbuf(rs)) {
1058 			ret = -EMSGSIZE;
1059 			RDSV3_DPRINTF2("rdsv3_sendmsg",
1060 			    "msgsize(%d) too big, returning: %d",
1061 			    payload_len, -ret);
1062 			goto out;
1063 		}
1064 		if (nonblock) {
1065 			ret = -EAGAIN;
1066 			RDSV3_DPRINTF3("rdsv3_sendmsg",
1067 			    "send queue full (%d), returning: %d",
1068 			    payload_len, -ret);
1069 			goto out;
1070 		}
1071 
1072 #if 0
1073 		ret = rdsv3_wait_sig(sk->sk_sleep,
1074 		    (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
1075 		    dport, &queued)));
1076 		if (ret == 0) {
1077 			/* signal/timeout pending */
1078 			RDSV3_DPRINTF2("rdsv3_sendmsg",
1079 			    "woke due to signal: %d", ret);
1080 			ret = -ERESTART;
1081 			goto out;
1082 		}
1083 #else
1084 		mutex_enter(&sk->sk_sleep->waitq_mutex);
1085 		sk->sk_sleep->waitq_waiters++;
1086 		while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
1087 		    dport, &queued)) {
1088 			ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
1089 			    &sk->sk_sleep->waitq_mutex);
1090 			if (ret == 0) {
1091 				/* signal/timeout pending */
1092 				RDSV3_DPRINTF2("rdsv3_sendmsg",
1093 				    "woke due to signal: %d", ret);
1094 				ret = -EINTR;
1095 				sk->sk_sleep->waitq_waiters--;
1096 				mutex_exit(&sk->sk_sleep->waitq_mutex);
1097 				goto out;
1098 			}
1099 		}
1100 		sk->sk_sleep->waitq_waiters--;
1101 		mutex_exit(&sk->sk_sleep->waitq_mutex);
1102 #endif
1103 
1104 		RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d",
1105 		    queued);
1106 
1107 		ASSERT(queued);
1108 		ret = 0;
1109 	}
1110 
1111 	/*
1112 	 * By now we've committed to the send.  We reuse rdsv3_send_worker()
1113 	 * to retry sends in the rds thread if the transport asks us to.
1114 	 */
1115 	rdsv3_stats_inc(s_send_queued);
1116 
1117 	if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
1118 		(void) rdsv3_send_worker(&conn->c_send_w.work);
1119 
1120 	rdsv3_message_put(rm);
1121 	RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)",
1122 	    rs, payload_len);
1123 	return (payload_len);
1124 
1125 out:
1126 	/*
1127 	 * If the user included a RDMA_MAP cmsg, we allocated an MR on the fly.
1128 	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
1129 	 * or in any other way, we need to destroy the MR again.
1130 	 */
1131 	if (allocated_mr)
1132 		rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
1133 		    1);
1134 
1135 	if (rm)
1136 		rdsv3_message_put(rm);
1137 	return (ret);
1138 }
1139 
1140 /*
1141  * Reply to a ping packet.
1142  */
1143 int
1144 rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport)
1145 {
1146 	struct rdsv3_message *rm;
1147 	int ret = 0;
1148 
1149 	RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn);
1150 
1151 	rm = rdsv3_message_alloc(0, KM_NOSLEEP);
1152 	if (!rm) {
1153 		ret = -ENOMEM;
1154 		goto out;
1155 	}
1156 
1157 	rm->m_daddr = conn->c_faddr;
1158 
1159 	/*
1160 	 * If the connection is down, trigger a connect. We may
1161 	 * have scheduled a delayed reconnect however - in this case
1162 	 * we should not interfere.
1163 	 */
1164 	if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
1165 	    !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
1166 		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);
1167 
1168 	ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL);
1169 	if (ret)
1170 		goto out;
1171 
1172 	mutex_enter(&conn->c_lock);
1173 	list_insert_tail(&conn->c_send_queue, rm);
1174 	set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
1175 	rdsv3_message_addref(rm);
1176 	rm->m_inc.i_conn = conn;
1177 
1178 	rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
1179 	    conn->c_next_tx_seq);
1180 	conn->c_next_tx_seq++;
1181 	mutex_exit(&conn->c_lock);
1182 
1183 	rdsv3_stats_inc(s_send_queued);
1184 	rdsv3_stats_inc(s_send_pong);
1185 
1186 	if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
1187 		(void) rdsv3_send_xmit(conn);
1188 
1189 	rdsv3_message_put(rm);
1190 
1191 	RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn);
1192 	return (0);
1193 
1194 out:
1195 	if (rm)
1196 		rdsv3_message_put(rm);
1197 	return (ret);
1198 }
1199