1 /*
2 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
3 */
4
5 /*
6 * This file contains code imported from the OFED rds source file send.c
7 * Oracle elects to have and use the contents of send.c under and governed
8 * by the OpenIB.org BSD license (see below for full license text). However,
9 * the following notice accompanied the original version of this file:
10 */
11
12 /*
13 * Copyright (c) 2006 Oracle. All rights reserved.
14 *
15 * This software is available to you under a choice of one of two
16 * licenses. You may choose to be licensed under the terms of the GNU
17 * General Public License (GPL) Version 2, available from the file
18 * COPYING in the main directory of this source tree, or the
19 * OpenIB.org BSD license below:
20 *
21 * Redistribution and use in source and binary forms, with or
22 * without modification, are permitted provided that the following
23 * conditions are met:
24 *
25 * - Redistributions of source code must retain the above
26 * copyright notice, this list of conditions and the following
27 * disclaimer.
28 *
29 * - Redistributions in binary form must reproduce the above
30 * copyright notice, this list of conditions and the following
31 * disclaimer in the documentation and/or other materials
32 * provided with the distribution.
33 *
34 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
35 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
36 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
37 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
38 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
39 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
40 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
41 * SOFTWARE.
42 *
43 */
44 #include <sys/stropts.h>
45 #include <sys/systm.h>
46
47 #include <sys/rds.h>
48 #include <sys/socket.h>
49 #include <sys/socketvar.h>
50
51 #include <sys/ib/clients/rdsv3/rdsv3.h>
52 #include <sys/ib/clients/rdsv3/rdma.h>
53 #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
54
55 /*
56 * When transmitting messages in rdsv3_send_xmit, we need to emerge from
57 * time to time and briefly release the CPU. Otherwise the softlock watchdog
58 * will kick our shin.
59 * Also, it seems fairer to not let one busy connection stall all the
60 * others.
61 *
62 * send_batch_count is the number of times we'll loop in send_xmit. Setting
63 * it to 0 will restore the old behavior (where we looped until we had
64 * drained the queue).
65 */
66 static int send_batch_count = 64;
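/*
 * Illustrative note (not from the original source): the quota only bounds
 * one pass through the transmit loop.  When it runs out with work still
 * queued, rdsv3_send_xmit() returns -EAGAIN and the caller is expected to
 * reschedule the send worker, along the lines of:
 *
 *	if (rdsv3_send_xmit(conn) == -EAGAIN)
 *		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);
 */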
67
68 extern void rdsv3_ib_send_unmap_rdma(void *ic, struct rdsv3_rdma_op *op);
69 /*
70 * Reset the send state. Caller must hold c_send_lock when calling here.
71 */
72 void
73 rdsv3_send_reset(struct rdsv3_connection *conn)
74 {
75 struct rdsv3_message *rm, *tmp;
76 struct rdsv3_rdma_op *ro;
77
78 RDSV3_DPRINTF4("rdsv3_send_reset", "Enter(conn: %p)", conn);
79
80 ASSERT(MUTEX_HELD(&conn->c_send_lock));
81
82 if (conn->c_xmit_rm) {
83 rm = conn->c_xmit_rm;
84 ro = rm->m_rdma_op;
85 if (ro && ro->r_mapped) {
86 RDSV3_DPRINTF2("rdsv3_send_reset",
87 "rm %p mflg 0x%x map %d mihdl %p sgl %p",
88 rm, rm->m_flags, ro->r_mapped,
89 ro->r_rdma_sg[0].mihdl,
90 ro->r_rdma_sg[0].swr.wr_sgl);
91 rdsv3_ib_send_unmap_rdma(conn->c_transport_data, ro);
92 }
93 /*
94 * Tell the user the RDMA op is no longer mapped by the
95 * transport. This isn't entirely true (it's flushed out
96 * independently) but as the connection is down, there's
97 * no ongoing RDMA to/from that memory
98 */
99 rdsv3_message_unmapped(conn->c_xmit_rm);
100 rdsv3_message_put(conn->c_xmit_rm);
101 conn->c_xmit_rm = NULL;
102 }
103
104 conn->c_xmit_sg = 0;
105 conn->c_xmit_hdr_off = 0;
106 conn->c_xmit_data_off = 0;
107 conn->c_xmit_rdma_sent = 0;
108 conn->c_map_queued = 0;
109
110 conn->c_unacked_packets = rdsv3_sysctl_max_unacked_packets;
111 conn->c_unacked_bytes = rdsv3_sysctl_max_unacked_bytes;
112
113 /* Mark messages as retransmissions, and move them to the send q */
114 mutex_enter(&conn->c_lock);
115 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
116 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
117 set_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags);
118 if (rm->m_rdma_op && rm->m_rdma_op->r_mapped) {
119 RDSV3_DPRINTF4("_send_reset",
120 "RT rm %p mflg 0x%x sgl %p",
121 rm, rm->m_flags,
122 rm->m_rdma_op->r_rdma_sg[0].swr.wr_sgl);
123 }
124 }
125 list_move_tail(&conn->c_send_queue, &conn->c_retrans);
126 mutex_exit(&conn->c_lock);
127
128 RDSV3_DPRINTF4("rdsv3_send_reset", "Return(conn: %p)", conn);
129 }
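/*
 * A minimal sketch of the expected calling convention, per the ASSERT
 * above (illustrative only; the real callers live in the connection
 * shutdown path):
 *
 *	mutex_enter(&conn->c_send_lock);
 *	rdsv3_send_reset(conn);
 *	mutex_exit(&conn->c_send_lock);
 */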
130
131 /*
132 * We're making the conscious trade-off here to only send one message
133 * down the connection at a time.
134 * Pro:
135 * - tx queueing is a simple fifo list
136 * - reassembly is optional and easily done by transports per conn
137 * - no per flow rx lookup at all, straight to the socket
138 * - less per-frag memory and wire overhead
139 * Con:
140 * - queued acks can be delayed behind large messages
141 * Depends:
142 * - small message latency is higher behind queued large messages
143 * - large message latency isn't starved by intervening small sends
144 */
145 int
146 rdsv3_send_xmit(struct rdsv3_connection *conn)
147 {
148 struct rdsv3_message *rm;
149 unsigned int tmp;
150 unsigned int send_quota = send_batch_count;
151 struct rdsv3_scatterlist *sg;
152 int ret = 0;
153 int was_empty = 0;
154 list_t to_be_dropped;
155
156 if (!rdsv3_conn_up(conn))
157 goto out;
158
159 RDSV3_DPRINTF4("rdsv3_send_xmit", "Enter(conn: %p)", conn);
160
161 list_create(&to_be_dropped, sizeof (struct rdsv3_message),
162 offsetof(struct rdsv3_message, m_conn_item));
163
164 /*
165 * sendmsg calls here after having queued its message on the send
166 * queue. We only have one task feeding the connection at a time. If
167 * another thread is already feeding the queue then we back off. This
168 * avoids blocking the caller and trading per-connection data between
169 * caches per message.
170 */
171 if (!mutex_tryenter(&conn->c_send_lock)) {
172 RDSV3_DPRINTF4("rdsv3_send_xmit",
173 "Another thread running(conn: %p)", conn);
174 rdsv3_stats_inc(s_send_sem_contention);
175 ret = -ENOMEM;
176 goto out;
177 }
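/*
 * c_senders counts threads currently inside the transmit path; the
 * matching atomic_dec_32() below runs once this pass is finished.
 */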
178 atomic_inc_32(&conn->c_senders);
179
180 if (conn->c_trans->xmit_prepare)
181 conn->c_trans->xmit_prepare(conn);
182
183 /*
184 * spin trying to push headers and data down the connection until
185 * the connection doesn't make forward progress.
186 */
187 while (--send_quota) {
188 /*
189 * See if we need to send a congestion map update if we're
190 * between sending messages. The send_sem protects our sole
191 * use of c_map_offset and c_map_bytes.
192 * Note this is used only by transports that define a special
193 * xmit_cong_map function. For all others, we allocate
194 * a cong_map message and treat it just like any other send.
195 */
196 if (conn->c_map_bytes) {
197 ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
198 conn->c_map_offset);
199 if (ret <= 0)
200 break;
201
202 conn->c_map_offset += ret;
203 conn->c_map_bytes -= ret;
204 if (conn->c_map_bytes)
205 continue;
206 }
207
208 /*
209 * If we're done sending the current message, clear the
210 * offset and S/G temporaries.
211 */
212 rm = conn->c_xmit_rm;
213 if (rm != NULL &&
214 conn->c_xmit_hdr_off == sizeof (struct rdsv3_header) &&
215 conn->c_xmit_sg == rm->m_nents) {
216 conn->c_xmit_rm = NULL;
217 conn->c_xmit_sg = 0;
218 conn->c_xmit_hdr_off = 0;
219 conn->c_xmit_data_off = 0;
220 conn->c_xmit_rdma_sent = 0;
221
222 /* Release the reference to the previous message. */
223 rdsv3_message_put(rm);
224 rm = NULL;
225 }
226
227 /* If we're asked to send a cong map update, do so. */
228 if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
229 if (conn->c_trans->xmit_cong_map != NULL) {
230 conn->c_map_offset = 0;
231 conn->c_map_bytes =
232 sizeof (struct rdsv3_header) +
233 RDSV3_CONG_MAP_BYTES;
234 continue;
235 }
236
237 rm = rdsv3_cong_update_alloc(conn);
238 if (IS_ERR(rm)) {
239 ret = PTR_ERR(rm);
240 break;
241 }
242
243 conn->c_xmit_rm = rm;
244 }
245
246 /*
247 * Grab the next message from the send queue, if there is one.
248 *
249 * c_xmit_rm holds a ref while we're sending this message down
250 * the connection. We can use this ref while holding the
251 * send_sem; rdsv3_send_reset() is serialized with it.
252 */
253 if (rm == NULL) {
254 unsigned int len;
255
256 mutex_enter(&conn->c_lock);
257
258 if (!list_is_empty(&conn->c_send_queue)) {
259 rm = list_remove_head(&conn->c_send_queue);
260 rdsv3_message_addref(rm);
261
262 /*
263 * Move the message from the send queue to
264 * the retransmit
265 * list right away.
266 */
267 list_insert_tail(&conn->c_retrans, rm);
268 }
269
270 mutex_exit(&conn->c_lock);
271
272 if (rm == NULL) {
273 was_empty = 1;
274 break;
275 }
276
277 /*
278 * Unfortunately, the way Infiniband deals with
279 * RDMA to a bad MR key is by moving the entire
280 * queue pair to error state. We could possibly
281 * recover from that, but right now we drop the
282 * connection.
283 * Therefore, we never retransmit messages with
284 * RDMA ops.
285 */
286 if (rm->m_rdma_op &&
287 test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags)) {
288 mutex_enter(&conn->c_lock);
289 if (test_and_clear_bit(RDSV3_MSG_ON_CONN,
290 &rm->m_flags))
291 list_remove_node(&rm->m_conn_item);
292 list_insert_tail(&to_be_dropped, rm);
293 mutex_exit(&conn->c_lock);
294 rdsv3_message_put(rm);
295 continue;
296 }
297
298 /* Require an ACK every once in a while */
299 len = ntohl(rm->m_inc.i_hdr.h_len);
300 if (conn->c_unacked_packets == 0 ||
301 conn->c_unacked_bytes < len) {
302 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
303
304 conn->c_unacked_packets =
305 rdsv3_sysctl_max_unacked_packets;
306 conn->c_unacked_bytes =
307 rdsv3_sysctl_max_unacked_bytes;
308 rdsv3_stats_inc(s_send_ack_required);
309 } else {
310 conn->c_unacked_bytes -= len;
311 conn->c_unacked_packets--;
312 }
313
314 conn->c_xmit_rm = rm;
315 }
316
317 /*
318 * Try and send an rdma message. Let's see if we can
319 * keep this simple and require that the transport either
320 * send the whole rdma or none of it.
321 */
322 if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
323 ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
324 if (ret)
325 break;
326 conn->c_xmit_rdma_sent = 1;
327 /*
328 * The transport owns the mapped memory for now.
329 * You can't unmap it while it's on the send queue
330 */
331 set_bit(RDSV3_MSG_MAPPED, &rm->m_flags);
332 }
333
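/*
 * Hand the header and any remaining data to the transport.  The xmit
 * callback returns the number of bytes it accepted; those bytes are
 * charged first against the header and then against the scatterlist
 * entries, so c_xmit_hdr_off, c_xmit_sg and c_xmit_data_off always
 * resume where this call left off.
 */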
334 if (conn->c_xmit_hdr_off < sizeof (struct rdsv3_header) ||
335 conn->c_xmit_sg < rm->m_nents) {
336 ret = conn->c_trans->xmit(conn, rm,
337 conn->c_xmit_hdr_off,
338 conn->c_xmit_sg,
339 conn->c_xmit_data_off);
340 if (ret <= 0)
341 break;
342
343 if (conn->c_xmit_hdr_off <
344 sizeof (struct rdsv3_header)) {
345 tmp = min(ret,
346 sizeof (struct rdsv3_header) -
347 conn->c_xmit_hdr_off);
348 conn->c_xmit_hdr_off += tmp;
349 ret -= tmp;
350 }
351
352 sg = &rm->m_sg[conn->c_xmit_sg];
353 while (ret) {
354 tmp = min(ret, rdsv3_sg_len(sg) -
355 conn->c_xmit_data_off);
356 conn->c_xmit_data_off += tmp;
357 ret -= tmp;
358 if (conn->c_xmit_data_off == rdsv3_sg_len(sg)) {
359 conn->c_xmit_data_off = 0;
360 sg++;
361 conn->c_xmit_sg++;
362 ASSERT(!(ret != 0 &&
363 conn->c_xmit_sg == rm->m_nents));
364 }
365 }
366 }
367 }
368
369 /* Nuke any messages we decided not to retransmit. */
370 if (!list_is_empty(&to_be_dropped))
371 rdsv3_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
372
373 if (conn->c_trans->xmit_complete)
374 conn->c_trans->xmit_complete(conn);
375
376 /*
377 * We might be racing with another sender who queued a message but
378 * backed off on noticing that we held the c_send_lock. If we check
379 * for queued messages after dropping the sem then either we'll
380 * see the queued message or the queuer will get the sem. If we
381 * notice the queued message then we trigger an immediate retry.
382 *
383 * We need to be careful only to do this when we stopped processing
384 * the send queue because it was empty. It's the only way we
385 * stop processing the loop when the transport hasn't taken
386 * responsibility for forward progress.
387 */
388 mutex_exit(&conn->c_send_lock);
389
390 if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
391 /*
392 * We exhausted the send quota, but there's work left to
393 * do. Return and (re-)schedule the send worker.
394 */
395 ret = -EAGAIN;
396 }
397
398 atomic_dec_32(&conn->c_senders);
399
400 if (ret == 0 && was_empty) {
401 /*
402 * A simple bit test would be way faster than taking the
403 * spin lock
404 */
405 mutex_enter(&conn->c_lock);
406 if (!list_is_empty(&conn->c_send_queue)) {
407 rdsv3_stats_inc(s_send_sem_queue_raced);
408 ret = -EAGAIN;
409 }
410 mutex_exit(&conn->c_lock);
411 }
412
413 out:
414 RDSV3_DPRINTF4("rdsv3_send_xmit", "Return(conn: %p, ret: %d)",
415 conn, ret);
416 return (ret);
417 }
418
419 static void
420 rdsv3_send_sndbuf_remove(struct rdsv3_sock *rs, struct rdsv3_message *rm)
421 {
422 uint32_t len = ntohl(rm->m_inc.i_hdr.h_len);
423
424 ASSERT(mutex_owned(&rs->rs_lock));
425
426 ASSERT(rs->rs_snd_bytes >= len);
427 rs->rs_snd_bytes -= len;
428
429 if (rs->rs_snd_bytes == 0)
430 rdsv3_stats_inc(s_send_queue_empty);
431 }
432
433 static inline int
434 rdsv3_send_is_acked(struct rdsv3_message *rm, uint64_t ack,
435 is_acked_func is_acked)
436 {
437 if (is_acked)
438 return (is_acked(rm, ack));
439 return (ntohll(rm->m_inc.i_hdr.h_sequence) <= ack);
440 }
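/*
 * Note: transports that assign their own m_ack_seq (see the comment above
 * rdsv3_send_drop_acked) pass an is_acked callback; with a NULL callback
 * the header sequence number is compared against the ack directly.
 */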
441
442 /*
443 * Returns true if there are no messages on the send and retransmit queues
444 * which have a sequence number greater than or equal to the given sequence
445 * number.
446 */
447 int
448 rdsv3_send_acked_before(struct rdsv3_connection *conn, uint64_t seq)
449 {
450 struct rdsv3_message *rm;
451 int ret = 1;
452
453 RDSV3_DPRINTF4("rdsv3_send_acked_before", "Enter(conn: %p)", conn);
454
455 mutex_enter(&conn->c_lock);
456
457 /* XXX - original code spits out warning */
458 rm = list_head(&conn->c_retrans);
459 if (rm != NULL && ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
460 ret = 0;
461
462 /* XXX - original code spits out warning */
463 rm = list_head(&conn->c_send_queue);
464 if (rm != NULL && ntohll(rm->m_inc.i_hdr.h_sequence) < seq)
465 ret = 0;
466
467 mutex_exit(&conn->c_lock);
468
469 RDSV3_DPRINTF4("rdsv3_send_acked_before", "Return(conn: %p)", conn);
470
471 return (ret);
472 }
473
474 /*
475 * This is pretty similar to what happens below in the ACK
476 * handling code - except that we call here as soon as we get
477 * the IB send completion on the RDMA op and the accompanying
478 * message.
479 */
480 void
481 rdsv3_rdma_send_complete(struct rdsv3_message *rm, int status)
482 {
483 struct rdsv3_sock *rs = NULL;
484 struct rdsv3_rdma_op *ro;
485 struct rdsv3_notifier *notifier;
486
487 RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Enter(rm: %p)", rm);
488
489 mutex_enter(&rm->m_rs_lock);
490
491 ro = rm->m_rdma_op;
492 if (test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags) &&
493 ro && ro->r_notify && ro->r_notifier) {
494 notifier = ro->r_notifier;
495 rs = rm->m_rs;
496 rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
497
498 notifier->n_status = status;
499 mutex_enter(&rs->rs_lock);
500 list_insert_tail(&rs->rs_notify_queue, notifier);
501 mutex_exit(&rs->rs_lock);
502 ro->r_notifier = NULL;
503 }
504
505 mutex_exit(&rm->m_rs_lock);
506
507 if (rs) {
508 struct rsock *sk = rdsv3_rs_to_sk(rs);
509 int error;
510
511 rdsv3_wake_sk_sleep(rs);
512
513 /* wake up anyone waiting in poll */
514 sk->sk_upcalls->su_recv(sk->sk_upper_handle, NULL,
515 0, 0, &error, NULL);
516 if (error != 0) {
517 RDSV3_DPRINTF2("rdsv3_recv_incoming",
518 "su_recv returned: %d", error);
519 }
520
521 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
522 }
523
524 RDSV3_DPRINTF4("rdsv3_rdma_send_complete", "Return(rm: %p)", rm);
525 }
526
527 /*
528 * This is the same as rdsv3_rdma_send_complete except we
529 * don't do any locking - we have all the ingredients (message,
530 * socket, socket lock) and can just move the notifier.
531 */
532 static inline void
533 __rdsv3_rdma_send_complete(struct rdsv3_sock *rs, struct rdsv3_message *rm,
534 int status)
535 {
536 struct rdsv3_rdma_op *ro;
537 void *ic;
538
539 RDSV3_DPRINTF4("__rdsv3_rdma_send_complete",
540 "Enter(rs: %p, rm: %p)", rs, rm);
541
542 ro = rm->m_rdma_op;
543 if (ro && ro->r_notify && ro->r_notifier) {
544 ro->r_notifier->n_status = status;
545 list_insert_tail(&rs->rs_notify_queue, ro->r_notifier);
546 ro->r_notifier = NULL;
547 }
548
549 /* No need to wake the app - caller does this */
550 }
551
552 /*
553 * This is called from the IB send completion when we detect
554 * a RDMA operation that failed with remote access error.
555 * So speed is not an issue here.
556 */
557 struct rdsv3_message *
558 rdsv3_send_get_message(struct rdsv3_connection *conn,
559 struct rdsv3_rdma_op *op)
560 {
561 struct rdsv3_message *rm, *tmp, *found = NULL;
562
563 RDSV3_DPRINTF4("rdsv3_send_get_message", "Enter(conn: %p)", conn);
564
565 mutex_enter(&conn->c_lock);
566
567 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
568 if (rm->m_rdma_op == op) {
569 atomic_inc_32(&rm->m_refcount);
570 found = rm;
571 goto out;
572 }
573 }
574
575 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_send_queue,
576 m_conn_item) {
577 if (rm->m_rdma_op == op) {
578 atomic_inc_32(&rm->m_refcount);
579 found = rm;
580 break;
581 }
582 }
583
584 out:
585 mutex_exit(&conn->c_lock);
586
587 return (found);
588 }
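/*
 * Illustrative use from a transport send-completion handler (assumed, not
 * verbatim from any transport source): find the message that owns the
 * failed op and complete its notifier with an error status from
 * <sys/rds.h> (RDS_RDMA_REMOTE_ERROR is assumed here):
 *
 *	rm = rdsv3_send_get_message(conn, op);
 *	if (rm != NULL) {
 *		rdsv3_rdma_send_complete(rm, RDS_RDMA_REMOTE_ERROR);
 *		rdsv3_message_put(rm);
 *	}
 */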
589
590 /*
591 * This removes messages from the socket's list if they're on it. The list
592 * argument must be private to the caller, we must be able to modify it
593 * without locks. The messages must have a reference held for their
594 * position on the list. This function will drop that reference after
595 * removing the messages from the 'messages' list regardless of if it found
596 * the messages on the socket list or not.
597 */
598 void
599 rdsv3_send_remove_from_sock(struct list *messages, int status)
600 {
601 struct rdsv3_sock *rs = NULL;
602 struct rdsv3_message *rm;
603
604 RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Enter");
605
606 while (!list_is_empty(messages)) {
607 int was_on_sock = 0;
608 rm = list_remove_head(messages);
609
610 /*
611 * If we see this flag cleared then we're *sure* that someone
612 * else beat us to removing it from the sock. If we race
613 * with their flag update we'll get the lock and then really
614 * see that the flag has been cleared.
615 *
616 * The message spinlock makes sure nobody clears rm->m_rs
617 * while we're messing with it. It does not prevent the
618 * message from being removed from the socket, though.
619 */
620 mutex_enter(&rm->m_rs_lock);
621 if (!test_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags))
622 goto unlock_and_drop;
623
624 if (rs != rm->m_rs) {
625 if (rs) {
626 rdsv3_wake_sk_sleep(rs);
627 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
628 }
629 rs = rm->m_rs;
630 rdsv3_sk_sock_hold(rdsv3_rs_to_sk(rs));
631 }
632
633 mutex_enter(&rs->rs_lock);
634 if (test_and_clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags)) {
635 struct rdsv3_rdma_op *ro = rm->m_rdma_op;
636 struct rdsv3_notifier *notifier;
637
638 list_remove_node(&rm->m_sock_item);
639 rdsv3_send_sndbuf_remove(rs, rm);
640 if (ro && ro->r_notifier &&
641 (status || ro->r_notify)) {
642 notifier = ro->r_notifier;
643 list_insert_tail(&rs->rs_notify_queue,
644 notifier);
645 if (!notifier->n_status)
646 notifier->n_status = status;
647 rm->m_rdma_op->r_notifier = NULL;
648 }
649 was_on_sock = 1;
650 rm->m_rs = NULL;
651 }
652 mutex_exit(&rs->rs_lock);
653
654 unlock_and_drop:
655 mutex_exit(&rm->m_rs_lock);
656 rdsv3_message_put(rm);
657 if (was_on_sock)
658 rdsv3_message_put(rm);
659 }
660
661 if (rs) {
662 rdsv3_wake_sk_sleep(rs);
663 rdsv3_sk_sock_put(rdsv3_rs_to_sk(rs));
664 }
665
666 RDSV3_DPRINTF4("rdsv3_send_remove_from_sock", "Return");
667 }
668
669 /*
670 * Transports call here when they've determined that the receiver queued
671 * messages up to, and including, the given sequence number. Messages are
672 * moved to the retrans queue when rdsv3_send_xmit picks them off the send
673 * queue. This means that in the TCP case, the message may not have been
674 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
675 * checks the RDSV3_MSG_HAS_ACK_SEQ bit.
676 *
677 * XXX It's not clear to me how this is safely serialized with socket
678 * destruction. Maybe it should bail if it sees SOCK_DEAD.
679 */
680 void
681 rdsv3_send_drop_acked(struct rdsv3_connection *conn, uint64_t ack,
682 is_acked_func is_acked)
683 {
684 struct rdsv3_message *rm, *tmp;
685 list_t list;
686
687 RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Enter(conn: %p)", conn);
688
689 list_create(&list, sizeof (struct rdsv3_message),
690 offsetof(struct rdsv3_message, m_conn_item));
691
692 mutex_enter(&conn->c_lock);
693
694 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &conn->c_retrans, m_conn_item) {
695 if (!rdsv3_send_is_acked(rm, ack, is_acked))
696 break;
697
698 list_remove_node(&rm->m_conn_item);
699 list_insert_tail(&list, rm);
700 clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
701 }
702
703 #if 0
704 XXX
705 /* order flag updates with spin locks */
706 if (!list_is_empty(&list))
707 smp_mb__after_clear_bit();
708 #endif
709
710 mutex_exit(&conn->c_lock);
711
712 /* now remove the messages from the sock list as needed */
713 rdsv3_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
714
715 RDSV3_DPRINTF4("rdsv3_send_drop_acked", "Return(conn: %p)", conn);
716 }
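/*
 * Illustrative call from a transport completion path (assumed, not taken
 * from any transport source): once the peer's cumulative ack is known,
 *
 *	rdsv3_send_drop_acked(conn, ack_seq, NULL);
 *
 * where a NULL is_acked falls back to the plain sequence-number compare
 * in rdsv3_send_is_acked().
 */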
717
718 void
719 rdsv3_send_drop_to(struct rdsv3_sock *rs, struct sockaddr_in *dest)
720 {
721 struct rdsv3_message *rm, *tmp;
722 struct rdsv3_connection *conn;
723 list_t list;
724 int wake = 0;
725
726 RDSV3_DPRINTF4("rdsv3_send_drop_to", "Enter(rs: %p)", rs);
727
728 list_create(&list, sizeof (struct rdsv3_message),
729 offsetof(struct rdsv3_message, m_sock_item));
730
731 /* get all the messages we're dropping under the rs lock */
732 mutex_enter(&rs->rs_lock);
733
734 RDSV3_FOR_EACH_LIST_NODE_SAFE(rm, tmp, &rs->rs_send_queue,
735 m_sock_item) {
736 if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
737 dest->sin_port != rm->m_inc.i_hdr.h_dport))
738 continue;
739 wake = 1;
740 list_remove(&rs->rs_send_queue, rm);
741 list_insert_tail(&list, rm);
742 rdsv3_send_sndbuf_remove(rs, rm);
743 clear_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
744 }
745
746 mutex_exit(&rs->rs_lock);
747
748 conn = NULL;
749
750 /* now remove the messages from the conn list as needed */
751 RDSV3_FOR_EACH_LIST_NODE(rm, &list, m_sock_item) {
752 /*
753 * We do this here rather than in the loop above, so that
754 * we don't have to nest m_rs_lock under rs->rs_lock
755 */
756 mutex_enter(&rm->m_rs_lock);
757 /* If this is a RDMA operation, notify the app. */
758 __rdsv3_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
759 rm->m_rs = NULL;
760 mutex_exit(&rm->m_rs_lock);
761
762 /*
763 * If we see this flag cleared then we're *sure* that someone
764 * else beat us to removing it from the conn. If we race
765 * with their flag update we'll get the lock and then really
766 * see that the flag has been cleared.
767 */
768 if (!test_bit(RDSV3_MSG_ON_CONN, &rm->m_flags))
769 continue;
770
771 if (conn != rm->m_inc.i_conn) {
772 if (conn)
773 mutex_exit(&conn->c_lock);
774 conn = rm->m_inc.i_conn;
775 mutex_enter(&conn->c_lock);
776 }
777
778 if (test_and_clear_bit(RDSV3_MSG_ON_CONN, &rm->m_flags)) {
779 list_remove_node(&rm->m_conn_item);
780 rdsv3_message_put(rm);
781 }
782 }
783
784 if (conn)
785 mutex_exit(&conn->c_lock);
786
787 if (wake)
788 rdsv3_wake_sk_sleep(rs);
789
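/*
 * Wait for the transport to be done with each message (for example, for
 * any mapped RDMA memory to be unmapped) before dropping the list's
 * reference.
 */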
790 while (!list_is_empty(&list)) {
791 rm = list_remove_head(&list);
792
793 rdsv3_message_wait(rm);
794 rdsv3_message_put(rm);
795 }
796
797 RDSV3_DPRINTF4("rdsv3_send_drop_to", "Return(rs: %p)", rs);
798 }
799
800 /*
801 * We only want this to fire once, so we use the caller's 'queued'. It's
802 * possible that another thread can race with us and remove the
803 * message from the flow with RDSV3_CANCEL_SENT_TO.
804 */
805 static int
806 rdsv3_send_queue_rm(struct rdsv3_sock *rs, struct rdsv3_connection *conn,
807 struct rdsv3_message *rm, uint16_be_t sport,
808 uint16_be_t dport, int *queued)
809 {
810 uint32_t len;
811
812 RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Enter(rs: %p, rm: %p)", rs, rm);
813
814 if (*queued)
815 goto out;
816
817 len = ntohl(rm->m_inc.i_hdr.h_len);
818
819 /*
820 * this is the only place which holds both the socket's rs_lock
821 * and the connection's c_lock
822 */
823 mutex_enter(&rs->rs_lock);
824
825 /*
826 * If only a little space is left in sndbuf, we don't queue anything,
827 * and userspace gets -EAGAIN. But poll() still indicates there's send
828 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
829 * freed up by incoming acks. So we check the *old* value of
830 * rs_snd_bytes here to allow the last msg to exceed the buffer,
831 * so that poll() then knows no more data can be sent.
832 */
833 if (rs->rs_snd_bytes < rdsv3_sk_sndbuf(rs)) {
834 rs->rs_snd_bytes += len;
835
836 /*
837 * let recv side know we are close to send space exhaustion.
838 * This is probably not the optimal way to do it, as this
839 * means we set the flag on *all* messages as soon as our
840 * throughput hits a certain threshold.
841 */
842 if (rs->rs_snd_bytes >= rdsv3_sk_sndbuf(rs) / 2)
843 set_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags);
844
845 list_insert_tail(&rs->rs_send_queue, rm);
846 set_bit(RDSV3_MSG_ON_SOCK, &rm->m_flags);
847
848 rdsv3_message_addref(rm);
849 rm->m_rs = rs;
850
851 /*
852 * The code ordering is a little weird, but we're
853 * trying to minimize the time we hold c_lock
854 */
855 rdsv3_message_populate_header(&rm->m_inc.i_hdr, sport,
856 dport, 0);
857 rm->m_inc.i_conn = conn;
858 rdsv3_message_addref(rm); /* XXX - called twice */
859
860 mutex_enter(&conn->c_lock);
861 rm->m_inc.i_hdr.h_sequence = htonll(conn->c_next_tx_seq++);
862 list_insert_tail(&conn->c_send_queue, rm);
863 set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
864 mutex_exit(&conn->c_lock);
865
866 RDSV3_DPRINTF5("rdsv3_send_queue_rm",
867 "queued msg %p len %d, rs %p bytes %d seq %llu",
868 rm, len, rs, rs->rs_snd_bytes,
869 (unsigned long long)ntohll(
870 rm->m_inc.i_hdr.h_sequence));
871
872 *queued = 1;
873 }
874
875 mutex_exit(&rs->rs_lock);
876
877 RDSV3_DPRINTF4("rdsv3_send_queue_rm", "Return(rs: %p)", rs);
878 out:
879 return (*queued);
880 }
881
882 static int
883 rdsv3_cmsg_send(struct rdsv3_sock *rs, struct rdsv3_message *rm,
884 struct msghdr *msg, int *allocated_mr)
885 {
886 struct cmsghdr *cmsg;
887 int ret = 0;
888
889 RDSV3_DPRINTF4("rdsv3_cmsg_send", "Enter(rs: %p)", rs);
890
891 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
892
893 if (cmsg->cmsg_level != SOL_RDS)
894 continue;
895
896 RDSV3_DPRINTF4("rdsv3_cmsg_send", "cmsg(%p, %p) type %d",
897 cmsg, rm, cmsg->cmsg_type);
898 /*
899 * As a side effect, RDMA_DEST and RDMA_MAP will set
900 * rm->m_rdma_cookie and rm->m_rdma_mr.
901 */
902 switch (cmsg->cmsg_type) {
903 case RDS_CMSG_RDMA_ARGS:
904 ret = rdsv3_cmsg_rdma_args(rs, rm, cmsg);
905 break;
906
907 case RDS_CMSG_RDMA_DEST:
908 ret = rdsv3_cmsg_rdma_dest(rs, rm, cmsg);
909 break;
910
911 case RDS_CMSG_RDMA_MAP:
912 ret = rdsv3_cmsg_rdma_map(rs, rm, cmsg);
913 if (!ret)
914 *allocated_mr = 1;
915 break;
916
917 default:
918 return (-EINVAL);
919 }
920
921 if (ret)
922 break;
923 }
924
925 RDSV3_DPRINTF4("rdsv3_cmsg_send", "Return(rs: %p)", rs);
926
927 return (ret);
928 }
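/*
 * Userland view, for orientation only (a hedged sketch, not part of this
 * file): the control messages handled above are supplied to sendmsg() on
 * an RDS socket with cmsg_level set to SOL_RDS; the payload layouts are
 * defined in <sys/rds.h>.  Roughly, with 'args' standing for the per-type
 * argument structure:
 *
 *	struct msghdr msg = { 0 };
 *	char cbuf[CMSG_SPACE(sizeof (args))];
 *	struct cmsghdr *cmsg;
 *
 *	msg.msg_control = cbuf;
 *	msg.msg_controllen = sizeof (cbuf);
 *	cmsg = CMSG_FIRSTHDR(&msg);
 *	cmsg->cmsg_level = SOL_RDS;
 *	cmsg->cmsg_type = RDS_CMSG_RDMA_ARGS;
 *	cmsg->cmsg_len = CMSG_LEN(sizeof (args));
 *	(copy the argument structure into CMSG_DATA(cmsg), then sendmsg())
 */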
929
930 extern unsigned long rdsv3_max_bcopy_size;
931
932 int
933 rdsv3_sendmsg(struct rdsv3_sock *rs, uio_t *uio, struct nmsghdr *msg,
934 size_t payload_len)
935 {
936 struct rsock *sk = rdsv3_rs_to_sk(rs);
937 struct sockaddr_in *usin = (struct sockaddr_in *)msg->msg_name;
938 uint32_be_t daddr;
939 uint16_be_t dport;
940 struct rdsv3_message *rm = NULL;
941 struct rdsv3_connection *conn;
942 int ret = 0;
943 int queued = 0, allocated_mr = 0;
944 int nonblock = msg->msg_flags & MSG_DONTWAIT;
945 long timeo = rdsv3_sndtimeo(sk, nonblock);
946
947 RDSV3_DPRINTF4("rdsv3_sendmsg", "Enter(rs: %p)", rs);
948
949 if (msg->msg_namelen) {
950 /* XXX fail non-unicast destination IPs? */
951 if (msg->msg_namelen < sizeof (*usin) ||
952 usin->sin_family != AF_INET_OFFLOAD) {
953 ret = -EINVAL;
954 RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
955 goto out;
956 }
957 daddr = usin->sin_addr.s_addr;
958 dport = usin->sin_port;
959 } else {
960 /* We only care about consistency with ->connect() */
961 mutex_enter(&sk->sk_lock);
962 daddr = rs->rs_conn_addr;
963 dport = rs->rs_conn_port;
964 mutex_exit(&sk->sk_lock);
965 }
966
967 /* racing with another thread binding seems ok here */
968 if (daddr == 0 || rs->rs_bound_addr == 0) {
969 ret = -ENOTCONN; /* XXX not a great errno */
970 RDSV3_DPRINTF2("rdsv3_sendmsg", "returning: %d", -ret);
971 goto out;
972 }
973
974 if (payload_len > rdsv3_max_bcopy_size) {
975 RDSV3_DPRINTF2("rdsv3_sendmsg", "Message too large: %d",
976 payload_len);
977 ret = -EMSGSIZE;
978 goto out;
979 }
980
981 rm = rdsv3_message_copy_from_user(uio, payload_len);
982 if (IS_ERR(rm)) {
983 ret = PTR_ERR(rm);
984 RDSV3_DPRINTF2("rdsv3_sendmsg",
985 "rdsv3_message_copy_from_user failed %d", -ret);
986 rm = NULL;
987 goto out;
988 }
989
990 rm->m_daddr = daddr;
991
992 /* Parse any control messages the user may have included. */
993 ret = rdsv3_cmsg_send(rs, rm, msg, &allocated_mr);
994 if (ret) {
995 RDSV3_DPRINTF2("rdsv3_sendmsg",
996 "rdsv3_cmsg_send(rs: %p rm: %p msg: %p) returned: %d",
997 rs, rm, msg, ret);
998 goto out;
999 }
1000
1001 /*
1002 * rdsv3_conn_create has a spinlock that runs with IRQ off.
1003 * Caching the conn in the socket helps a lot.
1004 */
1005 mutex_enter(&rs->rs_conn_lock);
1006 if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) {
1007 conn = rs->rs_conn;
1008 } else {
1009 conn = rdsv3_conn_create_outgoing(rs->rs_bound_addr,
1010 daddr, rs->rs_transport, KM_NOSLEEP);
1011 if (IS_ERR(conn)) {
1012 mutex_exit(&rs->rs_conn_lock);
1013 ret = PTR_ERR(conn);
1014 RDSV3_DPRINTF2("rdsv3_sendmsg",
1015 "rdsv3_conn_create_outgoing failed %d",
1016 -ret);
1017 goto out;
1018 }
1019 rs->rs_conn = conn;
1020 }
1021 mutex_exit(&rs->rs_conn_lock);
1022
1023 if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
1024 conn->c_trans->xmit_rdma == NULL) {
1025 RDSV3_DPRINTF2("rdsv3_sendmsg", "rdma_op %p conn xmit_rdma %p",
1026 rm->m_rdma_op, conn->c_trans->xmit_rdma);
1027 ret = -EOPNOTSUPP;
1028 goto out;
1029 }
1030
1031 /*
1032 * If the connection is down, trigger a connect. We may
1033 * have scheduled a delayed reconnect however - in this case
1034 * we should not interfere.
1035 */
1036 if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
1037 !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
1038 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);
1039
1040 ret = rdsv3_cong_wait(conn->c_fcong, dport, nonblock, rs);
1041 if (ret) {
1042 mutex_enter(&rs->rs_congested_lock);
1043 rs->rs_seen_congestion = 1;
1044 cv_signal(&rs->rs_congested_cv);
1045 mutex_exit(&rs->rs_congested_lock);
1046
1047 RDSV3_DPRINTF2("rdsv3_sendmsg",
1048 "rdsv3_cong_wait (dport: %d) returned: %d", dport, ret);
1049 goto out;
1050 }
1051
1052 (void) rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport,
1053 &queued);
1054 if (!queued) {
1055 /* rdsv3_stats_inc(s_send_queue_full); */
1056 /* XXX make sure this is reasonable */
1057 if (payload_len > rdsv3_sk_sndbuf(rs)) {
1058 ret = -EMSGSIZE;
1059 RDSV3_DPRINTF2("rdsv3_sendmsg",
1060 "msgsize(%d) too big, returning: %d",
1061 payload_len, -ret);
1062 goto out;
1063 }
1064 if (nonblock) {
1065 ret = -EAGAIN;
1066 RDSV3_DPRINTF3("rdsv3_sendmsg",
1067 "send queue full (%d), returning: %d",
1068 payload_len, -ret);
1069 goto out;
1070 }
1071
1072 #if 0
1073 ret = rdsv3_wait_sig(sk->sk_sleep,
1074 (rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
1075 dport, &queued)));
1076 if (ret == 0) {
1077 /* signal/timeout pending */
1078 RDSV3_DPRINTF2("rdsv3_sendmsg",
1079 "woke due to signal: %d", ret);
1080 ret = -ERESTART;
1081 goto out;
1082 }
1083 #else
1084 mutex_enter(&sk->sk_sleep->waitq_mutex);
1085 sk->sk_sleep->waitq_waiters++;
1086 while (!rdsv3_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
1087 dport, &queued)) {
1088 ret = cv_wait_sig(&sk->sk_sleep->waitq_cv,
1089 &sk->sk_sleep->waitq_mutex);
1090 if (ret == 0) {
1091 /* signal/timeout pending */
1092 RDSV3_DPRINTF2("rdsv3_sendmsg",
1093 "woke due to signal: %d", ret);
1094 ret = -EINTR;
1095 sk->sk_sleep->waitq_waiters--;
1096 mutex_exit(&sk->sk_sleep->waitq_mutex);
1097 goto out;
1098 }
1099 }
1100 sk->sk_sleep->waitq_waiters--;
1101 mutex_exit(&sk->sk_sleep->waitq_mutex);
1102 #endif
1103
1104 RDSV3_DPRINTF5("rdsv3_sendmsg", "sendmsg woke queued %d",
1105 queued);
1106
1107 ASSERT(queued);
1108 ret = 0;
1109 }
1110
1111 /*
1112 * By now we've committed to the send. We reuse rdsv3_send_worker()
1113 * to retry sends in the rds thread if the transport asks us to.
1114 */
1115 rdsv3_stats_inc(s_send_queued);
1116
1117 if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
1118 (void) rdsv3_send_worker(&conn->c_send_w.work);
1119
1120 rdsv3_message_put(rm);
1121 RDSV3_DPRINTF4("rdsv3_sendmsg", "Return(rs: %p, len: %d)",
1122 rs, payload_len);
1123 return (payload_len);
1124
1125 out:
1126 /*
1127 * If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
1128 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
1129 * or in any other way, we need to destroy the MR again
1130 */
1131 if (allocated_mr)
1132 rdsv3_rdma_unuse(rs, rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
1133 1);
1134
1135 if (rm)
1136 rdsv3_message_put(rm);
1137 return (ret);
1138 }
1139
1140 /*
1141 * Reply to a ping packet.
1142 */
1143 int
1144 rdsv3_send_pong(struct rdsv3_connection *conn, uint16_be_t dport)
1145 {
1146 struct rdsv3_message *rm;
1147 int ret = 0;
1148
1149 RDSV3_DPRINTF4("rdsv3_send_pong", "Enter(conn: %p)", conn);
1150
1151 rm = rdsv3_message_alloc(0, KM_NOSLEEP);
1152 if (!rm) {
1153 ret = -ENOMEM;
1154 goto out;
1155 }
1156
1157 rm->m_daddr = conn->c_faddr;
1158
1159 /*
1160 * If the connection is down, trigger a connect. We may
1161 * have scheduled a delayed reconnect however - in this case
1162 * we should not interfere.
1163 */
1164 if (rdsv3_conn_state(conn) == RDSV3_CONN_DOWN &&
1165 !test_and_set_bit(RDSV3_RECONNECT_PENDING, &conn->c_flags))
1166 rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_conn_w, 0);
1167
1168 ret = rdsv3_cong_wait(conn->c_fcong, dport, 1, NULL);
1169 if (ret)
1170 goto out;
1171
1172 mutex_enter(&conn->c_lock);
1173 list_insert_tail(&conn->c_send_queue, rm);
1174 set_bit(RDSV3_MSG_ON_CONN, &rm->m_flags);
1175 rdsv3_message_addref(rm);
1176 rm->m_inc.i_conn = conn;
1177
1178 rdsv3_message_populate_header(&rm->m_inc.i_hdr, 0, dport,
1179 conn->c_next_tx_seq);
1180 conn->c_next_tx_seq++;
1181 mutex_exit(&conn->c_lock);
1182
1183 rdsv3_stats_inc(s_send_queued);
1184 rdsv3_stats_inc(s_send_pong);
1185
1186 if (!test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
1187 (void) rdsv3_send_xmit(conn);
1188
1189 rdsv3_message_put(rm);
1190
1191 RDSV3_DPRINTF4("rdsv3_send_pong", "Return(conn: %p)", conn);
1192 return (0);
1193
1194 out:
1195 if (rm)
1196 rdsv3_message_put(rm);
1197 return (ret);
1198 }
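/*
 * Hedged note (based on the usual RDS ping convention, not on code in this
 * file): the receive path treats an incoming message addressed to port 0
 * as a ping and answers it by calling rdsv3_send_pong() with the sender's
 * source port as dport.
 */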
1199