Lines Matching full:rs

84 	struct rsocket *rs;  member
284 struct rsocket *rs; member
421 static void ds_insert_qp(struct rsocket *rs, struct ds_qp *qp) in ds_insert_qp() argument
423 if (!rs->qp_list) in ds_insert_qp()
426 dlist_insert_head(&qp->list, &rs->qp_list->list); in ds_insert_qp()
427 rs->qp_list = qp; in ds_insert_qp()
430 static void ds_remove_qp(struct rsocket *rs, struct ds_qp *qp) in ds_remove_qp() argument
433 rs->qp_list = ds_next_qp(qp); in ds_remove_qp()
436 rs->qp_list = NULL; in ds_remove_qp()
440 static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd) in rs_notify_svc() argument
460 msg.rs = rs; in rs_notify_svc()
574 static int rs_insert(struct rsocket *rs, int index) in rs_insert() argument
577 rs->index = idm_set(&idm, index, rs); in rs_insert()
579 return rs->index; in rs_insert()
582 static void rs_remove(struct rsocket *rs) in rs_remove() argument
585 idm_clear(&idm, rs->index); in rs_remove()
592 struct rsocket *rs; in rs_alloc() local
594 rs = calloc(1, sizeof(*rs)); in rs_alloc()
595 if (!rs) in rs_alloc()
598 rs->type = type; in rs_alloc()
599 rs->index = -1; in rs_alloc()
601 rs->udp_sock = -1; in rs_alloc()
602 rs->epfd = -1; in rs_alloc()
606 rs->sbuf_size = inherited_rs->sbuf_size; in rs_alloc()
607 rs->rbuf_size = inherited_rs->rbuf_size; in rs_alloc()
608 rs->sq_inline = inherited_rs->sq_inline; in rs_alloc()
609 rs->sq_size = inherited_rs->sq_size; in rs_alloc()
610 rs->rq_size = inherited_rs->rq_size; in rs_alloc()
612 rs->ctrl_max_seqno = inherited_rs->ctrl_max_seqno; in rs_alloc()
613 rs->target_iomap_size = inherited_rs->target_iomap_size; in rs_alloc()
616 rs->sbuf_size = def_wmem; in rs_alloc()
617 rs->rbuf_size = def_mem; in rs_alloc()
618 rs->sq_inline = def_inline; in rs_alloc()
619 rs->sq_size = def_sqsize; in rs_alloc()
620 rs->rq_size = def_rqsize; in rs_alloc()
622 rs->ctrl_max_seqno = RS_QP_CTRL_SIZE; in rs_alloc()
623 rs->target_iomap_size = def_iomap_size; in rs_alloc()
626 fastlock_init(&rs->slock); in rs_alloc()
627 fastlock_init(&rs->rlock); in rs_alloc()
628 fastlock_init(&rs->cq_lock); in rs_alloc()
629 fastlock_init(&rs->cq_wait_lock); in rs_alloc()
630 fastlock_init(&rs->map_lock); in rs_alloc()
631 dlist_init(&rs->iomap_list); in rs_alloc()
632 dlist_init(&rs->iomap_queue); in rs_alloc()
633 return rs; in rs_alloc()
636 static int rs_set_nonblocking(struct rsocket *rs, int arg) in rs_set_nonblocking() argument
641 if (rs->type == SOCK_STREAM) { in rs_set_nonblocking()
642 if (rs->cm_id->recv_cq_channel) in rs_set_nonblocking()
643 ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg); in rs_set_nonblocking()
645 if (!ret && rs->state < rs_connected) in rs_set_nonblocking()
646 ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg); in rs_set_nonblocking()
648 ret = fcntl(rs->epfd, F_SETFL, arg); in rs_set_nonblocking()
649 if (!ret && rs->qp_list) { in rs_set_nonblocking()
650 qp = rs->qp_list; in rs_set_nonblocking()
655 } while (qp != rs->qp_list && !ret); in rs_set_nonblocking()
662 static void rs_set_qp_size(struct rsocket *rs) in rs_set_qp_size() argument
666 max_size = min(ucma_max_qpsize(rs->cm_id), RS_QP_MAX_SIZE); in rs_set_qp_size()
668 if (rs->sq_size > max_size) in rs_set_qp_size()
669 rs->sq_size = max_size; in rs_set_qp_size()
670 else if (rs->sq_size < RS_QP_MIN_SIZE) in rs_set_qp_size()
671 rs->sq_size = RS_QP_MIN_SIZE; in rs_set_qp_size()
673 if (rs->rq_size > max_size) in rs_set_qp_size()
674 rs->rq_size = max_size; in rs_set_qp_size()
675 else if (rs->rq_size < RS_QP_MIN_SIZE) in rs_set_qp_size()
676 rs->rq_size = RS_QP_MIN_SIZE; in rs_set_qp_size()
679 static void ds_set_qp_size(struct rsocket *rs) in ds_set_qp_size() argument
685 if (rs->sq_size > max_size) in ds_set_qp_size()
686 rs->sq_size = max_size; in ds_set_qp_size()
687 if (rs->rq_size > max_size) in ds_set_qp_size()
688 rs->rq_size = max_size; in ds_set_qp_size()
690 if (rs->rq_size > (rs->rbuf_size / RS_SNDLOWAT)) in ds_set_qp_size()
691 rs->rq_size = rs->rbuf_size / RS_SNDLOWAT; in ds_set_qp_size()
693 rs->rbuf_size = rs->rq_size * RS_SNDLOWAT; in ds_set_qp_size()
695 if (rs->sq_size > (rs->sbuf_size / RS_SNDLOWAT)) in ds_set_qp_size()
696 rs->sq_size = rs->sbuf_size / RS_SNDLOWAT; in ds_set_qp_size()
698 rs->sbuf_size = rs->sq_size * RS_SNDLOWAT; in ds_set_qp_size()
701 static int rs_init_bufs(struct rsocket *rs) in rs_init_bufs() argument
706 rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg)); in rs_init_bufs()
707 if (!rs->rmsg) in rs_init_bufs()
710 total_sbuf_size = rs->sbuf_size; in rs_init_bufs()
711 if (rs->sq_inline < RS_MAX_CTRL_MSG) in rs_init_bufs()
713 rs->sbuf = calloc(total_sbuf_size, 1); in rs_init_bufs()
714 if (!rs->sbuf) in rs_init_bufs()
717 rs->smr = rdma_reg_msgs(rs->cm_id, rs->sbuf, total_sbuf_size); in rs_init_bufs()
718 if (!rs->smr) in rs_init_bufs()
721 len = sizeof(*rs->target_sgl) * RS_SGL_SIZE + in rs_init_bufs()
722 sizeof(*rs->target_iomap) * rs->target_iomap_size; in rs_init_bufs()
723 rs->target_buffer_list = malloc(len); in rs_init_bufs()
724 if (!rs->target_buffer_list) in rs_init_bufs()
727 rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len); in rs_init_bufs()
728 if (!rs->target_mr) in rs_init_bufs()
731 memset(rs->target_buffer_list, 0, len); in rs_init_bufs()
732 rs->target_sgl = rs->target_buffer_list; in rs_init_bufs()
733 if (rs->target_iomap_size) in rs_init_bufs()
734 rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE); in rs_init_bufs()
736 total_rbuf_size = rs->rbuf_size; in rs_init_bufs()
737 if (rs->opts & RS_OPT_MSG_SEND) in rs_init_bufs()
738 total_rbuf_size += rs->rq_size * RS_MSG_SIZE; in rs_init_bufs()
739 rs->rbuf = calloc(total_rbuf_size, 1); in rs_init_bufs()
740 if (!rs->rbuf) in rs_init_bufs()
743 rs->rmr = rdma_reg_write(rs->cm_id, rs->rbuf, total_rbuf_size); in rs_init_bufs()
744 if (!rs->rmr) in rs_init_bufs()
747 rs->ssgl[0].addr = rs->ssgl[1].addr = (uintptr_t) rs->sbuf; in rs_init_bufs()
748 rs->sbuf_bytes_avail = rs->sbuf_size; in rs_init_bufs()
749 rs->ssgl[0].lkey = rs->ssgl[1].lkey = rs->smr->lkey; in rs_init_bufs()
751 rs->rbuf_free_offset = rs->rbuf_size >> 1; in rs_init_bufs()
752 rs->rbuf_bytes_avail = rs->rbuf_size >> 1; in rs_init_bufs()
753 rs->sqe_avail = rs->sq_size - rs->ctrl_max_seqno; in rs_init_bufs()
754 rs->rseq_comp = rs->rq_size >> 1; in rs_init_bufs()
760 qp->rbuf = calloc(qp->rs->rbuf_size + sizeof(struct ibv_grh), 1); in ds_init_bufs()
764 qp->smr = rdma_reg_msgs(qp->cm_id, qp->rs->sbuf, qp->rs->sbuf_size); in ds_init_bufs()
768 qp->rmr = rdma_reg_msgs(qp->cm_id, qp->rbuf, qp->rs->rbuf_size + in ds_init_bufs()
781 static int rs_create_cq(struct rsocket *rs, struct rdma_cm_id *cm_id) in rs_create_cq() argument
787 cm_id->recv_cq = ibv_create_cq(cm_id->verbs, rs->sq_size + rs->rq_size, in rs_create_cq()
792 if (rs->fd_flags & O_NONBLOCK) { in rs_create_cq()
811 static inline int rs_post_recv(struct rsocket *rs) in rs_post_recv() argument
817 if (!(rs->opts & RS_OPT_MSG_SEND)) { in rs_post_recv()
822 wr.wr_id = rs_recv_wr_id(rs->rbuf_msg_index); in rs_post_recv()
823 sge.addr = (uintptr_t) rs->rbuf + rs->rbuf_size + in rs_post_recv()
824 (rs->rbuf_msg_index * RS_MSG_SIZE); in rs_post_recv()
826 sge.lkey = rs->rmr->lkey; in rs_post_recv()
830 if(++rs->rbuf_msg_index == rs->rq_size) in rs_post_recv()
831 rs->rbuf_msg_index = 0; in rs_post_recv()
834 return rdma_seterrno(ibv_post_recv(rs->cm_id->qp, &wr, &bad)); in rs_post_recv()
837 static inline int ds_post_recv(struct rsocket *rs, struct ds_qp *qp, uint32_t offset) in ds_post_recv() argument
842 sge[0].addr = (uintptr_t) qp->rbuf + rs->rbuf_size; in ds_post_recv()
857 static int rs_create_ep(struct rsocket *rs) in rs_create_ep() argument
862 rs_set_qp_size(rs); in rs_create_ep()
863 if (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IWARP) in rs_create_ep()
864 rs->opts |= RS_OPT_MSG_SEND; in rs_create_ep()
865 ret = rs_create_cq(rs, rs->cm_id); in rs_create_ep()
870 qp_attr.qp_context = rs; in rs_create_ep()
871 qp_attr.send_cq = rs->cm_id->send_cq; in rs_create_ep()
872 qp_attr.recv_cq = rs->cm_id->recv_cq; in rs_create_ep()
875 qp_attr.cap.max_send_wr = rs->sq_size; in rs_create_ep()
876 qp_attr.cap.max_recv_wr = rs->rq_size; in rs_create_ep()
879 qp_attr.cap.max_inline_data = rs->sq_inline; in rs_create_ep()
881 ret = rdma_create_qp(rs->cm_id, NULL, &qp_attr); in rs_create_ep()
885 rs->sq_inline = qp_attr.cap.max_inline_data; in rs_create_ep()
886 if ((rs->opts & RS_OPT_MSG_SEND) && (rs->sq_inline < RS_MSG_SIZE)) in rs_create_ep()
889 ret = rs_init_bufs(rs); in rs_create_ep()
893 for (i = 0; i < rs->rq_size; i++) { in rs_create_ep()
894 ret = rs_post_recv(rs); in rs_create_ep()
914 static void rs_free_iomappings(struct rsocket *rs) in rs_free_iomappings() argument
918 while (!dlist_empty(&rs->iomap_list)) { in rs_free_iomappings()
919 iomr = container_of(rs->iomap_list.next, in rs_free_iomappings()
921 riounmap(rs->index, iomr->mr->addr, iomr->mr->length); in rs_free_iomappings()
923 while (!dlist_empty(&rs->iomap_queue)) { in rs_free_iomappings()
924 iomr = container_of(rs->iomap_queue.next, in rs_free_iomappings()
926 riounmap(rs->index, iomr->mr->addr, iomr->mr->length); in rs_free_iomappings()
943 tdelete(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr); in ds_free_qp()
944 epoll_ctl(qp->rs->epfd, EPOLL_CTL_DEL, in ds_free_qp()
954 static void ds_free(struct rsocket *rs) in ds_free() argument
958 if (rs->udp_sock >= 0) in ds_free()
959 close(rs->udp_sock); in ds_free()
961 if (rs->index >= 0) in ds_free()
962 rs_remove(rs); in ds_free()
964 if (rs->dmsg) in ds_free()
965 free(rs->dmsg); in ds_free()
967 while ((qp = rs->qp_list)) { in ds_free()
968 ds_remove_qp(rs, qp); in ds_free()
972 if (rs->epfd >= 0) in ds_free()
973 close(rs->epfd); in ds_free()
975 if (rs->sbuf) in ds_free()
976 free(rs->sbuf); in ds_free()
978 tdestroy(rs->dest_map, free); in ds_free()
979 fastlock_destroy(&rs->map_lock); in ds_free()
980 fastlock_destroy(&rs->cq_wait_lock); in ds_free()
981 fastlock_destroy(&rs->cq_lock); in ds_free()
982 fastlock_destroy(&rs->rlock); in ds_free()
983 fastlock_destroy(&rs->slock); in ds_free()
984 free(rs); in ds_free()
987 static void rs_free(struct rsocket *rs) in rs_free() argument
989 if (rs->type == SOCK_DGRAM) { in rs_free()
990 ds_free(rs); in rs_free()
994 if (rs->rmsg) in rs_free()
995 free(rs->rmsg); in rs_free()
997 if (rs->sbuf) { in rs_free()
998 if (rs->smr) in rs_free()
999 rdma_dereg_mr(rs->smr); in rs_free()
1000 free(rs->sbuf); in rs_free()
1003 if (rs->rbuf) { in rs_free()
1004 if (rs->rmr) in rs_free()
1005 rdma_dereg_mr(rs->rmr); in rs_free()
1006 free(rs->rbuf); in rs_free()
1009 if (rs->target_buffer_list) { in rs_free()
1010 if (rs->target_mr) in rs_free()
1011 rdma_dereg_mr(rs->target_mr); in rs_free()
1012 free(rs->target_buffer_list); in rs_free()
1015 if (rs->cm_id) { in rs_free()
1016 rs_free_iomappings(rs); in rs_free()
1017 if (rs->cm_id->qp) { in rs_free()
1018 ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe); in rs_free()
1019 rdma_destroy_qp(rs->cm_id); in rs_free()
1021 rdma_destroy_id(rs->cm_id); in rs_free()
1024 if (rs->index >= 0) in rs_free()
1025 rs_remove(rs); in rs_free()
1027 fastlock_destroy(&rs->map_lock); in rs_free()
1028 fastlock_destroy(&rs->cq_wait_lock); in rs_free()
1029 fastlock_destroy(&rs->cq_lock); in rs_free()
1030 fastlock_destroy(&rs->rlock); in rs_free()
1031 fastlock_destroy(&rs->slock); in rs_free()
1032 free(rs); in rs_free()
1035 static size_t rs_conn_data_offset(struct rsocket *rs) in rs_conn_data_offset() argument
1037 return (rs->cm_id->route.addr.src_addr.sa_family == AF_IB) ? in rs_conn_data_offset()
1041 static void rs_format_conn_data(struct rsocket *rs, struct rs_conn_data *conn) in rs_format_conn_data() argument
1046 conn->credits = htobe16(rs->rq_size); in rs_format_conn_data()
1048 conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8); in rs_format_conn_data()
1050 conn->target_sgl.addr = (__force uint64_t)htobe64((uintptr_t) rs->target_sgl); in rs_format_conn_data()
1052 conn->target_sgl.key = (__force uint32_t)htobe32(rs->target_mr->rkey); in rs_format_conn_data()
1054 conn->data_buf.addr = (__force uint64_t)htobe64((uintptr_t) rs->rbuf); in rs_format_conn_data()
1055 conn->data_buf.length = (__force uint32_t)htobe32(rs->rbuf_size >> 1); in rs_format_conn_data()
1056 conn->data_buf.key = (__force uint32_t)htobe32(rs->rmr->rkey); in rs_format_conn_data()
1059 static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn) in rs_save_conn_data() argument
1061 rs->remote_sgl.addr = be64toh((__force __be64)conn->target_sgl.addr); in rs_save_conn_data()
1062 rs->remote_sgl.length = be32toh((__force __be32)conn->target_sgl.length); in rs_save_conn_data()
1063 rs->remote_sgl.key = be32toh((__force __be32)conn->target_sgl.key); in rs_save_conn_data()
1064 rs->remote_sge = 1; in rs_save_conn_data()
1067 rs->opts = RS_OPT_SWAP_SGL; in rs_save_conn_data()
1070 rs->remote_iomap.addr = rs->remote_sgl.addr + in rs_save_conn_data()
1071 sizeof(rs->remote_sgl) * rs->remote_sgl.length; in rs_save_conn_data()
1072 rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8); in rs_save_conn_data()
1073 rs->remote_iomap.key = rs->remote_sgl.key; in rs_save_conn_data()
1076 rs->target_sgl[0].addr = be64toh((__force __be64)conn->data_buf.addr); in rs_save_conn_data()
1077 rs->target_sgl[0].length = be32toh((__force __be32)conn->data_buf.length); in rs_save_conn_data()
1078 rs->target_sgl[0].key = be32toh((__force __be32)conn->data_buf.key); in rs_save_conn_data()
1080 rs->sseq_comp = be16toh(conn->credits); in rs_save_conn_data()
1083 static int ds_init(struct rsocket *rs, int domain) in ds_init() argument
1085 rs->udp_sock = socket(domain, SOCK_DGRAM, 0); in ds_init()
1086 if (rs->udp_sock < 0) in ds_init()
1087 return rs->udp_sock; in ds_init()
1089 rs->epfd = epoll_create(2); in ds_init()
1090 if (rs->epfd < 0) in ds_init()
1091 return rs->epfd; in ds_init()
1096 static int ds_init_ep(struct rsocket *rs) in ds_init_ep() argument
1101 ds_set_qp_size(rs); in ds_init_ep()
1103 rs->sbuf = calloc(rs->sq_size, RS_SNDLOWAT); in ds_init_ep()
1104 if (!rs->sbuf) in ds_init_ep()
1107 rs->dmsg = calloc(rs->rq_size + 1, sizeof(*rs->dmsg)); in ds_init_ep()
1108 if (!rs->dmsg) in ds_init_ep()
1111 rs->sqe_avail = rs->sq_size; in ds_init_ep()
1112 rs->rqe_avail = rs->rq_size; in ds_init_ep()
1114 rs->smsg_free = (struct ds_smsg *) rs->sbuf; in ds_init_ep()
1115 msg = rs->smsg_free; in ds_init_ep()
1116 for (i = 0; i < rs->sq_size - 1; i++) { in ds_init_ep()
1122 ret = rs_notify_svc(&udp_svc, rs, RS_SVC_ADD_DGRAM); in ds_init_ep()
1126 rs->state = rs_readable | rs_writable; in ds_init_ep()
1132 struct rsocket *rs; in rsocket() local
1142 rs = rs_alloc(NULL, type); in rsocket()
1143 if (!rs) in rsocket()
1147 ret = rdma_create_id(NULL, &rs->cm_id, rs, RDMA_PS_TCP); in rsocket()
1151 rs->cm_id->route.addr.src_addr.sa_family = domain; in rsocket()
1152 index = rs->cm_id->channel->fd; in rsocket()
1154 ret = ds_init(rs, domain); in rsocket()
1158 index = rs->udp_sock; in rsocket()
1161 ret = rs_insert(rs, index); in rsocket()
1165 return rs->index; in rsocket()
1168 rs_free(rs); in rsocket()
1174 struct rsocket *rs; in rbind() local
1177 rs = idm_lookup(&idm, socket); in rbind()
1178 if (!rs) in rbind()
1180 if (rs->type == SOCK_STREAM) { in rbind()
1181 ret = rdma_bind_addr(rs->cm_id, (struct sockaddr *) addr); in rbind()
1183 rs->state = rs_bound; in rbind()
1185 if (rs->state == rs_init) { in rbind()
1186 ret = ds_init_ep(rs); in rbind()
1190 ret = bind(rs->udp_sock, addr, addrlen); in rbind()
1197 struct rsocket *rs; in rlisten() local
1200 rs = idm_lookup(&idm, socket); in rlisten()
1201 if (!rs) in rlisten()
1204 if (rs->state != rs_listening) { in rlisten()
1205 ret = rdma_listen(rs->cm_id, backlog); in rlisten()
1207 rs->state = rs_listening; in rlisten()
1228 struct rsocket *rs, *new_rs; in raccept() local
1233 rs = idm_lookup(&idm, socket); in raccept()
1234 if (!rs) in raccept()
1236 new_rs = rs_alloc(rs, rs->type); in raccept()
1240 ret = rdma_get_request(rs->cm_id, &new_rs->cm_id); in raccept()
1249 (new_rs->cm_id->event->param.conn.private_data + rs_conn_data_offset(rs)); in raccept()
1255 if (rs->fd_flags & O_NONBLOCK) in raccept()
1284 static int rs_do_connect(struct rsocket *rs) in rs_do_connect() argument
1291 switch (rs->state) { in rs_do_connect()
1295 to = 1000 << rs->retries++; in rs_do_connect()
1296 ret = rdma_resolve_addr(rs->cm_id, NULL, in rs_do_connect()
1297 &rs->cm_id->route.addr.dst_addr, to); in rs_do_connect()
1301 rs->state = rs_resolving_addr; in rs_do_connect()
1304 ret = ucma_complete(rs->cm_id); in rs_do_connect()
1306 if (errno == ETIMEDOUT && rs->retries <= RS_CONN_RETRIES) in rs_do_connect()
1311 rs->retries = 0; in rs_do_connect()
1313 to = 1000 << rs->retries++; in rs_do_connect()
1314 if (rs->optval) { in rs_do_connect()
1315 ret = rdma_set_option(rs->cm_id, RDMA_OPTION_IB, in rs_do_connect()
1316 RDMA_OPTION_IB_PATH, rs->optval, in rs_do_connect()
1317 rs->optlen); in rs_do_connect()
1318 free(rs->optval); in rs_do_connect()
1319 rs->optval = NULL; in rs_do_connect()
1321 rs->state = rs_resolving_route; in rs_do_connect()
1325 ret = rdma_resolve_route(rs->cm_id, to); in rs_do_connect()
1330 rs->state = rs_resolving_route; in rs_do_connect()
1334 ret = ucma_complete(rs->cm_id); in rs_do_connect()
1336 if (errno == ETIMEDOUT && rs->retries <= RS_CONN_RETRIES) in rs_do_connect()
1341 ret = rs_create_ep(rs); in rs_do_connect()
1346 creq = (void *) &cdata + rs_conn_data_offset(rs); in rs_do_connect()
1347 rs_format_conn_data(rs, creq); in rs_do_connect()
1348 param.private_data = (void *) creq - rs_conn_data_offset(rs); in rs_do_connect()
1349 param.private_data_len = sizeof(*creq) + rs_conn_data_offset(rs); in rs_do_connect()
1354 if (rs->opts & RS_OPT_MSG_SEND) in rs_do_connect()
1356 rs->retries = 0; in rs_do_connect()
1358 ret = rdma_connect(rs->cm_id, &param); in rs_do_connect()
1362 rs->state = rs_connecting; in rs_do_connect()
1365 ret = ucma_complete(rs->cm_id); in rs_do_connect()
1369 cresp = (struct rs_conn_data *) rs->cm_id->event->param.conn.private_data; in rs_do_connect()
1375 rs_save_conn_data(rs, cresp); in rs_do_connect()
1376 rs->state = rs_connect_rdwr; in rs_do_connect()
1379 if (!(rs->fd_flags & O_NONBLOCK)) in rs_do_connect()
1380 fcntl(rs->cm_id->channel->fd, F_SETFL, 0); in rs_do_connect()
1382 ret = ucma_complete(rs->cm_id); in rs_do_connect()
1386 rs->state = rs_connect_rdwr; in rs_do_connect()
1397 rs->state = rs_connect_error; in rs_do_connect()
1398 rs->err = errno; in rs_do_connect()
1415 static int ds_get_src_addr(struct rsocket *rs, in ds_get_src_addr() argument
1423 ret = getsockname(rs->udp_sock, &src_addr->sa, src_len); in ds_get_src_addr()
1482 tsearch(&qp->dest.addr, &qp->rs->dest_map, ds_compare_addr); in ds_add_qp_dest()
1486 static int ds_create_qp(struct rsocket *rs, union socket_addr *src_addr, in ds_create_qp() argument
1498 qp->rs = rs; in ds_create_qp()
1512 ret = rs_create_cq(rs, qp->cm_id); in ds_create_qp()
1522 qp_attr.cap.max_send_wr = rs->sq_size; in ds_create_qp()
1523 qp_attr.cap.max_recv_wr = rs->rq_size; in ds_create_qp()
1526 qp_attr.cap.max_inline_data = rs->sq_inline; in ds_create_qp()
1531 rs->sq_inline = qp_attr.cap.max_inline_data; in ds_create_qp()
1538 ret = epoll_ctl(rs->epfd, EPOLL_CTL_ADD, in ds_create_qp()
1543 for (i = 0; i < rs->rq_size; i++) { in ds_create_qp()
1544 ret = ds_post_recv(rs, qp, i * RS_SNDLOWAT); in ds_create_qp()
1549 ds_insert_qp(rs, qp); in ds_create_qp()
1557 static int ds_get_qp(struct rsocket *rs, union socket_addr *src_addr, in ds_get_qp() argument
1560 if (rs->qp_list) { in ds_get_qp()
1561 *qp = rs->qp_list; in ds_get_qp()
1568 } while (*qp != rs->qp_list); in ds_get_qp()
1571 return ds_create_qp(rs, src_addr, addrlen, qp); in ds_get_qp()
1574 static int ds_get_dest(struct rsocket *rs, const struct sockaddr *addr, in ds_get_dest() argument
1583 fastlock_acquire(&rs->map_lock); in ds_get_dest()
1584 tdest = tfind(addr, &rs->dest_map, ds_compare_addr); in ds_get_dest()
1588 ret = ds_get_src_addr(rs, addr, addrlen, &src_addr, &src_len); in ds_get_dest()
1592 ret = ds_get_qp(rs, &src_addr, src_len, &qp); in ds_get_dest()
1596 tdest = tfind(addr, &rs->dest_map, ds_compare_addr); in ds_get_dest()
1606 tdest = tsearch(&new_dest->addr, &rs->dest_map, ds_compare_addr); in ds_get_dest()
1612 fastlock_release(&rs->map_lock); in ds_get_dest()
1618 struct rsocket *rs; in rconnect() local
1621 rs = idm_lookup(&idm, socket); in rconnect()
1622 if (!rs) in rconnect()
1624 if (rs->type == SOCK_STREAM) { in rconnect()
1625 memcpy(&rs->cm_id->route.addr.dst_addr, addr, addrlen); in rconnect()
1626 ret = rs_do_connect(rs); in rconnect()
1628 if (rs->state == rs_init) { in rconnect()
1629 ret = ds_init_ep(rs); in rconnect()
1634 fastlock_acquire(&rs->slock); in rconnect()
1635 ret = connect(rs->udp_sock, addr, addrlen); in rconnect()
1637 ret = ds_get_dest(rs, addr, addrlen, &rs->conn_dest); in rconnect()
1638 fastlock_release(&rs->slock); in rconnect()
1643 static void *rs_get_ctrl_buf(struct rsocket *rs) in rs_get_ctrl_buf() argument
1645 return rs->sbuf + rs->sbuf_size + in rs_get_ctrl_buf()
1646 RS_MAX_CTRL_MSG * (rs->ctrl_seqno & (RS_QP_CTRL_SIZE - 1)); in rs_get_ctrl_buf()
1649 static int rs_post_msg(struct rsocket *rs, uint32_t msg) in rs_post_msg() argument
1656 if (!(rs->opts & RS_OPT_MSG_SEND)) { in rs_post_msg()
1672 return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); in rs_post_msg()
1675 static int rs_post_write(struct rsocket *rs, in rs_post_write() argument
1691 return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); in rs_post_write()
1694 static int rs_post_write_msg(struct rsocket *rs, in rs_post_write_msg() argument
1704 if (!(rs->opts & RS_OPT_MSG_SEND)) { in rs_post_write_msg()
1714 return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); in rs_post_write_msg()
1716 ret = rs_post_write(rs, sgl, nsge, msg, flags, addr, rkey); in rs_post_write_msg()
1728 ret = rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad)); in rs_post_write_msg()
1734 static int ds_post_send(struct rsocket *rs, struct ibv_sge *sge, in ds_post_send() argument
1744 wr.send_flags = (sge->length <= rs->sq_inline) ? IBV_SEND_INLINE : 0; in ds_post_send()
1745 wr.wr.ud.ah = rs->conn_dest->ah; in ds_post_send()
1746 wr.wr.ud.remote_qpn = rs->conn_dest->qpn; in ds_post_send()
1749 return rdma_seterrno(ibv_post_send(rs->conn_dest->qp->cm_id->qp, &wr, &bad)); in ds_post_send()
1756 static int rs_write_data(struct rsocket *rs, in rs_write_data() argument
1763 rs->sseq_no++; in rs_write_data()
1764 rs->sqe_avail--; in rs_write_data()
1765 if (rs->opts & RS_OPT_MSG_SEND) in rs_write_data()
1766 rs->sqe_avail--; in rs_write_data()
1767 rs->sbuf_bytes_avail -= length; in rs_write_data()
1769 addr = rs->target_sgl[rs->target_sge].addr; in rs_write_data()
1770 rkey = rs->target_sgl[rs->target_sge].key; in rs_write_data()
1772 rs->target_sgl[rs->target_sge].addr += length; in rs_write_data()
1773 rs->target_sgl[rs->target_sge].length -= length; in rs_write_data()
1775 if (!rs->target_sgl[rs->target_sge].length) { in rs_write_data()
1776 if (++rs->target_sge == RS_SGL_SIZE) in rs_write_data()
1777 rs->target_sge = 0; in rs_write_data()
1780 return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length), in rs_write_data()
1784 static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset, in rs_write_direct() argument
1789 rs->sqe_avail--; in rs_write_direct()
1790 rs->sbuf_bytes_avail -= length; in rs_write_direct()
1793 return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length), in rs_write_direct()
1797 static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr, in rs_write_iomap() argument
1802 rs->sseq_no++; in rs_write_iomap()
1803 rs->sqe_avail--; in rs_write_iomap()
1804 if (rs->opts & RS_OPT_MSG_SEND) in rs_write_iomap()
1805 rs->sqe_avail--; in rs_write_iomap()
1806 rs->sbuf_bytes_avail -= sizeof(struct rs_iomap); in rs_write_iomap()
1808 addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap); in rs_write_iomap()
1809 return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index), in rs_write_iomap()
1810 flags, addr, rs->remote_iomap.key); in rs_write_iomap()
1813 static uint32_t rs_sbuf_left(struct rsocket *rs) in rs_sbuf_left() argument
1815 return (uint32_t) (((uint64_t) (uintptr_t) &rs->sbuf[rs->sbuf_size]) - in rs_sbuf_left()
1816 rs->ssgl[0].addr); in rs_sbuf_left()
1819 static void rs_send_credits(struct rsocket *rs) in rs_send_credits() argument
1825 rs->ctrl_seqno++; in rs_send_credits()
1826 rs->rseq_comp = rs->rseq_no + (rs->rq_size >> 1); in rs_send_credits()
1827 if (rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) { in rs_send_credits()
1828 if (rs->opts & RS_OPT_MSG_SEND) in rs_send_credits()
1829 rs->ctrl_seqno++; in rs_send_credits()
1831 if (!(rs->opts & RS_OPT_SWAP_SGL)) { in rs_send_credits()
1832 sge.addr = (uintptr_t) &rs->rbuf[rs->rbuf_free_offset]; in rs_send_credits()
1833 sge.key = rs->rmr->rkey; in rs_send_credits()
1834 sge.length = rs->rbuf_size >> 1; in rs_send_credits()
1836 sge.addr = bswap_64((uintptr_t) &rs->rbuf[rs->rbuf_free_offset]); in rs_send_credits()
1837 sge.key = bswap_32(rs->rmr->rkey); in rs_send_credits()
1838 sge.length = bswap_32(rs->rbuf_size >> 1); in rs_send_credits()
1841 if (rs->sq_inline < sizeof sge) { in rs_send_credits()
1842 sge_buf = rs_get_ctrl_buf(rs); in rs_send_credits()
1845 ibsge.lkey = rs->smr->lkey; in rs_send_credits()
1854 rs_post_write_msg(rs, &ibsge, 1, in rs_send_credits()
1855 rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), flags, in rs_send_credits()
1856 rs->remote_sgl.addr + rs->remote_sge * sizeof(struct rs_sge), in rs_send_credits()
1857 rs->remote_sgl.key); in rs_send_credits()
1859 rs->rbuf_bytes_avail -= rs->rbuf_size >> 1; in rs_send_credits()
1860 rs->rbuf_free_offset += rs->rbuf_size >> 1; in rs_send_credits()
1861 if (rs->rbuf_free_offset >= rs->rbuf_size) in rs_send_credits()
1862 rs->rbuf_free_offset = 0; in rs_send_credits()
1863 if (++rs->remote_sge == rs->remote_sgl.length) in rs_send_credits()
1864 rs->remote_sge = 0; in rs_send_credits()
1866 rs_post_msg(rs, rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size)); in rs_send_credits()
1870 static inline int rs_ctrl_avail(struct rsocket *rs) in rs_ctrl_avail() argument
1872 return rs->ctrl_seqno != rs->ctrl_max_seqno; in rs_ctrl_avail()
1876 static inline int rs_2ctrl_avail(struct rsocket *rs) in rs_2ctrl_avail() argument
1878 return (int)((rs->ctrl_seqno + 1) - rs->ctrl_max_seqno) < 0; in rs_2ctrl_avail()
1881 static int rs_give_credits(struct rsocket *rs) in rs_give_credits() argument
1883 if (!(rs->opts & RS_OPT_MSG_SEND)) { in rs_give_credits()
1884 return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) || in rs_give_credits()
1885 ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) && in rs_give_credits()
1886 rs_ctrl_avail(rs) && (rs->state & rs_connected); in rs_give_credits()
1888 return ((rs->rbuf_bytes_avail >= (rs->rbuf_size >> 1)) || in rs_give_credits()
1889 ((short) ((short) rs->rseq_no - (short) rs->rseq_comp) >= 0)) && in rs_give_credits()
1890 rs_2ctrl_avail(rs) && (rs->state & rs_connected); in rs_give_credits()
1894 static void rs_update_credits(struct rsocket *rs) in rs_update_credits() argument
1896 if (rs_give_credits(rs)) in rs_update_credits()
1897 rs_send_credits(rs); in rs_update_credits()
1900 static int rs_poll_cq(struct rsocket *rs) in rs_poll_cq() argument
1906 while ((ret = ibv_poll_cq(rs->cm_id->recv_cq, 1, &wc)) > 0) { in rs_poll_cq()
1915 msg = ((uint32_t *) (rs->rbuf + rs->rbuf_size)) in rs_poll_cq()
1921 rs->sseq_comp = (uint16_t) rs_msg_data(msg); in rs_poll_cq()
1928 rs->state = rs_disconnected; in rs_poll_cq()
1931 if (rs->state & rs_writable) { in rs_poll_cq()
1932 rs->state &= ~rs_readable; in rs_poll_cq()
1934 rs->state = rs_disconnected; in rs_poll_cq()
1943 rs->rmsg[rs->rmsg_tail].op = rs_msg_op(msg); in rs_poll_cq()
1944 rs->rmsg[rs->rmsg_tail].data = rs_msg_data(msg); in rs_poll_cq()
1945 if (++rs->rmsg_tail == rs->rq_size + 1) in rs_poll_cq()
1946 rs->rmsg_tail = 0; in rs_poll_cq()
1952 rs->ctrl_max_seqno++; in rs_poll_cq()
1955 rs->ctrl_max_seqno++; in rs_poll_cq()
1957 rs->state = rs_disconnected; in rs_poll_cq()
1960 rs->sqe_avail++; in rs_poll_cq()
1962 rs->sbuf_bytes_avail += sizeof(struct rs_iomap); in rs_poll_cq()
1965 rs->sqe_avail++; in rs_poll_cq()
1966 rs->sbuf_bytes_avail += rs_msg_data(rs_wr_data(wc.wr_id)); in rs_poll_cq()
1969 if (wc.status != IBV_WC_SUCCESS && (rs->state & rs_connected)) { in rs_poll_cq()
1970 rs->state = rs_error; in rs_poll_cq()
1971 rs->err = EIO; in rs_poll_cq()
1976 if (rs->state & rs_connected) { in rs_poll_cq()
1978 ret = rs_post_recv(rs); in rs_poll_cq()
1981 rs->state = rs_error; in rs_poll_cq()
1982 rs->err = errno; in rs_poll_cq()
1988 static int rs_get_cq_event(struct rsocket *rs) in rs_get_cq_event() argument
1994 if (!rs->cq_armed) in rs_get_cq_event()
1997 ret = ibv_get_cq_event(rs->cm_id->recv_cq_channel, &cq, &context); in rs_get_cq_event()
1999 if (++rs->unack_cqe >= rs->sq_size + rs->rq_size) { in rs_get_cq_event()
2000 ibv_ack_cq_events(rs->cm_id->recv_cq, rs->unack_cqe); in rs_get_cq_event()
2001 rs->unack_cqe = 0; in rs_get_cq_event()
2003 rs->cq_armed = 0; in rs_get_cq_event()
2005 rs->state = rs_error; in rs_get_cq_event()
2025 static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) in rs_process_cq() argument
2029 fastlock_acquire(&rs->cq_lock); in rs_process_cq()
2031 rs_update_credits(rs); in rs_process_cq()
2032 ret = rs_poll_cq(rs); in rs_process_cq()
2033 if (test(rs)) { in rs_process_cq()
2040 } else if (!rs->cq_armed) { in rs_process_cq()
2041 ibv_req_notify_cq(rs->cm_id->recv_cq, 0); in rs_process_cq()
2042 rs->cq_armed = 1; in rs_process_cq()
2044 rs_update_credits(rs); in rs_process_cq()
2045 fastlock_acquire(&rs->cq_wait_lock); in rs_process_cq()
2046 fastlock_release(&rs->cq_lock); in rs_process_cq()
2048 ret = rs_get_cq_event(rs); in rs_process_cq()
2049 fastlock_release(&rs->cq_wait_lock); in rs_process_cq()
2050 fastlock_acquire(&rs->cq_lock); in rs_process_cq()
2054 rs_update_credits(rs); in rs_process_cq()
2055 fastlock_release(&rs->cq_lock); in rs_process_cq()
2059 static int rs_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) in rs_get_comp() argument
2066 ret = rs_process_cq(rs, 1, test); in rs_get_comp()
2078 ret = rs_process_cq(rs, 0, test); in rs_get_comp()
2099 static void ds_poll_cqs(struct rsocket *rs) in ds_poll_cqs() argument
2107 if (!(qp = rs->qp_list)) in ds_poll_cqs()
2120 if (rs->rqe_avail && wc.status == IBV_WC_SUCCESS && in ds_poll_cqs()
2122 rs->rqe_avail--; in ds_poll_cqs()
2123 rmsg = &rs->dmsg[rs->rmsg_tail]; in ds_poll_cqs()
2127 if (++rs->rmsg_tail == rs->rq_size + 1) in ds_poll_cqs()
2128 rs->rmsg_tail = 0; in ds_poll_cqs()
2130 ds_post_recv(rs, qp, rs_wr_data(wc.wr_id)); in ds_poll_cqs()
2133 smsg = (struct ds_smsg *) (rs->sbuf + rs_wr_data(wc.wr_id)); in ds_poll_cqs()
2134 smsg->next = rs->smsg_free; in ds_poll_cqs()
2135 rs->smsg_free = smsg; in ds_poll_cqs()
2136 rs->sqe_avail++; in ds_poll_cqs()
2140 if (!rs->rqe_avail && rs->sqe_avail) { in ds_poll_cqs()
2141 rs->qp_list = qp; in ds_poll_cqs()
2145 } while (qp != rs->qp_list); in ds_poll_cqs()
2149 static void ds_req_notify_cqs(struct rsocket *rs) in ds_req_notify_cqs() argument
2153 if (!(qp = rs->qp_list)) in ds_req_notify_cqs()
2162 } while (qp != rs->qp_list); in ds_req_notify_cqs()
2165 static int ds_get_cq_event(struct rsocket *rs) in ds_get_cq_event() argument
2173 if (!rs->cq_armed) in ds_get_cq_event()
2176 ret = epoll_wait(rs->epfd, &event, 1, -1); in ds_get_cq_event()
2185 rs->cq_armed = 0; in ds_get_cq_event()
2191 static int ds_process_cqs(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) in ds_process_cqs() argument
2195 fastlock_acquire(&rs->cq_lock); in ds_process_cqs()
2197 ds_poll_cqs(rs); in ds_process_cqs()
2198 if (test(rs)) { in ds_process_cqs()
2203 } else if (!rs->cq_armed) { in ds_process_cqs()
2204 ds_req_notify_cqs(rs); in ds_process_cqs()
2205 rs->cq_armed = 1; in ds_process_cqs()
2207 fastlock_acquire(&rs->cq_wait_lock); in ds_process_cqs()
2208 fastlock_release(&rs->cq_lock); in ds_process_cqs()
2210 ret = ds_get_cq_event(rs); in ds_process_cqs()
2211 fastlock_release(&rs->cq_wait_lock); in ds_process_cqs()
2212 fastlock_acquire(&rs->cq_lock); in ds_process_cqs()
2216 fastlock_release(&rs->cq_lock); in ds_process_cqs()
2220 static int ds_get_comp(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) in ds_get_comp() argument
2227 ret = ds_process_cqs(rs, 1, test); in ds_get_comp()
2239 ret = ds_process_cqs(rs, 0, test); in ds_get_comp()
2243 static int rs_nonblocking(struct rsocket *rs, int flags) in rs_nonblocking() argument
2245 return (rs->fd_flags & O_NONBLOCK) || (flags & MSG_DONTWAIT); in rs_nonblocking()
2248 static int rs_is_cq_armed(struct rsocket *rs) in rs_is_cq_armed() argument
2250 return rs->cq_armed; in rs_is_cq_armed()
2253 static int rs_poll_all(struct rsocket *rs) in rs_poll_all() argument
2267 static int rs_can_send(struct rsocket *rs) in rs_can_send() argument
2269 if (!(rs->opts & RS_OPT_MSG_SEND)) { in rs_can_send()
2270 return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) && in rs_can_send()
2271 (rs->sseq_no != rs->sseq_comp) && in rs_can_send()
2272 (rs->target_sgl[rs->target_sge].length != 0); in rs_can_send()
2274 return (rs->sqe_avail >= 2) && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) && in rs_can_send()
2275 (rs->sseq_no != rs->sseq_comp) && in rs_can_send()
2276 (rs->target_sgl[rs->target_sge].length != 0); in rs_can_send()
2280 static int ds_can_send(struct rsocket *rs) in ds_can_send() argument
2282 return rs->sqe_avail; in ds_can_send()
2285 static int ds_all_sends_done(struct rsocket *rs) in ds_all_sends_done() argument
2287 return rs->sqe_avail == rs->sq_size; in ds_all_sends_done()
2290 static int rs_conn_can_send(struct rsocket *rs) in rs_conn_can_send() argument
2292 return rs_can_send(rs) || !(rs->state & rs_writable); in rs_conn_can_send()
2295 static int rs_conn_can_send_ctrl(struct rsocket *rs) in rs_conn_can_send_ctrl() argument
2297 return rs_ctrl_avail(rs) || !(rs->state & rs_connected); in rs_conn_can_send_ctrl()
2300 static int rs_have_rdata(struct rsocket *rs) in rs_have_rdata() argument
2302 return (rs->rmsg_head != rs->rmsg_tail); in rs_have_rdata()
2305 static int rs_conn_have_rdata(struct rsocket *rs) in rs_conn_have_rdata() argument
2307 return rs_have_rdata(rs) || !(rs->state & rs_readable); in rs_conn_have_rdata()
2310 static int rs_conn_all_sends_done(struct rsocket *rs) in rs_conn_all_sends_done() argument
2312 return ((((int) rs->ctrl_max_seqno) - ((int) rs->ctrl_seqno)) + in rs_conn_all_sends_done()
2313 rs->sqe_avail == rs->sq_size) || in rs_conn_all_sends_done()
2314 !(rs->state & rs_connected); in rs_conn_all_sends_done()
2342 static ssize_t ds_recvfrom(struct rsocket *rs, void *buf, size_t len, int flags, in ds_recvfrom() argument
2349 if (!(rs->state & rs_readable)) in ds_recvfrom()
2352 if (!rs_have_rdata(rs)) { in ds_recvfrom()
2353 ret = ds_get_comp(rs, rs_nonblocking(rs, flags), in ds_recvfrom()
2359 rmsg = &rs->dmsg[rs->rmsg_head]; in ds_recvfrom()
2369 ds_post_recv(rs, rmsg->qp, rmsg->offset); in ds_recvfrom()
2370 if (++rs->rmsg_head == rs->rq_size + 1) in ds_recvfrom()
2371 rs->rmsg_head = 0; in ds_recvfrom()
2372 rs->rqe_avail++; in ds_recvfrom()
2378 static ssize_t rs_peek(struct rsocket *rs, void *buf, size_t len) in rs_peek() argument
2384 rmsg_head = rs->rmsg_head; in rs_peek()
2385 rbuf_offset = rs->rbuf_offset; in rs_peek()
2387 for (; left && (rmsg_head != rs->rmsg_tail); left -= rsize) { in rs_peek()
2388 if (left < rs->rmsg[rmsg_head].data) { in rs_peek()
2391 rsize = rs->rmsg[rmsg_head].data; in rs_peek()
2392 if (++rmsg_head == rs->rq_size + 1) in rs_peek()
2396 end_size = rs->rbuf_size - rbuf_offset; in rs_peek()
2398 memcpy(buf, &rs->rbuf[rbuf_offset], end_size); in rs_peek()
2404 memcpy(buf, &rs->rbuf[rbuf_offset], rsize); in rs_peek()
2417 struct rsocket *rs; in rrecv() local
2422 rs = idm_at(&idm, socket); in rrecv()
2423 if (rs->type == SOCK_DGRAM) { in rrecv()
2424 fastlock_acquire(&rs->rlock); in rrecv()
2425 ret = ds_recvfrom(rs, buf, len, flags, NULL, NULL); in rrecv()
2426 fastlock_release(&rs->rlock); in rrecv()
2430 if (rs->state & rs_opening) { in rrecv()
2431 ret = rs_do_connect(rs); in rrecv()
2438 fastlock_acquire(&rs->rlock); in rrecv()
2440 if (!rs_have_rdata(rs)) { in rrecv()
2441 ret = rs_get_comp(rs, rs_nonblocking(rs, flags), in rrecv()
2448 left = len - rs_peek(rs, buf, left); in rrecv()
2452 for (; left && rs_have_rdata(rs); left -= rsize) { in rrecv()
2453 if (left < rs->rmsg[rs->rmsg_head].data) { in rrecv()
2455 rs->rmsg[rs->rmsg_head].data -= left; in rrecv()
2457 rs->rseq_no++; in rrecv()
2458 rsize = rs->rmsg[rs->rmsg_head].data; in rrecv()
2459 if (++rs->rmsg_head == rs->rq_size + 1) in rrecv()
2460 rs->rmsg_head = 0; in rrecv()
2463 end_size = rs->rbuf_size - rs->rbuf_offset; in rrecv()
2465 memcpy(buf, &rs->rbuf[rs->rbuf_offset], end_size); in rrecv()
2466 rs->rbuf_offset = 0; in rrecv()
2470 rs->rbuf_bytes_avail += end_size; in rrecv()
2472 memcpy(buf, &rs->rbuf[rs->rbuf_offset], rsize); in rrecv()
2473 rs->rbuf_offset += rsize; in rrecv()
2475 rs->rbuf_bytes_avail += rsize; in rrecv()
2478 } while (left && (flags & MSG_WAITALL) && (rs->state & rs_readable)); in rrecv()
2480 fastlock_release(&rs->rlock); in rrecv()
2487 struct rsocket *rs; in rrecvfrom() local
2490 rs = idm_at(&idm, socket); in rrecvfrom()
2491 if (rs->type == SOCK_DGRAM) { in rrecvfrom()
2492 fastlock_acquire(&rs->rlock); in rrecvfrom()
2493 ret = ds_recvfrom(rs, buf, len, flags, src_addr, addrlen); in rrecvfrom()
2494 fastlock_release(&rs->rlock); in rrecvfrom()
2532 static int rs_send_iomaps(struct rsocket *rs, int flags) in rs_send_iomaps() argument
2539 fastlock_acquire(&rs->map_lock); in rs_send_iomaps()
2540 while (!dlist_empty(&rs->iomap_queue)) { in rs_send_iomaps()
2541 if (!rs_can_send(rs)) { in rs_send_iomaps()
2542 ret = rs_get_comp(rs, rs_nonblocking(rs, flags), in rs_send_iomaps()
2546 if (!(rs->state & rs_writable)) { in rs_send_iomaps()
2552 iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry); in rs_send_iomaps()
2553 if (!(rs->opts & RS_OPT_SWAP_SGL)) { in rs_send_iomaps()
2565 if (rs->sq_inline >= sizeof iom) { in rs_send_iomaps()
2569 ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE); in rs_send_iomaps()
2570 } else if (rs_sbuf_left(rs) >= sizeof iom) { in rs_send_iomaps()
2571 memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom); in rs_send_iomaps()
2572 rs->ssgl[0].length = sizeof iom; in rs_send_iomaps()
2573 ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0); in rs_send_iomaps()
2574 if (rs_sbuf_left(rs) > sizeof iom) in rs_send_iomaps()
2575 rs->ssgl[0].addr += sizeof iom; in rs_send_iomaps()
2577 rs->ssgl[0].addr = (uintptr_t) rs->sbuf; in rs_send_iomaps()
2579 rs->ssgl[0].length = rs_sbuf_left(rs); in rs_send_iomaps()
2580 memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, in rs_send_iomaps()
2581 rs->ssgl[0].length); in rs_send_iomaps()
2582 rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length; in rs_send_iomaps()
2583 memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length, in rs_send_iomaps()
2584 rs->ssgl[1].length); in rs_send_iomaps()
2585 ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0); in rs_send_iomaps()
2586 rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; in rs_send_iomaps()
2589 dlist_insert_tail(&iomr->entry, &rs->iomap_list); in rs_send_iomaps()
2594 rs->iomap_pending = !dlist_empty(&rs->iomap_queue); in rs_send_iomaps()
2595 fastlock_release(&rs->map_lock); in rs_send_iomaps()
2599 static ssize_t ds_sendv_udp(struct rsocket *rs, const struct iovec *iov, in ds_sendv_udp() argument
2611 hdr.version = rs->conn_dest->qp->hdr.version; in ds_sendv_udp()
2614 hdr.qpn = htobe32(rs->conn_dest->qp->cm_id->qp->qp_num & 0xFFFFFF); in ds_sendv_udp()
2615 if (rs->conn_dest->qp->hdr.version == 4) { in ds_sendv_udp()
2617 hdr.addr.ipv4 = rs->conn_dest->qp->hdr.addr.ipv4; in ds_sendv_udp()
2620 memcpy(hdr.addr.ipv6, &rs->conn_dest->qp->hdr.addr.ipv6, 16); in ds_sendv_udp()
2629 msg.msg_name = &rs->conn_dest->addr; in ds_sendv_udp()
2630 msg.msg_namelen = ucma_addrlen(&rs->conn_dest->addr.sa); in ds_sendv_udp()
2633 ret = sendmsg(rs->udp_sock, &msg, flags); in ds_sendv_udp()
2637 static ssize_t ds_send_udp(struct rsocket *rs, const void *buf, size_t len, in ds_send_udp() argument
2644 return ds_sendv_udp(rs, &iov, 1, flags, op); in ds_send_udp()
2646 return ds_sendv_udp(rs, NULL, 0, flags, op); in ds_send_udp()
2650 static ssize_t dsend(struct rsocket *rs, const void *buf, size_t len, int flags) in dsend() argument
2657 if (!rs->conn_dest->ah) in dsend()
2658 return ds_send_udp(rs, buf, len, flags, RS_OP_DATA); in dsend()
2660 if (!ds_can_send(rs)) { in dsend()
2661 ret = ds_get_comp(rs, rs_nonblocking(rs, flags), ds_can_send); in dsend()
2666 msg = rs->smsg_free; in dsend()
2667 rs->smsg_free = msg->next; in dsend()
2668 rs->sqe_avail--; in dsend()
2670 memcpy((void *) msg, &rs->conn_dest->qp->hdr, rs->conn_dest->qp->hdr.length); in dsend()
2671 memcpy((void *) msg + rs->conn_dest->qp->hdr.length, buf, len); in dsend()
2673 sge.length = rs->conn_dest->qp->hdr.length + len; in dsend()
2674 sge.lkey = rs->conn_dest->qp->smr->lkey; in dsend()
2675 offset = (uint8_t *) msg - rs->sbuf; in dsend()
2677 ret = ds_post_send(rs, &sge, offset); in dsend()
2687 struct rsocket *rs; in rsend() local
2693 rs = idm_at(&idm, socket); in rsend()
2694 if (rs->type == SOCK_DGRAM) { in rsend()
2695 fastlock_acquire(&rs->slock); in rsend()
2696 ret = dsend(rs, buf, len, flags); in rsend()
2697 fastlock_release(&rs->slock); in rsend()
2701 if (rs->state & rs_opening) { in rsend()
2702 ret = rs_do_connect(rs); in rsend()
2710 fastlock_acquire(&rs->slock); in rsend()
2711 if (rs->iomap_pending) { in rsend()
2712 ret = rs_send_iomaps(rs, flags); in rsend()
2717 if (!rs_can_send(rs)) { in rsend()
2718 ret = rs_get_comp(rs, rs_nonblocking(rs, flags), in rsend()
2722 if (!(rs->state & rs_writable)) { in rsend()
2736 if (xfer_size > rs->sbuf_bytes_avail) in rsend()
2737 xfer_size = rs->sbuf_bytes_avail; in rsend()
2738 if (xfer_size > rs->target_sgl[rs->target_sge].length) in rsend()
2739 xfer_size = rs->target_sgl[rs->target_sge].length; in rsend()
2741 if (xfer_size <= rs->sq_inline) { in rsend()
2745 ret = rs_write_data(rs, &sge, 1, xfer_size, IBV_SEND_INLINE); in rsend()
2746 } else if (xfer_size <= rs_sbuf_left(rs)) { in rsend()
2747 memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size); in rsend()
2748 rs->ssgl[0].length = xfer_size; in rsend()
2749 ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, 0); in rsend()
2750 if (xfer_size < rs_sbuf_left(rs)) in rsend()
2751 rs->ssgl[0].addr += xfer_size; in rsend()
2753 rs->ssgl[0].addr = (uintptr_t) rs->sbuf; in rsend()
2755 rs->ssgl[0].length = rs_sbuf_left(rs); in rsend()
2756 memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, in rsend()
2757 rs->ssgl[0].length); in rsend()
2758 rs->ssgl[1].length = xfer_size - rs->ssgl[0].length; in rsend()
2759 memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length); in rsend()
2760 ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, 0); in rsend()
2761 rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; in rsend()
2767 fastlock_release(&rs->slock); in rsend()
2775 struct rsocket *rs; in rsendto() local
2778 rs = idm_at(&idm, socket); in rsendto()
2779 if (rs->type == SOCK_STREAM) { in rsendto()
2786 if (rs->state == rs_init) { in rsendto()
2787 ret = ds_init_ep(rs); in rsendto()
2792 fastlock_acquire(&rs->slock); in rsendto()
2793 if (!rs->conn_dest || ds_compare_addr(dest_addr, &rs->conn_dest->addr)) { in rsendto()
2794 ret = ds_get_dest(rs, dest_addr, addrlen, &rs->conn_dest); in rsendto()
2799 ret = dsend(rs, buf, len, flags); in rsendto()
2801 fastlock_release(&rs->slock); in rsendto()
2827 struct rsocket *rs; in rsendv() local
2833 rs = idm_at(&idm, socket); in rsendv()
2834 if (rs->state & rs_opening) { in rsendv()
2835 ret = rs_do_connect(rs); in rsendv()
2849 fastlock_acquire(&rs->slock); in rsendv()
2850 if (rs->iomap_pending) { in rsendv()
2851 ret = rs_send_iomaps(rs, flags); in rsendv()
2856 if (!rs_can_send(rs)) { in rsendv()
2857 ret = rs_get_comp(rs, rs_nonblocking(rs, flags), in rsendv()
2861 if (!(rs->state & rs_writable)) { in rsendv()
2875 if (xfer_size > rs->sbuf_bytes_avail) in rsendv()
2876 xfer_size = rs->sbuf_bytes_avail; in rsendv()
2877 if (xfer_size > rs->target_sgl[rs->target_sge].length) in rsendv()
2878 xfer_size = rs->target_sgl[rs->target_sge].length; in rsendv()
2880 if (xfer_size <= rs_sbuf_left(rs)) { in rsendv()
2881 rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr, in rsendv()
2883 rs->ssgl[0].length = xfer_size; in rsendv()
2884 ret = rs_write_data(rs, rs->ssgl, 1, xfer_size, in rsendv()
2885 xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0); in rsendv()
2886 if (xfer_size < rs_sbuf_left(rs)) in rsendv()
2887 rs->ssgl[0].addr += xfer_size; in rsendv()
2889 rs->ssgl[0].addr = (uintptr_t) rs->sbuf; in rsendv()
2891 rs->ssgl[0].length = rs_sbuf_left(rs); in rsendv()
2892 rs_copy_iov((void *) (uintptr_t) rs->ssgl[0].addr, &cur_iov, in rsendv()
2893 &offset, rs->ssgl[0].length); in rsendv()
2894 rs->ssgl[1].length = xfer_size - rs->ssgl[0].length; in rsendv()
2895 rs_copy_iov(rs->sbuf, &cur_iov, &offset, rs->ssgl[1].length); in rsendv()
2896 ret = rs_write_data(rs, rs->ssgl, 2, xfer_size, in rsendv()
2897 xfer_size <= rs->sq_inline ? IBV_SEND_INLINE : 0); in rsendv()
2898 rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; in rsendv()
2904 fastlock_release(&rs->slock); in rsendv()
2943 static int rs_poll_rs(struct rsocket *rs, int events, in rs_poll_rs() argument
2944 int nonblock, int (*test)(struct rsocket *rs)) in rs_poll_rs() argument
2951 if ((rs->type == SOCK_STREAM) && ((rs->state & rs_connected) || in rs_poll_rs()
2952 (rs->state == rs_disconnected) || (rs->state & rs_error))) { in rs_poll_rs()
2953 rs_process_cq(rs, nonblock, test); in rs_poll_rs()
2956 if ((events & POLLIN) && rs_conn_have_rdata(rs)) in rs_poll_rs()
2958 if ((events & POLLOUT) && rs_can_send(rs)) in rs_poll_rs()
2960 if (!(rs->state & rs_connected)) { in rs_poll_rs()
2961 if (rs->state == rs_disconnected) in rs_poll_rs()
2968 } else if (rs->type == SOCK_DGRAM) { in rs_poll_rs()
2969 ds_process_cqs(rs, nonblock, test); in rs_poll_rs()
2972 if ((events & POLLIN) && rs_have_rdata(rs)) in rs_poll_rs()
2974 if ((events & POLLOUT) && ds_can_send(rs)) in rs_poll_rs()
2980 if (rs->state == rs_listening) { in rs_poll_rs()
2981 fds.fd = rs->cm_id->channel->fd; in rs_poll_rs()
2988 if (rs->state & rs_opening) { in rs_poll_rs()
2989 ret = rs_do_connect(rs); in rs_poll_rs()
2997 if (rs->state == rs_connect_error) { in rs_poll_rs()
3012 struct rsocket *rs; in rs_poll_check() local
3016 rs = idm_lookup(&idm, fds[i].fd); in rs_poll_check()
3017 if (rs) in rs_poll_check()
3018 fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); in rs_poll_check()
3030 struct rsocket *rs; in rs_poll_arm() local
3034 rs = idm_lookup(&idm, fds[i].fd); in rs_poll_arm()
3035 if (rs) { in rs_poll_arm()
3036 fds[i].revents = rs_poll_rs(rs, fds[i].events, 0, rs_is_cq_armed); in rs_poll_arm()
3040 if (rs->type == SOCK_STREAM) { in rs_poll_arm()
3041 if (rs->state >= rs_connected) in rs_poll_arm()
3042 rfds[i].fd = rs->cm_id->recv_cq_channel->fd; in rs_poll_arm()
3044 rfds[i].fd = rs->cm_id->channel->fd; in rs_poll_arm()
3046 rfds[i].fd = rs->epfd; in rs_poll_arm()
3060 struct rsocket *rs; in rs_poll_events() local
3067 rs = idm_lookup(&idm, fds[i].fd); in rs_poll_events()
3068 if (rs) { in rs_poll_events()
3069 fastlock_acquire(&rs->cq_wait_lock); in rs_poll_events()
3070 if (rs->type == SOCK_STREAM) in rs_poll_events()
3071 rs_get_cq_event(rs); in rs_poll_events()
3073 ds_get_cq_event(rs); in rs_poll_events()
3074 fastlock_release(&rs->cq_wait_lock); in rs_poll_events()
3075 fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all); in rs_poll_events()
3226 struct rsocket *rs; in rshutdown() local
3229 rs = idm_lookup(&idm, socket); in rshutdown()
3230 if (!rs) in rshutdown()
3232 if (rs->opts & RS_OPT_SVC_ACTIVE) in rshutdown()
3233 rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE); in rshutdown()
3235 if (rs->fd_flags & O_NONBLOCK) in rshutdown()
3236 rs_set_nonblocking(rs, 0); in rshutdown()
3238 if (rs->state & rs_connected) { in rshutdown()
3241 rs->state &= ~(rs_readable | rs_writable); in rshutdown()
3243 rs->state &= ~rs_writable; in rshutdown()
3244 ctrl = (rs->state & rs_readable) ? in rshutdown()
3247 rs->state &= ~rs_readable; in rshutdown()
3248 if (rs->state & rs_writable) in rshutdown()
3252 if (!rs_ctrl_avail(rs)) { in rshutdown()
3253 ret = rs_process_cq(rs, 0, rs_conn_can_send_ctrl); in rshutdown()
3258 if ((rs->state & rs_connected) && rs_ctrl_avail(rs)) { in rshutdown()
3259 rs->ctrl_seqno++; in rshutdown()
3260 ret = rs_post_msg(rs, rs_msg_set(RS_OP_CTRL, ctrl)); in rshutdown()
3264 if (rs->state & rs_connected) in rshutdown()
3265 rs_process_cq(rs, 0, rs_conn_all_sends_done); in rshutdown()
3268 if ((rs->fd_flags & O_NONBLOCK) && (rs->state & rs_connected)) in rshutdown()
3269 rs_set_nonblocking(rs, rs->fd_flags); in rshutdown()
3271 if (rs->state & rs_disconnected) { in rshutdown()
3273 ibv_req_notify_cq(rs->cm_id->recv_cq, 0); in rshutdown()
3274 ucma_shutdown(rs->cm_id); in rshutdown()
3280 static void ds_shutdown(struct rsocket *rs) in ds_shutdown() argument
3282 if (rs->opts & RS_OPT_SVC_ACTIVE) in ds_shutdown()
3283 rs_notify_svc(&udp_svc, rs, RS_SVC_REM_DGRAM); in ds_shutdown()
3285 if (rs->fd_flags & O_NONBLOCK) in ds_shutdown()
3286 rs_set_nonblocking(rs, 0); in ds_shutdown()
3288 rs->state &= ~(rs_readable | rs_writable); in ds_shutdown()
3289 ds_process_cqs(rs, 0, ds_all_sends_done); in ds_shutdown()
3291 if (rs->fd_flags & O_NONBLOCK) in ds_shutdown()
3292 rs_set_nonblocking(rs, rs->fd_flags); in ds_shutdown()
3297 struct rsocket *rs; in rclose() local
3299 rs = idm_lookup(&idm, socket); in rclose()
3300 if (!rs) in rclose()
3302 if (rs->type == SOCK_STREAM) { in rclose()
3303 if (rs->state & rs_connected) in rclose()
3305 else if (rs->opts & RS_OPT_SVC_ACTIVE) in rclose()
3306 rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE); in rclose()
3308 ds_shutdown(rs); in rclose()
3311 rs_free(rs); in rclose()
3331 struct rsocket *rs; in rgetpeername() local
3333 rs = idm_lookup(&idm, socket); in rgetpeername()
3334 if (!rs) in rgetpeername()
3336 if (rs->type == SOCK_STREAM) { in rgetpeername()
3337 rs_copy_addr(addr, rdma_get_peer_addr(rs->cm_id), addrlen); in rgetpeername()
3340 return getpeername(rs->udp_sock, addr, addrlen); in rgetpeername()
3346 struct rsocket *rs; in rgetsockname() local
3348 rs = idm_lookup(&idm, socket); in rgetsockname()
3349 if (!rs) in rgetsockname()
3351 if (rs->type == SOCK_STREAM) { in rgetsockname()
3352 rs_copy_addr(addr, rdma_get_local_addr(rs->cm_id), addrlen); in rgetsockname()
3355 return getsockname(rs->udp_sock, addr, addrlen); in rgetsockname()
3359 static int rs_set_keepalive(struct rsocket *rs, int on) in rs_set_keepalive() argument
3364 if ((on && (rs->opts & RS_OPT_SVC_ACTIVE)) || in rs_set_keepalive()
3365 (!on && !(rs->opts & RS_OPT_SVC_ACTIVE))) in rs_set_keepalive()
3369 if (!rs->keepalive_time) { in rs_set_keepalive()
3371 if (fscanf(f, "%u", &rs->keepalive_time) != 1) in rs_set_keepalive()
3372 rs->keepalive_time = 7200; in rs_set_keepalive()
3375 rs->keepalive_time = 7200; in rs_set_keepalive()
3378 ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_ADD_KEEPALIVE); in rs_set_keepalive()
3380 ret = rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE); in rs_set_keepalive()
3389 struct rsocket *rs; in rsetsockopt() local
3394 rs = idm_lookup(&idm, socket); in rsetsockopt()
3395 if (!rs) in rsetsockopt()
3397 if (rs->type == SOCK_DGRAM && level != SOL_RDMA) { in rsetsockopt()
3398 ret = setsockopt(rs->udp_sock, level, optname, optval, optlen); in rsetsockopt()
3405 opts = &rs->so_opts; in rsetsockopt()
3408 if (rs->type == SOCK_STREAM) { in rsetsockopt()
3409 ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, in rsetsockopt()
3412 if (ret && ((errno == ENOSYS) || ((rs->state != rs_init) && in rsetsockopt()
3413 rs->cm_id->context && in rsetsockopt()
3414 (rs->cm_id->verbs->device->transport_type == IBV_TRANSPORT_IB)))) in rsetsockopt()
3420 if ((rs->type == SOCK_STREAM && !rs->rbuf) || in rsetsockopt()
3421 (rs->type == SOCK_DGRAM && !rs->qp_list)) in rsetsockopt()
3422 rs->rbuf_size = (*(uint32_t *) optval) << 1; in rsetsockopt()
3426 if (!rs->sbuf) in rsetsockopt()
3427 rs->sbuf_size = (*(uint32_t *) optval) << 1; in rsetsockopt()
3428 if (rs->sbuf_size < RS_SNDLOWAT) in rsetsockopt()
3429 rs->sbuf_size = RS_SNDLOWAT << 1; in rsetsockopt()
3438 ret = rs_set_keepalive(rs, *(int *) optval); in rsetsockopt()
3439 opt_on = rs->opts & RS_OPT_SVC_ACTIVE; in rsetsockopt()
3450 opts = &rs->tcp_opts; in rsetsockopt()
3461 rs->keepalive_time = *(int *) optval; in rsetsockopt()
3462 ret = (rs->opts & RS_OPT_SVC_ACTIVE) ? in rsetsockopt()
3463 rs_notify_svc(&tcp_svc, rs, RS_SVC_MOD_KEEPALIVE) : 0; in rsetsockopt()
3477 opts = &rs->ipv6_opts; in rsetsockopt()
3480 if (rs->type == SOCK_STREAM) { in rsetsockopt()
3481 ret = rdma_set_option(rs->cm_id, RDMA_OPTION_ID, in rsetsockopt()
3492 if (rs->state >= rs_opening) { in rsetsockopt()
3499 rs->sq_size = min_t(uint32_t, (*(uint32_t *)optval), in rsetsockopt()
3504 rs->rq_size = min_t(uint32_t, (*(uint32_t *)optval), in rsetsockopt()
3509 rs->sq_inline = min_t(uint32_t, *(uint32_t *)optval, in rsetsockopt()
3514 rs->target_iomap_size = (uint16_t) rs_scale_to_value( in rsetsockopt()
3519 if ((rs->optval = malloc(optlen))) { in rsetsockopt()
3520 memcpy(rs->optval, optval, optlen); in rsetsockopt()
3521 rs->optlen = optlen; in rsetsockopt()
3570 struct rsocket *rs; in rgetsockopt() local
3578 rs = idm_lookup(&idm, socket); in rgetsockopt()
3579 if (!rs) in rgetsockopt()
3587 *((int *) optval) = !!(rs->so_opts & (1 << optname)); in rgetsockopt()
3591 *((int *) optval) = rs->rbuf_size; in rgetsockopt()
3595 *((int *) optval) = rs->sbuf_size; in rgetsockopt()
3601 !(rs->so_opts & (1 << optname)); in rgetsockopt()
3606 *((int *) optval) = rs->err; in rgetsockopt()
3608 rs->err = 0; in rgetsockopt()
3622 *((int *) optval) = (int) rs->keepalive_time; in rgetsockopt()
3626 *((int *) optval) = !!(rs->tcp_opts & (1 << optname)); in rgetsockopt()
3630 *((int *) optval) = (rs->cm_id && rs->cm_id->route.num_paths) ? in rgetsockopt()
3631 1 << (7 + rs->cm_id->route.path_rec->mtu) : in rgetsockopt()
3643 *((int *) optval) = !!(rs->ipv6_opts & (1 << optname)); in rgetsockopt()
3654 *((int *) optval) = rs->sq_size; in rgetsockopt()
3658 *((int *) optval) = rs->rq_size; in rgetsockopt()
3662 *((int *) optval) = rs->sq_inline; in rgetsockopt()
3666 *((int *) optval) = rs->target_iomap_size; in rgetsockopt()
3670 if (rs->optval) { in rgetsockopt()
3671 if (*optlen < rs->optlen) { in rgetsockopt()
3674 memcpy(rs->optval, optval, rs->optlen); in rgetsockopt()
3675 *optlen = rs->optlen; in rgetsockopt()
3683 path_rec = rs->cm_id->route.path_rec; in rgetsockopt()
3686 num_paths < rs->cm_id->route.num_paths) { in rgetsockopt()
3714 struct rsocket *rs; in rfcntl() local
3719 rs = idm_lookup(&idm, socket); in rfcntl()
3720 if (!rs) in rfcntl()
3725 ret = rs->fd_flags; in rfcntl()
3729 if ((rs->fd_flags & O_NONBLOCK) != (param & O_NONBLOCK)) in rfcntl()
3730 ret = rs_set_nonblocking(rs, param & O_NONBLOCK); in rfcntl()
3733 rs->fd_flags = param; in rfcntl()
3743 static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs) in rs_get_iomap_mr() argument
3747 if (!rs->remote_iomappings) { in rs_get_iomap_mr()
3748 rs->remote_iomappings = calloc(rs->remote_iomap.length, in rs_get_iomap_mr()
3749 sizeof(*rs->remote_iomappings)); in rs_get_iomap_mr()
3750 if (!rs->remote_iomappings) in rs_get_iomap_mr()
3753 for (i = 0; i < rs->remote_iomap.length; i++) in rs_get_iomap_mr()
3754 rs->remote_iomappings[i].index = i; in rs_get_iomap_mr()
3757 for (i = 0; i < rs->remote_iomap.length; i++) { in rs_get_iomap_mr()
3758 if (!rs->remote_iomappings[i].mr) in rs_get_iomap_mr()
3759 return &rs->remote_iomappings[i]; in rs_get_iomap_mr()
3771 struct rsocket *rs; in riomap() local
3775 rs = idm_at(&idm, socket); in riomap()
3776 if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE))) in riomap()
3779 fastlock_acquire(&rs->map_lock); in riomap()
3781 iomr = rs_get_iomap_mr(rs); in riomap()
3792 iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access); in riomap()
3806 dlist_insert_tail(&iomr->entry, &rs->iomap_queue); in riomap()
3807 rs->iomap_pending = 1; in riomap()
3809 dlist_insert_tail(&iomr->entry, &rs->iomap_list); in riomap()
3812 fastlock_release(&rs->map_lock); in riomap()
3818 struct rsocket *rs; in riounmap() local
3823 rs = idm_at(&idm, socket); in riounmap()
3824 fastlock_acquire(&rs->map_lock); in riounmap()
3826 for (entry = rs->iomap_list.next; entry != &rs->iomap_list; in riounmap()
3835 for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue; in riounmap()
3845 fastlock_release(&rs->map_lock); in riounmap()
3849 static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset) in rs_find_iomap() argument
3853 for (i = 0; i < rs->target_iomap_size; i++) { in rs_find_iomap()
3854 if (offset >= rs->target_iomap[i].offset && in rs_find_iomap()
3855 offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length) in rs_find_iomap()
3856 return &rs->target_iomap[i]; in rs_find_iomap()
3863 struct rsocket *rs; in riowrite() local
3870 rs = idm_at(&idm, socket); in riowrite()
3871 fastlock_acquire(&rs->slock); in riowrite()
3872 if (rs->iomap_pending) { in riowrite()
3873 ret = rs_send_iomaps(rs, flags); in riowrite()
3879 iom = rs_find_iomap(rs, offset); in riowrite()
3884 if (!rs_can_send(rs)) { in riowrite()
3885 ret = rs_get_comp(rs, rs_nonblocking(rs, flags), in riowrite()
3889 if (!(rs->state & rs_writable)) { in riowrite()
3903 if (xfer_size > rs->sbuf_bytes_avail) in riowrite()
3904 xfer_size = rs->sbuf_bytes_avail; in riowrite()
3908 if (xfer_size <= rs->sq_inline) { in riowrite()
3912 ret = rs_write_direct(rs, iom, offset, &sge, 1, in riowrite()
3914 } else if (xfer_size <= rs_sbuf_left(rs)) { in riowrite()
3915 memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size); in riowrite()
3916 rs->ssgl[0].length = xfer_size; in riowrite()
3917 ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0); in riowrite()
3918 if (xfer_size < rs_sbuf_left(rs)) in riowrite()
3919 rs->ssgl[0].addr += xfer_size; in riowrite()
3921 rs->ssgl[0].addr = (uintptr_t) rs->sbuf; in riowrite()
3923 rs->ssgl[0].length = rs_sbuf_left(rs); in riowrite()
3924 memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, in riowrite()
3925 rs->ssgl[0].length); in riowrite()
3926 rs->ssgl[1].length = xfer_size - rs->ssgl[0].length; in riowrite()
3927 memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length); in riowrite()
3928 ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0); in riowrite()
3929 rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length; in riowrite()
3935 fastlock_release(&rs->slock); in riowrite()
3970 static int rs_svc_add_rs(struct rs_svc *svc, struct rsocket *rs) in rs_svc_add_rs() argument
3980 svc->rss[++svc->cnt] = rs; in rs_svc_add_rs()
3984 static int rs_svc_index(struct rs_svc *svc, struct rsocket *rs) in rs_svc_index() argument
3989 if (svc->rss[i] == rs) in rs_svc_index()
3995 static int rs_svc_rm_rs(struct rs_svc *svc, struct rsocket *rs) in rs_svc_rm_rs() argument
3999 if ((i = rs_svc_index(svc, rs)) >= 0) { in rs_svc_rm_rs()
4017 msg.status = rs_svc_add_rs(svc, msg.rs); in udp_svc_process_sock()
4019 msg.rs->opts |= RS_OPT_SVC_ACTIVE; in udp_svc_process_sock()
4021 udp_svc_fds[svc->cnt].fd = msg.rs->udp_sock; in udp_svc_process_sock()
4027 msg.status = rs_svc_rm_rs(svc, msg.rs); in udp_svc_process_sock()
4029 msg.rs->opts &= ~RS_OPT_SVC_ACTIVE; in udp_svc_process_sock()
4064 static void udp_svc_create_ah(struct rsocket *rs, struct ds_dest *dest, uint32_t qpn) in udp_svc_create_ah() argument
4072 fastlock_acquire(&rs->slock); in udp_svc_create_ah()
4075 fastlock_release(&rs->slock); in udp_svc_create_ah()
4111 fastlock_acquire(&rs->slock); in udp_svc_create_ah()
4114 fastlock_release(&rs->slock); in udp_svc_create_ah()
4129 static void udp_svc_forward(struct rsocket *rs, void *buf, size_t len, in udp_svc_forward() argument
4137 if (!ds_can_send(rs)) { in udp_svc_forward()
4138 if (ds_get_comp(rs, 0, ds_can_send)) in udp_svc_forward()
4142 msg = rs->smsg_free; in udp_svc_forward()
4143 rs->smsg_free = msg->next; in udp_svc_forward()
4144 rs->sqe_avail--; in udp_svc_forward()
4151 sge.lkey = rs->conn_dest->qp->smr->lkey; in udp_svc_forward()
4152 offset = (uint8_t *) msg - rs->sbuf; in udp_svc_forward()
4154 ds_post_send(rs, &sge, offset); in udp_svc_forward()
4157 static void udp_svc_process_rs(struct rsocket *rs) in udp_svc_process_rs() argument
4167 ret = recvfrom(rs->udp_sock, buf, sizeof buf, 0, &addr.sa, &addrlen); in udp_svc_process_rs()
4181 ret = ds_get_dest(rs, &addr.sa, addrlen, &dest); in udp_svc_process_rs()
4186 fastlock_acquire(&rs->slock); in udp_svc_process_rs()
4187 cur_dest = rs->conn_dest; in udp_svc_process_rs()
4188 rs->conn_dest = dest; in udp_svc_process_rs()
4189 ds_send_udp(rs, NULL, 0, 0, RS_OP_CTRL); in udp_svc_process_rs()
4190 rs->conn_dest = cur_dest; in udp_svc_process_rs()
4191 fastlock_release(&rs->slock); in udp_svc_process_rs()
4195 udp_svc_create_ah(rs, dest, qpn); in udp_svc_process_rs()
4199 fastlock_acquire(&rs->slock); in udp_svc_process_rs()
4200 cur_dest = rs->conn_dest; in udp_svc_process_rs()
4201 rs->conn_dest = &dest->qp->dest; in udp_svc_process_rs()
4202 udp_svc_forward(rs, buf + udp_hdr->length, len, &addr); in udp_svc_process_rs()
4203 rs->conn_dest = cur_dest; in udp_svc_process_rs()
4204 fastlock_release(&rs->slock); in udp_svc_process_rs()
4258 msg.status = rs_svc_add_rs(svc, msg.rs); in tcp_svc_process_sock()
4260 msg.rs->opts |= RS_OPT_SVC_ACTIVE; in tcp_svc_process_sock()
4263 msg.rs->keepalive_time; in tcp_svc_process_sock()
4267 msg.status = rs_svc_rm_rs(svc, msg.rs); in tcp_svc_process_sock()
4269 msg.rs->opts &= ~RS_OPT_SVC_ACTIVE; in tcp_svc_process_sock()
4272 i = rs_svc_index(svc, msg.rs); in tcp_svc_process_sock()
4274 tcp_svc_timeouts[i] = rs_get_time() + msg.rs->keepalive_time; in tcp_svc_process_sock()
4293 static void tcp_svc_send_keepalive(struct rsocket *rs) in tcp_svc_send_keepalive() argument
4295 fastlock_acquire(&rs->cq_lock); in tcp_svc_send_keepalive()
4296 if (rs_ctrl_avail(rs) && (rs->state & rs_connected)) { in tcp_svc_send_keepalive()
4297 rs->ctrl_seqno++; in tcp_svc_send_keepalive()
4298 rs_post_write(rs, NULL, 0, rs_msg_set(RS_OP_CTRL, RS_CTRL_KEEPALIVE), in tcp_svc_send_keepalive()
4301 fastlock_release(&rs->cq_lock); in tcp_svc_send_keepalive()