1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/crc32c.h> 4 #include <linux/ctype.h> 5 #include <linux/highmem.h> 6 #include <linux/inet.h> 7 #include <linux/kthread.h> 8 #include <linux/net.h> 9 #include <linux/slab.h> 10 #include <linux/socket.h> 11 #include <linux/string.h> 12 #include <linux/bio.h> 13 #include <linux/blkdev.h> 14 #include <net/tcp.h> 15 16 #include <linux/ceph/libceph.h> 17 #include <linux/ceph/messenger.h> 18 #include <linux/ceph/decode.h> 19 #include <linux/ceph/pagelist.h> 20 21 /* 22 * Ceph uses the messenger to exchange ceph_msg messages with other 23 * hosts in the system. The messenger provides ordered and reliable 24 * delivery. We tolerate TCP disconnects by reconnecting (with 25 * exponential backoff) in the case of a fault (disconnection, bad 26 * crc, protocol error). Acks allow sent messages to be discarded by 27 * the sender. 28 */ 29 30 /* static tag bytes (protocol control messages) */ 31 static char tag_msg = CEPH_MSGR_TAG_MSG; 32 static char tag_ack = CEPH_MSGR_TAG_ACK; 33 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; 34 35 #ifdef CONFIG_LOCKDEP 36 static struct lock_class_key socket_class; 37 #endif 38 39 40 static void queue_con(struct ceph_connection *con); 41 static void con_work(struct work_struct *); 42 static void ceph_fault(struct ceph_connection *con); 43 44 /* 45 * nicely render a sockaddr as a string. 46 */ 47 #define MAX_ADDR_STR 20 48 #define MAX_ADDR_STR_LEN 60 49 static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN]; 50 static DEFINE_SPINLOCK(addr_str_lock); 51 static int last_addr_str; 52 53 const char *ceph_pr_addr(const struct sockaddr_storage *ss) 54 { 55 int i; 56 char *s; 57 struct sockaddr_in *in4 = (void *)ss; 58 struct sockaddr_in6 *in6 = (void *)ss; 59 60 spin_lock(&addr_str_lock); 61 i = last_addr_str++; 62 if (last_addr_str == MAX_ADDR_STR) 63 last_addr_str = 0; 64 spin_unlock(&addr_str_lock); 65 s = addr_str[i]; 66 67 switch (ss->ss_family) { 68 case AF_INET: 69 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr, 70 (unsigned int)ntohs(in4->sin_port)); 71 break; 72 73 case AF_INET6: 74 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr, 75 (unsigned int)ntohs(in6->sin6_port)); 76 break; 77 78 default: 79 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)", 80 (int)ss->ss_family); 81 } 82 83 return s; 84 } 85 EXPORT_SYMBOL(ceph_pr_addr); 86 87 static void encode_my_addr(struct ceph_messenger *msgr) 88 { 89 memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); 90 ceph_encode_addr(&msgr->my_enc_addr); 91 } 92 93 /* 94 * work queue for all reading and writing to/from the socket. 95 */ 96 struct workqueue_struct *ceph_msgr_wq; 97 98 int ceph_msgr_init(void) 99 { 100 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); 101 if (!ceph_msgr_wq) { 102 pr_err("msgr_init failed to create workqueue\n"); 103 return -ENOMEM; 104 } 105 return 0; 106 } 107 EXPORT_SYMBOL(ceph_msgr_init); 108 109 void ceph_msgr_exit(void) 110 { 111 destroy_workqueue(ceph_msgr_wq); 112 } 113 EXPORT_SYMBOL(ceph_msgr_exit); 114 115 void ceph_msgr_flush(void) 116 { 117 flush_workqueue(ceph_msgr_wq); 118 } 119 EXPORT_SYMBOL(ceph_msgr_flush); 120 121 122 /* 123 * socket callback functions 124 */ 125 126 /* data available on socket, or listen socket received a connect */ 127 static void ceph_data_ready(struct sock *sk, int count_unused) 128 { 129 struct ceph_connection *con = 130 (struct ceph_connection *)sk->sk_user_data; 131 if (sk->sk_state != TCP_CLOSE_WAIT) { 132 dout("ceph_data_ready on %p state = %lu, queueing work\n", 133 con, con->state); 134 queue_con(con); 135 } 136 } 137 138 /* socket has buffer space for writing */ 139 static void ceph_write_space(struct sock *sk) 140 { 141 struct ceph_connection *con = 142 (struct ceph_connection *)sk->sk_user_data; 143 144 /* only queue to workqueue if there is data we want to write. */ 145 if (test_bit(WRITE_PENDING, &con->state)) { 146 dout("ceph_write_space %p queueing write work\n", con); 147 queue_con(con); 148 } else { 149 dout("ceph_write_space %p nothing to write\n", con); 150 } 151 152 /* since we have our own write_space, clear the SOCK_NOSPACE flag */ 153 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 154 } 155 156 /* socket's state has changed */ 157 static void ceph_state_change(struct sock *sk) 158 { 159 struct ceph_connection *con = 160 (struct ceph_connection *)sk->sk_user_data; 161 162 dout("ceph_state_change %p state = %lu sk_state = %u\n", 163 con, con->state, sk->sk_state); 164 165 if (test_bit(CLOSED, &con->state)) 166 return; 167 168 switch (sk->sk_state) { 169 case TCP_CLOSE: 170 dout("ceph_state_change TCP_CLOSE\n"); 171 case TCP_CLOSE_WAIT: 172 dout("ceph_state_change TCP_CLOSE_WAIT\n"); 173 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { 174 if (test_bit(CONNECTING, &con->state)) 175 con->error_msg = "connection failed"; 176 else 177 con->error_msg = "socket closed"; 178 queue_con(con); 179 } 180 break; 181 case TCP_ESTABLISHED: 182 dout("ceph_state_change TCP_ESTABLISHED\n"); 183 queue_con(con); 184 break; 185 } 186 } 187 188 /* 189 * set up socket callbacks 190 */ 191 static void set_sock_callbacks(struct socket *sock, 192 struct ceph_connection *con) 193 { 194 struct sock *sk = sock->sk; 195 sk->sk_user_data = (void *)con; 196 sk->sk_data_ready = ceph_data_ready; 197 sk->sk_write_space = ceph_write_space; 198 sk->sk_state_change = ceph_state_change; 199 } 200 201 202 /* 203 * socket helpers 204 */ 205 206 /* 207 * initiate connection to a remote socket. 208 */ 209 static struct socket *ceph_tcp_connect(struct ceph_connection *con) 210 { 211 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; 212 struct socket *sock; 213 int ret; 214 215 BUG_ON(con->sock); 216 ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, 217 IPPROTO_TCP, &sock); 218 if (ret) 219 return ERR_PTR(ret); 220 con->sock = sock; 221 sock->sk->sk_allocation = GFP_NOFS; 222 223 #ifdef CONFIG_LOCKDEP 224 lockdep_set_class(&sock->sk->sk_lock, &socket_class); 225 #endif 226 227 set_sock_callbacks(sock, con); 228 229 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); 230 231 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), 232 O_NONBLOCK); 233 if (ret == -EINPROGRESS) { 234 dout("connect %s EINPROGRESS sk_state = %u\n", 235 ceph_pr_addr(&con->peer_addr.in_addr), 236 sock->sk->sk_state); 237 ret = 0; 238 } 239 if (ret < 0) { 240 pr_err("connect %s error %d\n", 241 ceph_pr_addr(&con->peer_addr.in_addr), ret); 242 sock_release(sock); 243 con->sock = NULL; 244 con->error_msg = "connect error"; 245 } 246 247 if (ret < 0) 248 return ERR_PTR(ret); 249 return sock; 250 } 251 252 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 253 { 254 struct kvec iov = {buf, len}; 255 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 256 int r; 257 258 r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); 259 if (r == -EAGAIN) 260 r = 0; 261 return r; 262 } 263 264 /* 265 * write something. @more is true if caller will be sending more data 266 * shortly. 267 */ 268 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, 269 size_t kvlen, size_t len, int more) 270 { 271 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 272 int r; 273 274 if (more) 275 msg.msg_flags |= MSG_MORE; 276 else 277 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ 278 279 r = kernel_sendmsg(sock, &msg, iov, kvlen, len); 280 if (r == -EAGAIN) 281 r = 0; 282 return r; 283 } 284 285 286 /* 287 * Shutdown/close the socket for the given connection. 288 */ 289 static int con_close_socket(struct ceph_connection *con) 290 { 291 int rc; 292 293 dout("con_close_socket on %p sock %p\n", con, con->sock); 294 if (!con->sock) 295 return 0; 296 set_bit(SOCK_CLOSED, &con->state); 297 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); 298 sock_release(con->sock); 299 con->sock = NULL; 300 clear_bit(SOCK_CLOSED, &con->state); 301 return rc; 302 } 303 304 /* 305 * Reset a connection. Discard all incoming and outgoing messages 306 * and clear *_seq state. 307 */ 308 static void ceph_msg_remove(struct ceph_msg *msg) 309 { 310 list_del_init(&msg->list_head); 311 ceph_msg_put(msg); 312 } 313 static void ceph_msg_remove_list(struct list_head *head) 314 { 315 while (!list_empty(head)) { 316 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 317 list_head); 318 ceph_msg_remove(msg); 319 } 320 } 321 322 static void reset_connection(struct ceph_connection *con) 323 { 324 /* reset connection, out_queue, msg_ and connect_seq */ 325 /* discard existing out_queue and msg_seq */ 326 ceph_msg_remove_list(&con->out_queue); 327 ceph_msg_remove_list(&con->out_sent); 328 329 if (con->in_msg) { 330 ceph_msg_put(con->in_msg); 331 con->in_msg = NULL; 332 } 333 334 con->connect_seq = 0; 335 con->out_seq = 0; 336 if (con->out_msg) { 337 ceph_msg_put(con->out_msg); 338 con->out_msg = NULL; 339 } 340 con->in_seq = 0; 341 con->in_seq_acked = 0; 342 } 343 344 /* 345 * mark a peer down. drop any open connections. 346 */ 347 void ceph_con_close(struct ceph_connection *con) 348 { 349 dout("con_close %p peer %s\n", con, 350 ceph_pr_addr(&con->peer_addr.in_addr)); 351 set_bit(CLOSED, &con->state); /* in case there's queued work */ 352 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 353 clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ 354 clear_bit(KEEPALIVE_PENDING, &con->state); 355 clear_bit(WRITE_PENDING, &con->state); 356 mutex_lock(&con->mutex); 357 reset_connection(con); 358 con->peer_global_seq = 0; 359 cancel_delayed_work(&con->work); 360 mutex_unlock(&con->mutex); 361 queue_con(con); 362 } 363 EXPORT_SYMBOL(ceph_con_close); 364 365 /* 366 * Reopen a closed connection, with a new peer address. 367 */ 368 void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) 369 { 370 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 371 set_bit(OPENING, &con->state); 372 clear_bit(CLOSED, &con->state); 373 memcpy(&con->peer_addr, addr, sizeof(*addr)); 374 con->delay = 0; /* reset backoff memory */ 375 queue_con(con); 376 } 377 EXPORT_SYMBOL(ceph_con_open); 378 379 /* 380 * return true if this connection ever successfully opened 381 */ 382 bool ceph_con_opened(struct ceph_connection *con) 383 { 384 return con->connect_seq > 0; 385 } 386 387 /* 388 * generic get/put 389 */ 390 struct ceph_connection *ceph_con_get(struct ceph_connection *con) 391 { 392 dout("con_get %p nref = %d -> %d\n", con, 393 atomic_read(&con->nref), atomic_read(&con->nref) + 1); 394 if (atomic_inc_not_zero(&con->nref)) 395 return con; 396 return NULL; 397 } 398 399 void ceph_con_put(struct ceph_connection *con) 400 { 401 dout("con_put %p nref = %d -> %d\n", con, 402 atomic_read(&con->nref), atomic_read(&con->nref) - 1); 403 BUG_ON(atomic_read(&con->nref) == 0); 404 if (atomic_dec_and_test(&con->nref)) { 405 BUG_ON(con->sock); 406 kfree(con); 407 } 408 } 409 410 /* 411 * initialize a new connection. 412 */ 413 void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) 414 { 415 dout("con_init %p\n", con); 416 memset(con, 0, sizeof(*con)); 417 atomic_set(&con->nref, 1); 418 con->msgr = msgr; 419 mutex_init(&con->mutex); 420 INIT_LIST_HEAD(&con->out_queue); 421 INIT_LIST_HEAD(&con->out_sent); 422 INIT_DELAYED_WORK(&con->work, con_work); 423 } 424 EXPORT_SYMBOL(ceph_con_init); 425 426 427 /* 428 * We maintain a global counter to order connection attempts. Get 429 * a unique seq greater than @gt. 430 */ 431 static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) 432 { 433 u32 ret; 434 435 spin_lock(&msgr->global_seq_lock); 436 if (msgr->global_seq < gt) 437 msgr->global_seq = gt; 438 ret = ++msgr->global_seq; 439 spin_unlock(&msgr->global_seq_lock); 440 return ret; 441 } 442 443 444 /* 445 * Prepare footer for currently outgoing message, and finish things 446 * off. Assumes out_kvec* are already valid.. we just add on to the end. 447 */ 448 static void prepare_write_message_footer(struct ceph_connection *con, int v) 449 { 450 struct ceph_msg *m = con->out_msg; 451 452 dout("prepare_write_message_footer %p\n", con); 453 con->out_kvec_is_msg = true; 454 con->out_kvec[v].iov_base = &m->footer; 455 con->out_kvec[v].iov_len = sizeof(m->footer); 456 con->out_kvec_bytes += sizeof(m->footer); 457 con->out_kvec_left++; 458 con->out_more = m->more_to_follow; 459 con->out_msg_done = true; 460 } 461 462 /* 463 * Prepare headers for the next outgoing message. 464 */ 465 static void prepare_write_message(struct ceph_connection *con) 466 { 467 struct ceph_msg *m; 468 int v = 0; 469 470 con->out_kvec_bytes = 0; 471 con->out_kvec_is_msg = true; 472 con->out_msg_done = false; 473 474 /* Sneak an ack in there first? If we can get it into the same 475 * TCP packet that's a good thing. */ 476 if (con->in_seq > con->in_seq_acked) { 477 con->in_seq_acked = con->in_seq; 478 con->out_kvec[v].iov_base = &tag_ack; 479 con->out_kvec[v++].iov_len = 1; 480 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 481 con->out_kvec[v].iov_base = &con->out_temp_ack; 482 con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack); 483 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack); 484 } 485 486 m = list_first_entry(&con->out_queue, 487 struct ceph_msg, list_head); 488 con->out_msg = m; 489 490 /* put message on sent list */ 491 ceph_msg_get(m); 492 list_move_tail(&m->list_head, &con->out_sent); 493 494 /* 495 * only assign outgoing seq # if we haven't sent this message 496 * yet. if it is requeued, resend with it's original seq. 497 */ 498 if (m->needs_out_seq) { 499 m->hdr.seq = cpu_to_le64(++con->out_seq); 500 m->needs_out_seq = false; 501 } 502 503 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 504 m, con->out_seq, le16_to_cpu(m->hdr.type), 505 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 506 le32_to_cpu(m->hdr.data_len), 507 m->nr_pages); 508 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 509 510 /* tag + hdr + front + middle */ 511 con->out_kvec[v].iov_base = &tag_msg; 512 con->out_kvec[v++].iov_len = 1; 513 con->out_kvec[v].iov_base = &m->hdr; 514 con->out_kvec[v++].iov_len = sizeof(m->hdr); 515 con->out_kvec[v++] = m->front; 516 if (m->middle) 517 con->out_kvec[v++] = m->middle->vec; 518 con->out_kvec_left = v; 519 con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len + 520 (m->middle ? m->middle->vec.iov_len : 0); 521 con->out_kvec_cur = con->out_kvec; 522 523 /* fill in crc (except data pages), footer */ 524 con->out_msg->hdr.crc = 525 cpu_to_le32(crc32c(0, (void *)&m->hdr, 526 sizeof(m->hdr) - sizeof(m->hdr.crc))); 527 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; 528 con->out_msg->footer.front_crc = 529 cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len)); 530 if (m->middle) 531 con->out_msg->footer.middle_crc = 532 cpu_to_le32(crc32c(0, m->middle->vec.iov_base, 533 m->middle->vec.iov_len)); 534 else 535 con->out_msg->footer.middle_crc = 0; 536 con->out_msg->footer.data_crc = 0; 537 dout("prepare_write_message front_crc %u data_crc %u\n", 538 le32_to_cpu(con->out_msg->footer.front_crc), 539 le32_to_cpu(con->out_msg->footer.middle_crc)); 540 541 /* is there a data payload? */ 542 if (le32_to_cpu(m->hdr.data_len) > 0) { 543 /* initialize page iterator */ 544 con->out_msg_pos.page = 0; 545 if (m->pages) 546 con->out_msg_pos.page_pos = m->page_alignment; 547 else 548 con->out_msg_pos.page_pos = 0; 549 con->out_msg_pos.data_pos = 0; 550 con->out_msg_pos.did_page_crc = 0; 551 con->out_more = 1; /* data + footer will follow */ 552 } else { 553 /* no, queue up footer too and be done */ 554 prepare_write_message_footer(con, v); 555 } 556 557 set_bit(WRITE_PENDING, &con->state); 558 } 559 560 /* 561 * Prepare an ack. 562 */ 563 static void prepare_write_ack(struct ceph_connection *con) 564 { 565 dout("prepare_write_ack %p %llu -> %llu\n", con, 566 con->in_seq_acked, con->in_seq); 567 con->in_seq_acked = con->in_seq; 568 569 con->out_kvec[0].iov_base = &tag_ack; 570 con->out_kvec[0].iov_len = 1; 571 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 572 con->out_kvec[1].iov_base = &con->out_temp_ack; 573 con->out_kvec[1].iov_len = sizeof(con->out_temp_ack); 574 con->out_kvec_left = 2; 575 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack); 576 con->out_kvec_cur = con->out_kvec; 577 con->out_more = 1; /* more will follow.. eventually.. */ 578 set_bit(WRITE_PENDING, &con->state); 579 } 580 581 /* 582 * Prepare to write keepalive byte. 583 */ 584 static void prepare_write_keepalive(struct ceph_connection *con) 585 { 586 dout("prepare_write_keepalive %p\n", con); 587 con->out_kvec[0].iov_base = &tag_keepalive; 588 con->out_kvec[0].iov_len = 1; 589 con->out_kvec_left = 1; 590 con->out_kvec_bytes = 1; 591 con->out_kvec_cur = con->out_kvec; 592 set_bit(WRITE_PENDING, &con->state); 593 } 594 595 /* 596 * Connection negotiation. 597 */ 598 599 static int prepare_connect_authorizer(struct ceph_connection *con) 600 { 601 void *auth_buf; 602 int auth_len = 0; 603 int auth_protocol = 0; 604 605 mutex_unlock(&con->mutex); 606 if (con->ops->get_authorizer) 607 con->ops->get_authorizer(con, &auth_buf, &auth_len, 608 &auth_protocol, &con->auth_reply_buf, 609 &con->auth_reply_buf_len, 610 con->auth_retry); 611 mutex_lock(&con->mutex); 612 613 if (test_bit(CLOSED, &con->state) || 614 test_bit(OPENING, &con->state)) 615 return -EAGAIN; 616 617 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 618 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 619 620 if (auth_len) { 621 con->out_kvec[con->out_kvec_left].iov_base = auth_buf; 622 con->out_kvec[con->out_kvec_left].iov_len = auth_len; 623 con->out_kvec_left++; 624 con->out_kvec_bytes += auth_len; 625 } 626 return 0; 627 } 628 629 /* 630 * We connected to a peer and are saying hello. 631 */ 632 static void prepare_write_banner(struct ceph_messenger *msgr, 633 struct ceph_connection *con) 634 { 635 int len = strlen(CEPH_BANNER); 636 637 con->out_kvec[0].iov_base = CEPH_BANNER; 638 con->out_kvec[0].iov_len = len; 639 con->out_kvec[1].iov_base = &msgr->my_enc_addr; 640 con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr); 641 con->out_kvec_left = 2; 642 con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr); 643 con->out_kvec_cur = con->out_kvec; 644 con->out_more = 0; 645 set_bit(WRITE_PENDING, &con->state); 646 } 647 648 static int prepare_write_connect(struct ceph_messenger *msgr, 649 struct ceph_connection *con, 650 int after_banner) 651 { 652 unsigned global_seq = get_global_seq(con->msgr, 0); 653 int proto; 654 655 switch (con->peer_name.type) { 656 case CEPH_ENTITY_TYPE_MON: 657 proto = CEPH_MONC_PROTOCOL; 658 break; 659 case CEPH_ENTITY_TYPE_OSD: 660 proto = CEPH_OSDC_PROTOCOL; 661 break; 662 case CEPH_ENTITY_TYPE_MDS: 663 proto = CEPH_MDSC_PROTOCOL; 664 break; 665 default: 666 BUG(); 667 } 668 669 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 670 con->connect_seq, global_seq, proto); 671 672 con->out_connect.features = cpu_to_le64(msgr->supported_features); 673 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 674 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 675 con->out_connect.global_seq = cpu_to_le32(global_seq); 676 con->out_connect.protocol_version = cpu_to_le32(proto); 677 con->out_connect.flags = 0; 678 679 if (!after_banner) { 680 con->out_kvec_left = 0; 681 con->out_kvec_bytes = 0; 682 } 683 con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect; 684 con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect); 685 con->out_kvec_left++; 686 con->out_kvec_bytes += sizeof(con->out_connect); 687 con->out_kvec_cur = con->out_kvec; 688 con->out_more = 0; 689 set_bit(WRITE_PENDING, &con->state); 690 691 return prepare_connect_authorizer(con); 692 } 693 694 695 /* 696 * write as much of pending kvecs to the socket as we can. 697 * 1 -> done 698 * 0 -> socket full, but more to do 699 * <0 -> error 700 */ 701 static int write_partial_kvec(struct ceph_connection *con) 702 { 703 int ret; 704 705 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); 706 while (con->out_kvec_bytes > 0) { 707 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, 708 con->out_kvec_left, con->out_kvec_bytes, 709 con->out_more); 710 if (ret <= 0) 711 goto out; 712 con->out_kvec_bytes -= ret; 713 if (con->out_kvec_bytes == 0) 714 break; /* done */ 715 while (ret > 0) { 716 if (ret >= con->out_kvec_cur->iov_len) { 717 ret -= con->out_kvec_cur->iov_len; 718 con->out_kvec_cur++; 719 con->out_kvec_left--; 720 } else { 721 con->out_kvec_cur->iov_len -= ret; 722 con->out_kvec_cur->iov_base += ret; 723 ret = 0; 724 break; 725 } 726 } 727 } 728 con->out_kvec_left = 0; 729 con->out_kvec_is_msg = false; 730 ret = 1; 731 out: 732 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 733 con->out_kvec_bytes, con->out_kvec_left, ret); 734 return ret; /* done! */ 735 } 736 737 #ifdef CONFIG_BLOCK 738 static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) 739 { 740 if (!bio) { 741 *iter = NULL; 742 *seg = 0; 743 return; 744 } 745 *iter = bio; 746 *seg = bio->bi_idx; 747 } 748 749 static void iter_bio_next(struct bio **bio_iter, int *seg) 750 { 751 if (*bio_iter == NULL) 752 return; 753 754 BUG_ON(*seg >= (*bio_iter)->bi_vcnt); 755 756 (*seg)++; 757 if (*seg == (*bio_iter)->bi_vcnt) 758 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 759 } 760 #endif 761 762 /* 763 * Write as much message data payload as we can. If we finish, queue 764 * up the footer. 765 * 1 -> done, footer is now queued in out_kvec[]. 766 * 0 -> socket full, but more to do 767 * <0 -> error 768 */ 769 static int write_partial_msg_pages(struct ceph_connection *con) 770 { 771 struct ceph_msg *msg = con->out_msg; 772 unsigned data_len = le32_to_cpu(msg->hdr.data_len); 773 size_t len; 774 int crc = con->msgr->nocrc; 775 int ret; 776 int total_max_write; 777 int in_trail = 0; 778 size_t trail_len = (msg->trail ? msg->trail->length : 0); 779 780 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 781 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 782 con->out_msg_pos.page_pos); 783 784 #ifdef CONFIG_BLOCK 785 if (msg->bio && !msg->bio_iter) 786 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); 787 #endif 788 789 while (data_len > con->out_msg_pos.data_pos) { 790 struct page *page = NULL; 791 void *kaddr = NULL; 792 int max_write = PAGE_SIZE; 793 int page_shift = 0; 794 795 total_max_write = data_len - trail_len - 796 con->out_msg_pos.data_pos; 797 798 /* 799 * if we are calculating the data crc (the default), we need 800 * to map the page. if our pages[] has been revoked, use the 801 * zero page. 802 */ 803 804 /* have we reached the trail part of the data? */ 805 if (con->out_msg_pos.data_pos >= data_len - trail_len) { 806 in_trail = 1; 807 808 total_max_write = data_len - con->out_msg_pos.data_pos; 809 810 page = list_first_entry(&msg->trail->head, 811 struct page, lru); 812 if (crc) 813 kaddr = kmap(page); 814 max_write = PAGE_SIZE; 815 } else if (msg->pages) { 816 page = msg->pages[con->out_msg_pos.page]; 817 if (crc) 818 kaddr = kmap(page); 819 } else if (msg->pagelist) { 820 page = list_first_entry(&msg->pagelist->head, 821 struct page, lru); 822 if (crc) 823 kaddr = kmap(page); 824 #ifdef CONFIG_BLOCK 825 } else if (msg->bio) { 826 struct bio_vec *bv; 827 828 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); 829 page = bv->bv_page; 830 page_shift = bv->bv_offset; 831 if (crc) 832 kaddr = kmap(page) + page_shift; 833 max_write = bv->bv_len; 834 #endif 835 } else { 836 page = con->msgr->zero_page; 837 if (crc) 838 kaddr = page_address(con->msgr->zero_page); 839 } 840 len = min_t(int, max_write - con->out_msg_pos.page_pos, 841 total_max_write); 842 843 if (crc && !con->out_msg_pos.did_page_crc) { 844 void *base = kaddr + con->out_msg_pos.page_pos; 845 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 846 847 BUG_ON(kaddr == NULL); 848 con->out_msg->footer.data_crc = 849 cpu_to_le32(crc32c(tmpcrc, base, len)); 850 con->out_msg_pos.did_page_crc = 1; 851 } 852 ret = kernel_sendpage(con->sock, page, 853 con->out_msg_pos.page_pos + page_shift, 854 len, 855 MSG_DONTWAIT | MSG_NOSIGNAL | 856 MSG_MORE); 857 858 if (crc && 859 (msg->pages || msg->pagelist || msg->bio || in_trail)) 860 kunmap(page); 861 862 if (ret == -EAGAIN) 863 ret = 0; 864 if (ret <= 0) 865 goto out; 866 867 con->out_msg_pos.data_pos += ret; 868 con->out_msg_pos.page_pos += ret; 869 if (ret == len) { 870 con->out_msg_pos.page_pos = 0; 871 con->out_msg_pos.page++; 872 con->out_msg_pos.did_page_crc = 0; 873 if (in_trail) 874 list_move_tail(&page->lru, 875 &msg->trail->head); 876 else if (msg->pagelist) 877 list_move_tail(&page->lru, 878 &msg->pagelist->head); 879 #ifdef CONFIG_BLOCK 880 else if (msg->bio) 881 iter_bio_next(&msg->bio_iter, &msg->bio_seg); 882 #endif 883 } 884 } 885 886 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 887 888 /* prepare and queue up footer, too */ 889 if (!crc) 890 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 891 con->out_kvec_bytes = 0; 892 con->out_kvec_left = 0; 893 con->out_kvec_cur = con->out_kvec; 894 prepare_write_message_footer(con, 0); 895 ret = 1; 896 out: 897 return ret; 898 } 899 900 /* 901 * write some zeros 902 */ 903 static int write_partial_skip(struct ceph_connection *con) 904 { 905 int ret; 906 907 while (con->out_skip > 0) { 908 struct kvec iov = { 909 .iov_base = page_address(con->msgr->zero_page), 910 .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE) 911 }; 912 913 ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1); 914 if (ret <= 0) 915 goto out; 916 con->out_skip -= ret; 917 } 918 ret = 1; 919 out: 920 return ret; 921 } 922 923 /* 924 * Prepare to read connection handshake, or an ack. 925 */ 926 static void prepare_read_banner(struct ceph_connection *con) 927 { 928 dout("prepare_read_banner %p\n", con); 929 con->in_base_pos = 0; 930 } 931 932 static void prepare_read_connect(struct ceph_connection *con) 933 { 934 dout("prepare_read_connect %p\n", con); 935 con->in_base_pos = 0; 936 } 937 938 static void prepare_read_ack(struct ceph_connection *con) 939 { 940 dout("prepare_read_ack %p\n", con); 941 con->in_base_pos = 0; 942 } 943 944 static void prepare_read_tag(struct ceph_connection *con) 945 { 946 dout("prepare_read_tag %p\n", con); 947 con->in_base_pos = 0; 948 con->in_tag = CEPH_MSGR_TAG_READY; 949 } 950 951 /* 952 * Prepare to read a message. 953 */ 954 static int prepare_read_message(struct ceph_connection *con) 955 { 956 dout("prepare_read_message %p\n", con); 957 BUG_ON(con->in_msg != NULL); 958 con->in_base_pos = 0; 959 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; 960 return 0; 961 } 962 963 964 static int read_partial(struct ceph_connection *con, 965 int *to, int size, void *object) 966 { 967 *to += size; 968 while (con->in_base_pos < *to) { 969 int left = *to - con->in_base_pos; 970 int have = size - left; 971 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 972 if (ret <= 0) 973 return ret; 974 con->in_base_pos += ret; 975 } 976 return 1; 977 } 978 979 980 /* 981 * Read all or part of the connect-side handshake on a new connection 982 */ 983 static int read_partial_banner(struct ceph_connection *con) 984 { 985 int ret, to = 0; 986 987 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 988 989 /* peer's banner */ 990 ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); 991 if (ret <= 0) 992 goto out; 993 ret = read_partial(con, &to, sizeof(con->actual_peer_addr), 994 &con->actual_peer_addr); 995 if (ret <= 0) 996 goto out; 997 ret = read_partial(con, &to, sizeof(con->peer_addr_for_me), 998 &con->peer_addr_for_me); 999 if (ret <= 0) 1000 goto out; 1001 out: 1002 return ret; 1003 } 1004 1005 static int read_partial_connect(struct ceph_connection *con) 1006 { 1007 int ret, to = 0; 1008 1009 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 1010 1011 ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); 1012 if (ret <= 0) 1013 goto out; 1014 ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len), 1015 con->auth_reply_buf); 1016 if (ret <= 0) 1017 goto out; 1018 1019 dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", 1020 con, (int)con->in_reply.tag, 1021 le32_to_cpu(con->in_reply.connect_seq), 1022 le32_to_cpu(con->in_reply.global_seq)); 1023 out: 1024 return ret; 1025 1026 } 1027 1028 /* 1029 * Verify the hello banner looks okay. 1030 */ 1031 static int verify_hello(struct ceph_connection *con) 1032 { 1033 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { 1034 pr_err("connect to %s got bad banner\n", 1035 ceph_pr_addr(&con->peer_addr.in_addr)); 1036 con->error_msg = "protocol error, bad banner"; 1037 return -1; 1038 } 1039 return 0; 1040 } 1041 1042 static bool addr_is_blank(struct sockaddr_storage *ss) 1043 { 1044 switch (ss->ss_family) { 1045 case AF_INET: 1046 return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0; 1047 case AF_INET6: 1048 return 1049 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 && 1050 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 && 1051 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 && 1052 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0; 1053 } 1054 return false; 1055 } 1056 1057 static int addr_port(struct sockaddr_storage *ss) 1058 { 1059 switch (ss->ss_family) { 1060 case AF_INET: 1061 return ntohs(((struct sockaddr_in *)ss)->sin_port); 1062 case AF_INET6: 1063 return ntohs(((struct sockaddr_in6 *)ss)->sin6_port); 1064 } 1065 return 0; 1066 } 1067 1068 static void addr_set_port(struct sockaddr_storage *ss, int p) 1069 { 1070 switch (ss->ss_family) { 1071 case AF_INET: 1072 ((struct sockaddr_in *)ss)->sin_port = htons(p); 1073 break; 1074 case AF_INET6: 1075 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p); 1076 break; 1077 } 1078 } 1079 1080 /* 1081 * Parse an ip[:port] list into an addr array. Use the default 1082 * monitor port if a port isn't specified. 1083 */ 1084 int ceph_parse_ips(const char *c, const char *end, 1085 struct ceph_entity_addr *addr, 1086 int max_count, int *count) 1087 { 1088 int i; 1089 const char *p = c; 1090 1091 dout("parse_ips on '%.*s'\n", (int)(end-c), c); 1092 for (i = 0; i < max_count; i++) { 1093 const char *ipend; 1094 struct sockaddr_storage *ss = &addr[i].in_addr; 1095 struct sockaddr_in *in4 = (void *)ss; 1096 struct sockaddr_in6 *in6 = (void *)ss; 1097 int port; 1098 char delim = ','; 1099 1100 if (*p == '[') { 1101 delim = ']'; 1102 p++; 1103 } 1104 1105 memset(ss, 0, sizeof(*ss)); 1106 if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr, 1107 delim, &ipend)) 1108 ss->ss_family = AF_INET; 1109 else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr, 1110 delim, &ipend)) 1111 ss->ss_family = AF_INET6; 1112 else 1113 goto bad; 1114 p = ipend; 1115 1116 if (delim == ']') { 1117 if (*p != ']') { 1118 dout("missing matching ']'\n"); 1119 goto bad; 1120 } 1121 p++; 1122 } 1123 1124 /* port? */ 1125 if (p < end && *p == ':') { 1126 port = 0; 1127 p++; 1128 while (p < end && *p >= '0' && *p <= '9') { 1129 port = (port * 10) + (*p - '0'); 1130 p++; 1131 } 1132 if (port > 65535 || port == 0) 1133 goto bad; 1134 } else { 1135 port = CEPH_MON_PORT; 1136 } 1137 1138 addr_set_port(ss, port); 1139 1140 dout("parse_ips got %s\n", ceph_pr_addr(ss)); 1141 1142 if (p == end) 1143 break; 1144 if (*p != ',') 1145 goto bad; 1146 p++; 1147 } 1148 1149 if (p != end) 1150 goto bad; 1151 1152 if (count) 1153 *count = i + 1; 1154 return 0; 1155 1156 bad: 1157 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); 1158 return -EINVAL; 1159 } 1160 EXPORT_SYMBOL(ceph_parse_ips); 1161 1162 static int process_banner(struct ceph_connection *con) 1163 { 1164 dout("process_banner on %p\n", con); 1165 1166 if (verify_hello(con) < 0) 1167 return -1; 1168 1169 ceph_decode_addr(&con->actual_peer_addr); 1170 ceph_decode_addr(&con->peer_addr_for_me); 1171 1172 /* 1173 * Make sure the other end is who we wanted. note that the other 1174 * end may not yet know their ip address, so if it's 0.0.0.0, give 1175 * them the benefit of the doubt. 1176 */ 1177 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 1178 sizeof(con->peer_addr)) != 0 && 1179 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1180 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1181 pr_warning("wrong peer, want %s/%d, got %s/%d\n", 1182 ceph_pr_addr(&con->peer_addr.in_addr), 1183 (int)le32_to_cpu(con->peer_addr.nonce), 1184 ceph_pr_addr(&con->actual_peer_addr.in_addr), 1185 (int)le32_to_cpu(con->actual_peer_addr.nonce)); 1186 con->error_msg = "wrong peer at address"; 1187 return -1; 1188 } 1189 1190 /* 1191 * did we learn our address? 1192 */ 1193 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) { 1194 int port = addr_port(&con->msgr->inst.addr.in_addr); 1195 1196 memcpy(&con->msgr->inst.addr.in_addr, 1197 &con->peer_addr_for_me.in_addr, 1198 sizeof(con->peer_addr_for_me.in_addr)); 1199 addr_set_port(&con->msgr->inst.addr.in_addr, port); 1200 encode_my_addr(con->msgr); 1201 dout("process_banner learned my addr is %s\n", 1202 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); 1203 } 1204 1205 set_bit(NEGOTIATING, &con->state); 1206 prepare_read_connect(con); 1207 return 0; 1208 } 1209 1210 static void fail_protocol(struct ceph_connection *con) 1211 { 1212 reset_connection(con); 1213 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1214 1215 mutex_unlock(&con->mutex); 1216 if (con->ops->bad_proto) 1217 con->ops->bad_proto(con); 1218 mutex_lock(&con->mutex); 1219 } 1220 1221 static int process_connect(struct ceph_connection *con) 1222 { 1223 u64 sup_feat = con->msgr->supported_features; 1224 u64 req_feat = con->msgr->required_features; 1225 u64 server_feat = le64_to_cpu(con->in_reply.features); 1226 int ret; 1227 1228 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1229 1230 switch (con->in_reply.tag) { 1231 case CEPH_MSGR_TAG_FEATURES: 1232 pr_err("%s%lld %s feature set mismatch," 1233 " my %llx < server's %llx, missing %llx\n", 1234 ENTITY_NAME(con->peer_name), 1235 ceph_pr_addr(&con->peer_addr.in_addr), 1236 sup_feat, server_feat, server_feat & ~sup_feat); 1237 con->error_msg = "missing required protocol features"; 1238 fail_protocol(con); 1239 return -1; 1240 1241 case CEPH_MSGR_TAG_BADPROTOVER: 1242 pr_err("%s%lld %s protocol version mismatch," 1243 " my %d != server's %d\n", 1244 ENTITY_NAME(con->peer_name), 1245 ceph_pr_addr(&con->peer_addr.in_addr), 1246 le32_to_cpu(con->out_connect.protocol_version), 1247 le32_to_cpu(con->in_reply.protocol_version)); 1248 con->error_msg = "protocol version mismatch"; 1249 fail_protocol(con); 1250 return -1; 1251 1252 case CEPH_MSGR_TAG_BADAUTHORIZER: 1253 con->auth_retry++; 1254 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, 1255 con->auth_retry); 1256 if (con->auth_retry == 2) { 1257 con->error_msg = "connect authorization failure"; 1258 return -1; 1259 } 1260 con->auth_retry = 1; 1261 ret = prepare_write_connect(con->msgr, con, 0); 1262 if (ret < 0) 1263 return ret; 1264 prepare_read_connect(con); 1265 break; 1266 1267 case CEPH_MSGR_TAG_RESETSESSION: 1268 /* 1269 * If we connected with a large connect_seq but the peer 1270 * has no record of a session with us (no connection, or 1271 * connect_seq == 0), they will send RESETSESION to indicate 1272 * that they must have reset their session, and may have 1273 * dropped messages. 1274 */ 1275 dout("process_connect got RESET peer seq %u\n", 1276 le32_to_cpu(con->in_connect.connect_seq)); 1277 pr_err("%s%lld %s connection reset\n", 1278 ENTITY_NAME(con->peer_name), 1279 ceph_pr_addr(&con->peer_addr.in_addr)); 1280 reset_connection(con); 1281 prepare_write_connect(con->msgr, con, 0); 1282 prepare_read_connect(con); 1283 1284 /* Tell ceph about it. */ 1285 mutex_unlock(&con->mutex); 1286 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); 1287 if (con->ops->peer_reset) 1288 con->ops->peer_reset(con); 1289 mutex_lock(&con->mutex); 1290 if (test_bit(CLOSED, &con->state) || 1291 test_bit(OPENING, &con->state)) 1292 return -EAGAIN; 1293 break; 1294 1295 case CEPH_MSGR_TAG_RETRY_SESSION: 1296 /* 1297 * If we sent a smaller connect_seq than the peer has, try 1298 * again with a larger value. 1299 */ 1300 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n", 1301 le32_to_cpu(con->out_connect.connect_seq), 1302 le32_to_cpu(con->in_connect.connect_seq)); 1303 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1304 prepare_write_connect(con->msgr, con, 0); 1305 prepare_read_connect(con); 1306 break; 1307 1308 case CEPH_MSGR_TAG_RETRY_GLOBAL: 1309 /* 1310 * If we sent a smaller global_seq than the peer has, try 1311 * again with a larger value. 1312 */ 1313 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", 1314 con->peer_global_seq, 1315 le32_to_cpu(con->in_connect.global_seq)); 1316 get_global_seq(con->msgr, 1317 le32_to_cpu(con->in_connect.global_seq)); 1318 prepare_write_connect(con->msgr, con, 0); 1319 prepare_read_connect(con); 1320 break; 1321 1322 case CEPH_MSGR_TAG_READY: 1323 if (req_feat & ~server_feat) { 1324 pr_err("%s%lld %s protocol feature mismatch," 1325 " my required %llx > server's %llx, need %llx\n", 1326 ENTITY_NAME(con->peer_name), 1327 ceph_pr_addr(&con->peer_addr.in_addr), 1328 req_feat, server_feat, req_feat & ~server_feat); 1329 con->error_msg = "missing required protocol features"; 1330 fail_protocol(con); 1331 return -1; 1332 } 1333 clear_bit(CONNECTING, &con->state); 1334 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1335 con->connect_seq++; 1336 con->peer_features = server_feat; 1337 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1338 con->peer_global_seq, 1339 le32_to_cpu(con->in_reply.connect_seq), 1340 con->connect_seq); 1341 WARN_ON(con->connect_seq != 1342 le32_to_cpu(con->in_reply.connect_seq)); 1343 1344 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1345 set_bit(LOSSYTX, &con->state); 1346 1347 prepare_read_tag(con); 1348 break; 1349 1350 case CEPH_MSGR_TAG_WAIT: 1351 /* 1352 * If there is a connection race (we are opening 1353 * connections to each other), one of us may just have 1354 * to WAIT. This shouldn't happen if we are the 1355 * client. 1356 */ 1357 pr_err("process_connect got WAIT as client\n"); 1358 con->error_msg = "protocol error, got WAIT as client"; 1359 return -1; 1360 1361 default: 1362 pr_err("connect protocol error, will retry\n"); 1363 con->error_msg = "protocol error, garbage tag during connect"; 1364 return -1; 1365 } 1366 return 0; 1367 } 1368 1369 1370 /* 1371 * read (part of) an ack 1372 */ 1373 static int read_partial_ack(struct ceph_connection *con) 1374 { 1375 int to = 0; 1376 1377 return read_partial(con, &to, sizeof(con->in_temp_ack), 1378 &con->in_temp_ack); 1379 } 1380 1381 1382 /* 1383 * We can finally discard anything that's been acked. 1384 */ 1385 static void process_ack(struct ceph_connection *con) 1386 { 1387 struct ceph_msg *m; 1388 u64 ack = le64_to_cpu(con->in_temp_ack); 1389 u64 seq; 1390 1391 while (!list_empty(&con->out_sent)) { 1392 m = list_first_entry(&con->out_sent, struct ceph_msg, 1393 list_head); 1394 seq = le64_to_cpu(m->hdr.seq); 1395 if (seq > ack) 1396 break; 1397 dout("got ack for seq %llu type %d at %p\n", seq, 1398 le16_to_cpu(m->hdr.type), m); 1399 m->ack_stamp = jiffies; 1400 ceph_msg_remove(m); 1401 } 1402 prepare_read_tag(con); 1403 } 1404 1405 1406 1407 1408 static int read_partial_message_section(struct ceph_connection *con, 1409 struct kvec *section, 1410 unsigned int sec_len, u32 *crc) 1411 { 1412 int ret, left; 1413 1414 BUG_ON(!section); 1415 1416 while (section->iov_len < sec_len) { 1417 BUG_ON(section->iov_base == NULL); 1418 left = sec_len - section->iov_len; 1419 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + 1420 section->iov_len, left); 1421 if (ret <= 0) 1422 return ret; 1423 section->iov_len += ret; 1424 if (section->iov_len == sec_len) 1425 *crc = crc32c(0, section->iov_base, 1426 section->iov_len); 1427 } 1428 1429 return 1; 1430 } 1431 1432 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1433 struct ceph_msg_header *hdr, 1434 int *skip); 1435 1436 1437 static int read_partial_message_pages(struct ceph_connection *con, 1438 struct page **pages, 1439 unsigned data_len, int datacrc) 1440 { 1441 void *p; 1442 int ret; 1443 int left; 1444 1445 left = min((int)(data_len - con->in_msg_pos.data_pos), 1446 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1447 /* (page) data */ 1448 BUG_ON(pages == NULL); 1449 p = kmap(pages[con->in_msg_pos.page]); 1450 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1451 left); 1452 if (ret > 0 && datacrc) 1453 con->in_data_crc = 1454 crc32c(con->in_data_crc, 1455 p + con->in_msg_pos.page_pos, ret); 1456 kunmap(pages[con->in_msg_pos.page]); 1457 if (ret <= 0) 1458 return ret; 1459 con->in_msg_pos.data_pos += ret; 1460 con->in_msg_pos.page_pos += ret; 1461 if (con->in_msg_pos.page_pos == PAGE_SIZE) { 1462 con->in_msg_pos.page_pos = 0; 1463 con->in_msg_pos.page++; 1464 } 1465 1466 return ret; 1467 } 1468 1469 #ifdef CONFIG_BLOCK 1470 static int read_partial_message_bio(struct ceph_connection *con, 1471 struct bio **bio_iter, int *bio_seg, 1472 unsigned data_len, int datacrc) 1473 { 1474 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg); 1475 void *p; 1476 int ret, left; 1477 1478 if (IS_ERR(bv)) 1479 return PTR_ERR(bv); 1480 1481 left = min((int)(data_len - con->in_msg_pos.data_pos), 1482 (int)(bv->bv_len - con->in_msg_pos.page_pos)); 1483 1484 p = kmap(bv->bv_page) + bv->bv_offset; 1485 1486 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1487 left); 1488 if (ret > 0 && datacrc) 1489 con->in_data_crc = 1490 crc32c(con->in_data_crc, 1491 p + con->in_msg_pos.page_pos, ret); 1492 kunmap(bv->bv_page); 1493 if (ret <= 0) 1494 return ret; 1495 con->in_msg_pos.data_pos += ret; 1496 con->in_msg_pos.page_pos += ret; 1497 if (con->in_msg_pos.page_pos == bv->bv_len) { 1498 con->in_msg_pos.page_pos = 0; 1499 iter_bio_next(bio_iter, bio_seg); 1500 } 1501 1502 return ret; 1503 } 1504 #endif 1505 1506 /* 1507 * read (part of) a message. 1508 */ 1509 static int read_partial_message(struct ceph_connection *con) 1510 { 1511 struct ceph_msg *m = con->in_msg; 1512 int ret; 1513 int to, left; 1514 unsigned front_len, middle_len, data_len; 1515 int datacrc = con->msgr->nocrc; 1516 int skip; 1517 u64 seq; 1518 1519 dout("read_partial_message con %p msg %p\n", con, m); 1520 1521 /* header */ 1522 while (con->in_base_pos < sizeof(con->in_hdr)) { 1523 left = sizeof(con->in_hdr) - con->in_base_pos; 1524 ret = ceph_tcp_recvmsg(con->sock, 1525 (char *)&con->in_hdr + con->in_base_pos, 1526 left); 1527 if (ret <= 0) 1528 return ret; 1529 con->in_base_pos += ret; 1530 if (con->in_base_pos == sizeof(con->in_hdr)) { 1531 u32 crc = crc32c(0, (void *)&con->in_hdr, 1532 sizeof(con->in_hdr) - sizeof(con->in_hdr.crc)); 1533 if (crc != le32_to_cpu(con->in_hdr.crc)) { 1534 pr_err("read_partial_message bad hdr " 1535 " crc %u != expected %u\n", 1536 crc, con->in_hdr.crc); 1537 return -EBADMSG; 1538 } 1539 } 1540 } 1541 front_len = le32_to_cpu(con->in_hdr.front_len); 1542 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1543 return -EIO; 1544 middle_len = le32_to_cpu(con->in_hdr.middle_len); 1545 if (middle_len > CEPH_MSG_MAX_DATA_LEN) 1546 return -EIO; 1547 data_len = le32_to_cpu(con->in_hdr.data_len); 1548 if (data_len > CEPH_MSG_MAX_DATA_LEN) 1549 return -EIO; 1550 1551 /* verify seq# */ 1552 seq = le64_to_cpu(con->in_hdr.seq); 1553 if ((s64)seq - (s64)con->in_seq < 1) { 1554 pr_info("skipping %s%lld %s seq %lld expected %lld\n", 1555 ENTITY_NAME(con->peer_name), 1556 ceph_pr_addr(&con->peer_addr.in_addr), 1557 seq, con->in_seq + 1); 1558 con->in_base_pos = -front_len - middle_len - data_len - 1559 sizeof(m->footer); 1560 con->in_tag = CEPH_MSGR_TAG_READY; 1561 return 0; 1562 } else if ((s64)seq - (s64)con->in_seq > 1) { 1563 pr_err("read_partial_message bad seq %lld expected %lld\n", 1564 seq, con->in_seq + 1); 1565 con->error_msg = "bad message sequence # for incoming message"; 1566 return -EBADMSG; 1567 } 1568 1569 /* allocate message? */ 1570 if (!con->in_msg) { 1571 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1572 con->in_hdr.front_len, con->in_hdr.data_len); 1573 skip = 0; 1574 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1575 if (skip) { 1576 /* skip this message */ 1577 dout("alloc_msg said skip message\n"); 1578 BUG_ON(con->in_msg); 1579 con->in_base_pos = -front_len - middle_len - data_len - 1580 sizeof(m->footer); 1581 con->in_tag = CEPH_MSGR_TAG_READY; 1582 con->in_seq++; 1583 return 0; 1584 } 1585 if (!con->in_msg) { 1586 con->error_msg = 1587 "error allocating memory for incoming message"; 1588 return -ENOMEM; 1589 } 1590 m = con->in_msg; 1591 m->front.iov_len = 0; /* haven't read it yet */ 1592 if (m->middle) 1593 m->middle->vec.iov_len = 0; 1594 1595 con->in_msg_pos.page = 0; 1596 if (m->pages) 1597 con->in_msg_pos.page_pos = m->page_alignment; 1598 else 1599 con->in_msg_pos.page_pos = 0; 1600 con->in_msg_pos.data_pos = 0; 1601 } 1602 1603 /* front */ 1604 ret = read_partial_message_section(con, &m->front, front_len, 1605 &con->in_front_crc); 1606 if (ret <= 0) 1607 return ret; 1608 1609 /* middle */ 1610 if (m->middle) { 1611 ret = read_partial_message_section(con, &m->middle->vec, 1612 middle_len, 1613 &con->in_middle_crc); 1614 if (ret <= 0) 1615 return ret; 1616 } 1617 #ifdef CONFIG_BLOCK 1618 if (m->bio && !m->bio_iter) 1619 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); 1620 #endif 1621 1622 /* (page) data */ 1623 while (con->in_msg_pos.data_pos < data_len) { 1624 if (m->pages) { 1625 ret = read_partial_message_pages(con, m->pages, 1626 data_len, datacrc); 1627 if (ret <= 0) 1628 return ret; 1629 #ifdef CONFIG_BLOCK 1630 } else if (m->bio) { 1631 1632 ret = read_partial_message_bio(con, 1633 &m->bio_iter, &m->bio_seg, 1634 data_len, datacrc); 1635 if (ret <= 0) 1636 return ret; 1637 #endif 1638 } else { 1639 BUG_ON(1); 1640 } 1641 } 1642 1643 /* footer */ 1644 to = sizeof(m->hdr) + sizeof(m->footer); 1645 while (con->in_base_pos < to) { 1646 left = to - con->in_base_pos; 1647 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + 1648 (con->in_base_pos - sizeof(m->hdr)), 1649 left); 1650 if (ret <= 0) 1651 return ret; 1652 con->in_base_pos += ret; 1653 } 1654 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1655 m, front_len, m->footer.front_crc, middle_len, 1656 m->footer.middle_crc, data_len, m->footer.data_crc); 1657 1658 /* crc ok? */ 1659 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 1660 pr_err("read_partial_message %p front crc %u != exp. %u\n", 1661 m, con->in_front_crc, m->footer.front_crc); 1662 return -EBADMSG; 1663 } 1664 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 1665 pr_err("read_partial_message %p middle crc %u != exp %u\n", 1666 m, con->in_middle_crc, m->footer.middle_crc); 1667 return -EBADMSG; 1668 } 1669 if (datacrc && 1670 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 1671 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 1672 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 1673 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 1674 return -EBADMSG; 1675 } 1676 1677 return 1; /* done! */ 1678 } 1679 1680 /* 1681 * Process message. This happens in the worker thread. The callback should 1682 * be careful not to do anything that waits on other incoming messages or it 1683 * may deadlock. 1684 */ 1685 static void process_message(struct ceph_connection *con) 1686 { 1687 struct ceph_msg *msg; 1688 1689 msg = con->in_msg; 1690 con->in_msg = NULL; 1691 1692 /* if first message, set peer_name */ 1693 if (con->peer_name.type == 0) 1694 con->peer_name = msg->hdr.src; 1695 1696 con->in_seq++; 1697 mutex_unlock(&con->mutex); 1698 1699 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1700 msg, le64_to_cpu(msg->hdr.seq), 1701 ENTITY_NAME(msg->hdr.src), 1702 le16_to_cpu(msg->hdr.type), 1703 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1704 le32_to_cpu(msg->hdr.front_len), 1705 le32_to_cpu(msg->hdr.data_len), 1706 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 1707 con->ops->dispatch(con, msg); 1708 1709 mutex_lock(&con->mutex); 1710 prepare_read_tag(con); 1711 } 1712 1713 1714 /* 1715 * Write something to the socket. Called in a worker thread when the 1716 * socket appears to be writeable and we have something ready to send. 1717 */ 1718 static int try_write(struct ceph_connection *con) 1719 { 1720 struct ceph_messenger *msgr = con->msgr; 1721 int ret = 1; 1722 1723 dout("try_write start %p state %lu nref %d\n", con, con->state, 1724 atomic_read(&con->nref)); 1725 1726 more: 1727 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1728 1729 /* open the socket first? */ 1730 if (con->sock == NULL) { 1731 prepare_write_banner(msgr, con); 1732 prepare_write_connect(msgr, con, 1); 1733 prepare_read_banner(con); 1734 set_bit(CONNECTING, &con->state); 1735 clear_bit(NEGOTIATING, &con->state); 1736 1737 BUG_ON(con->in_msg); 1738 con->in_tag = CEPH_MSGR_TAG_READY; 1739 dout("try_write initiating connect on %p new state %lu\n", 1740 con, con->state); 1741 con->sock = ceph_tcp_connect(con); 1742 if (IS_ERR(con->sock)) { 1743 con->sock = NULL; 1744 con->error_msg = "connect error"; 1745 ret = -1; 1746 goto out; 1747 } 1748 } 1749 1750 more_kvec: 1751 /* kvec data queued? */ 1752 if (con->out_skip) { 1753 ret = write_partial_skip(con); 1754 if (ret <= 0) 1755 goto out; 1756 } 1757 if (con->out_kvec_left) { 1758 ret = write_partial_kvec(con); 1759 if (ret <= 0) 1760 goto out; 1761 } 1762 1763 /* msg pages? */ 1764 if (con->out_msg) { 1765 if (con->out_msg_done) { 1766 ceph_msg_put(con->out_msg); 1767 con->out_msg = NULL; /* we're done with this one */ 1768 goto do_next; 1769 } 1770 1771 ret = write_partial_msg_pages(con); 1772 if (ret == 1) 1773 goto more_kvec; /* we need to send the footer, too! */ 1774 if (ret == 0) 1775 goto out; 1776 if (ret < 0) { 1777 dout("try_write write_partial_msg_pages err %d\n", 1778 ret); 1779 goto out; 1780 } 1781 } 1782 1783 do_next: 1784 if (!test_bit(CONNECTING, &con->state)) { 1785 /* is anything else pending? */ 1786 if (!list_empty(&con->out_queue)) { 1787 prepare_write_message(con); 1788 goto more; 1789 } 1790 if (con->in_seq > con->in_seq_acked) { 1791 prepare_write_ack(con); 1792 goto more; 1793 } 1794 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { 1795 prepare_write_keepalive(con); 1796 goto more; 1797 } 1798 } 1799 1800 /* Nothing to do! */ 1801 clear_bit(WRITE_PENDING, &con->state); 1802 dout("try_write nothing else to write.\n"); 1803 ret = 0; 1804 out: 1805 dout("try_write done on %p ret %d\n", con, ret); 1806 return ret; 1807 } 1808 1809 1810 1811 /* 1812 * Read what we can from the socket. 1813 */ 1814 static int try_read(struct ceph_connection *con) 1815 { 1816 int ret = -1; 1817 1818 if (!con->sock) 1819 return 0; 1820 1821 if (test_bit(STANDBY, &con->state)) 1822 return 0; 1823 1824 dout("try_read start on %p\n", con); 1825 1826 more: 1827 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1828 con->in_base_pos); 1829 1830 /* 1831 * process_connect and process_message drop and re-take 1832 * con->mutex. make sure we handle a racing close or reopen. 1833 */ 1834 if (test_bit(CLOSED, &con->state) || 1835 test_bit(OPENING, &con->state)) { 1836 ret = -EAGAIN; 1837 goto out; 1838 } 1839 1840 if (test_bit(CONNECTING, &con->state)) { 1841 if (!test_bit(NEGOTIATING, &con->state)) { 1842 dout("try_read connecting\n"); 1843 ret = read_partial_banner(con); 1844 if (ret <= 0) 1845 goto out; 1846 ret = process_banner(con); 1847 if (ret < 0) 1848 goto out; 1849 } 1850 ret = read_partial_connect(con); 1851 if (ret <= 0) 1852 goto out; 1853 ret = process_connect(con); 1854 if (ret < 0) 1855 goto out; 1856 goto more; 1857 } 1858 1859 if (con->in_base_pos < 0) { 1860 /* 1861 * skipping + discarding content. 1862 * 1863 * FIXME: there must be a better way to do this! 1864 */ 1865 static char buf[1024]; 1866 int skip = min(1024, -con->in_base_pos); 1867 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); 1868 ret = ceph_tcp_recvmsg(con->sock, buf, skip); 1869 if (ret <= 0) 1870 goto out; 1871 con->in_base_pos += ret; 1872 if (con->in_base_pos) 1873 goto more; 1874 } 1875 if (con->in_tag == CEPH_MSGR_TAG_READY) { 1876 /* 1877 * what's next? 1878 */ 1879 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); 1880 if (ret <= 0) 1881 goto out; 1882 dout("try_read got tag %d\n", (int)con->in_tag); 1883 switch (con->in_tag) { 1884 case CEPH_MSGR_TAG_MSG: 1885 prepare_read_message(con); 1886 break; 1887 case CEPH_MSGR_TAG_ACK: 1888 prepare_read_ack(con); 1889 break; 1890 case CEPH_MSGR_TAG_CLOSE: 1891 set_bit(CLOSED, &con->state); /* fixme */ 1892 goto out; 1893 default: 1894 goto bad_tag; 1895 } 1896 } 1897 if (con->in_tag == CEPH_MSGR_TAG_MSG) { 1898 ret = read_partial_message(con); 1899 if (ret <= 0) { 1900 switch (ret) { 1901 case -EBADMSG: 1902 con->error_msg = "bad crc"; 1903 ret = -EIO; 1904 break; 1905 case -EIO: 1906 con->error_msg = "io error"; 1907 break; 1908 } 1909 goto out; 1910 } 1911 if (con->in_tag == CEPH_MSGR_TAG_READY) 1912 goto more; 1913 process_message(con); 1914 goto more; 1915 } 1916 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 1917 ret = read_partial_ack(con); 1918 if (ret <= 0) 1919 goto out; 1920 process_ack(con); 1921 goto more; 1922 } 1923 1924 out: 1925 dout("try_read done on %p ret %d\n", con, ret); 1926 return ret; 1927 1928 bad_tag: 1929 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); 1930 con->error_msg = "protocol error, garbage tag"; 1931 ret = -1; 1932 goto out; 1933 } 1934 1935 1936 /* 1937 * Atomically queue work on a connection. Bump @con reference to 1938 * avoid races with connection teardown. 1939 */ 1940 static void queue_con(struct ceph_connection *con) 1941 { 1942 if (test_bit(DEAD, &con->state)) { 1943 dout("queue_con %p ignoring: DEAD\n", 1944 con); 1945 return; 1946 } 1947 1948 if (!con->ops->get(con)) { 1949 dout("queue_con %p ref count 0\n", con); 1950 return; 1951 } 1952 1953 if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { 1954 dout("queue_con %p - already queued\n", con); 1955 con->ops->put(con); 1956 } else { 1957 dout("queue_con %p\n", con); 1958 } 1959 } 1960 1961 /* 1962 * Do some work on a connection. Drop a connection ref when we're done. 1963 */ 1964 static void con_work(struct work_struct *work) 1965 { 1966 struct ceph_connection *con = container_of(work, struct ceph_connection, 1967 work.work); 1968 int ret; 1969 1970 mutex_lock(&con->mutex); 1971 restart: 1972 if (test_and_clear_bit(BACKOFF, &con->state)) { 1973 dout("con_work %p backing off\n", con); 1974 if (queue_delayed_work(ceph_msgr_wq, &con->work, 1975 round_jiffies_relative(con->delay))) { 1976 dout("con_work %p backoff %lu\n", con, con->delay); 1977 mutex_unlock(&con->mutex); 1978 return; 1979 } else { 1980 con->ops->put(con); 1981 dout("con_work %p FAILED to back off %lu\n", con, 1982 con->delay); 1983 } 1984 } 1985 1986 if (test_bit(STANDBY, &con->state)) { 1987 dout("con_work %p STANDBY\n", con); 1988 goto done; 1989 } 1990 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 1991 dout("con_work CLOSED\n"); 1992 con_close_socket(con); 1993 goto done; 1994 } 1995 if (test_and_clear_bit(OPENING, &con->state)) { 1996 /* reopen w/ new peer */ 1997 dout("con_work OPENING\n"); 1998 con_close_socket(con); 1999 } 2000 2001 if (test_and_clear_bit(SOCK_CLOSED, &con->state)) 2002 goto fault; 2003 2004 ret = try_read(con); 2005 if (ret == -EAGAIN) 2006 goto restart; 2007 if (ret < 0) 2008 goto fault; 2009 2010 ret = try_write(con); 2011 if (ret == -EAGAIN) 2012 goto restart; 2013 if (ret < 0) 2014 goto fault; 2015 2016 done: 2017 mutex_unlock(&con->mutex); 2018 done_unlocked: 2019 con->ops->put(con); 2020 return; 2021 2022 fault: 2023 mutex_unlock(&con->mutex); 2024 ceph_fault(con); /* error/fault path */ 2025 goto done_unlocked; 2026 } 2027 2028 2029 /* 2030 * Generic error/fault handler. A retry mechanism is used with 2031 * exponential backoff 2032 */ 2033 static void ceph_fault(struct ceph_connection *con) 2034 { 2035 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2036 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2037 dout("fault %p state %lu to peer %s\n", 2038 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2039 2040 if (test_bit(LOSSYTX, &con->state)) { 2041 dout("fault on LOSSYTX channel\n"); 2042 goto out; 2043 } 2044 2045 mutex_lock(&con->mutex); 2046 if (test_bit(CLOSED, &con->state)) 2047 goto out_unlock; 2048 2049 con_close_socket(con); 2050 2051 if (con->in_msg) { 2052 ceph_msg_put(con->in_msg); 2053 con->in_msg = NULL; 2054 } 2055 2056 /* Requeue anything that hasn't been acked */ 2057 list_splice_init(&con->out_sent, &con->out_queue); 2058 2059 /* If there are no messages queued or keepalive pending, place 2060 * the connection in a STANDBY state */ 2061 if (list_empty(&con->out_queue) && 2062 !test_bit(KEEPALIVE_PENDING, &con->state)) { 2063 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2064 clear_bit(WRITE_PENDING, &con->state); 2065 set_bit(STANDBY, &con->state); 2066 } else { 2067 /* retry after a delay. */ 2068 if (con->delay == 0) 2069 con->delay = BASE_DELAY_INTERVAL; 2070 else if (con->delay < MAX_DELAY_INTERVAL) 2071 con->delay *= 2; 2072 con->ops->get(con); 2073 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2074 round_jiffies_relative(con->delay))) { 2075 dout("fault queued %p delay %lu\n", con, con->delay); 2076 } else { 2077 con->ops->put(con); 2078 dout("fault failed to queue %p delay %lu, backoff\n", 2079 con, con->delay); 2080 /* 2081 * In many cases we see a socket state change 2082 * while con_work is running and end up 2083 * queuing (non-delayed) work, such that we 2084 * can't backoff with a delay. Set a flag so 2085 * that when con_work restarts we schedule the 2086 * delay then. 2087 */ 2088 set_bit(BACKOFF, &con->state); 2089 } 2090 } 2091 2092 out_unlock: 2093 mutex_unlock(&con->mutex); 2094 out: 2095 /* 2096 * in case we faulted due to authentication, invalidate our 2097 * current tickets so that we can get new ones. 2098 */ 2099 if (con->auth_retry && con->ops->invalidate_authorizer) { 2100 dout("calling invalidate_authorizer()\n"); 2101 con->ops->invalidate_authorizer(con); 2102 } 2103 2104 if (con->ops->fault) 2105 con->ops->fault(con); 2106 } 2107 2108 2109 2110 /* 2111 * create a new messenger instance 2112 */ 2113 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, 2114 u32 supported_features, 2115 u32 required_features) 2116 { 2117 struct ceph_messenger *msgr; 2118 2119 msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); 2120 if (msgr == NULL) 2121 return ERR_PTR(-ENOMEM); 2122 2123 msgr->supported_features = supported_features; 2124 msgr->required_features = required_features; 2125 2126 spin_lock_init(&msgr->global_seq_lock); 2127 2128 /* the zero page is needed if a request is "canceled" while the message 2129 * is being written over the socket */ 2130 msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO); 2131 if (!msgr->zero_page) { 2132 kfree(msgr); 2133 return ERR_PTR(-ENOMEM); 2134 } 2135 kmap(msgr->zero_page); 2136 2137 if (myaddr) 2138 msgr->inst.addr = *myaddr; 2139 2140 /* select a random nonce */ 2141 msgr->inst.addr.type = 0; 2142 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2143 encode_my_addr(msgr); 2144 2145 dout("messenger_create %p\n", msgr); 2146 return msgr; 2147 } 2148 EXPORT_SYMBOL(ceph_messenger_create); 2149 2150 void ceph_messenger_destroy(struct ceph_messenger *msgr) 2151 { 2152 dout("destroy %p\n", msgr); 2153 kunmap(msgr->zero_page); 2154 __free_page(msgr->zero_page); 2155 kfree(msgr); 2156 dout("destroyed messenger %p\n", msgr); 2157 } 2158 EXPORT_SYMBOL(ceph_messenger_destroy); 2159 2160 static void clear_standby(struct ceph_connection *con) 2161 { 2162 /* come back from STANDBY? */ 2163 if (test_and_clear_bit(STANDBY, &con->state)) { 2164 mutex_lock(&con->mutex); 2165 dout("clear_standby %p and ++connect_seq\n", con); 2166 con->connect_seq++; 2167 WARN_ON(test_bit(WRITE_PENDING, &con->state)); 2168 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); 2169 mutex_unlock(&con->mutex); 2170 } 2171 } 2172 2173 /* 2174 * Queue up an outgoing message on the given connection. 2175 */ 2176 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 2177 { 2178 if (test_bit(CLOSED, &con->state)) { 2179 dout("con_send %p closed, dropping %p\n", con, msg); 2180 ceph_msg_put(msg); 2181 return; 2182 } 2183 2184 /* set src+dst */ 2185 msg->hdr.src = con->msgr->inst.name; 2186 2187 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 2188 2189 msg->needs_out_seq = true; 2190 2191 /* queue */ 2192 mutex_lock(&con->mutex); 2193 BUG_ON(!list_empty(&msg->list_head)); 2194 list_add_tail(&msg->list_head, &con->out_queue); 2195 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2196 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), 2197 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 2198 le32_to_cpu(msg->hdr.front_len), 2199 le32_to_cpu(msg->hdr.middle_len), 2200 le32_to_cpu(msg->hdr.data_len)); 2201 mutex_unlock(&con->mutex); 2202 2203 /* if there wasn't anything waiting to send before, queue 2204 * new work */ 2205 clear_standby(con); 2206 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2207 queue_con(con); 2208 } 2209 EXPORT_SYMBOL(ceph_con_send); 2210 2211 /* 2212 * Revoke a message that was previously queued for send 2213 */ 2214 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 2215 { 2216 mutex_lock(&con->mutex); 2217 if (!list_empty(&msg->list_head)) { 2218 dout("con_revoke %p msg %p - was on queue\n", con, msg); 2219 list_del_init(&msg->list_head); 2220 ceph_msg_put(msg); 2221 msg->hdr.seq = 0; 2222 } 2223 if (con->out_msg == msg) { 2224 dout("con_revoke %p msg %p - was sending\n", con, msg); 2225 con->out_msg = NULL; 2226 if (con->out_kvec_is_msg) { 2227 con->out_skip = con->out_kvec_bytes; 2228 con->out_kvec_is_msg = false; 2229 } 2230 ceph_msg_put(msg); 2231 msg->hdr.seq = 0; 2232 } 2233 mutex_unlock(&con->mutex); 2234 } 2235 2236 /* 2237 * Revoke a message that we may be reading data into 2238 */ 2239 void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) 2240 { 2241 mutex_lock(&con->mutex); 2242 if (con->in_msg && con->in_msg == msg) { 2243 unsigned front_len = le32_to_cpu(con->in_hdr.front_len); 2244 unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); 2245 unsigned data_len = le32_to_cpu(con->in_hdr.data_len); 2246 2247 /* skip rest of message */ 2248 dout("con_revoke_pages %p msg %p revoked\n", con, msg); 2249 con->in_base_pos = con->in_base_pos - 2250 sizeof(struct ceph_msg_header) - 2251 front_len - 2252 middle_len - 2253 data_len - 2254 sizeof(struct ceph_msg_footer); 2255 ceph_msg_put(con->in_msg); 2256 con->in_msg = NULL; 2257 con->in_tag = CEPH_MSGR_TAG_READY; 2258 con->in_seq++; 2259 } else { 2260 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2261 con, con->in_msg, msg); 2262 } 2263 mutex_unlock(&con->mutex); 2264 } 2265 2266 /* 2267 * Queue a keepalive byte to ensure the tcp connection is alive. 2268 */ 2269 void ceph_con_keepalive(struct ceph_connection *con) 2270 { 2271 dout("con_keepalive %p\n", con); 2272 clear_standby(con); 2273 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2274 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2275 queue_con(con); 2276 } 2277 EXPORT_SYMBOL(ceph_con_keepalive); 2278 2279 2280 /* 2281 * construct a new message with given type, size 2282 * the new msg has a ref count of 1. 2283 */ 2284 struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) 2285 { 2286 struct ceph_msg *m; 2287 2288 m = kmalloc(sizeof(*m), flags); 2289 if (m == NULL) 2290 goto out; 2291 kref_init(&m->kref); 2292 INIT_LIST_HEAD(&m->list_head); 2293 2294 m->hdr.tid = 0; 2295 m->hdr.type = cpu_to_le16(type); 2296 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 2297 m->hdr.version = 0; 2298 m->hdr.front_len = cpu_to_le32(front_len); 2299 m->hdr.middle_len = 0; 2300 m->hdr.data_len = 0; 2301 m->hdr.data_off = 0; 2302 m->hdr.reserved = 0; 2303 m->footer.front_crc = 0; 2304 m->footer.middle_crc = 0; 2305 m->footer.data_crc = 0; 2306 m->footer.flags = 0; 2307 m->front_max = front_len; 2308 m->front_is_vmalloc = false; 2309 m->more_to_follow = false; 2310 m->ack_stamp = 0; 2311 m->pool = NULL; 2312 2313 /* middle */ 2314 m->middle = NULL; 2315 2316 /* data */ 2317 m->nr_pages = 0; 2318 m->page_alignment = 0; 2319 m->pages = NULL; 2320 m->pagelist = NULL; 2321 m->bio = NULL; 2322 m->bio_iter = NULL; 2323 m->bio_seg = 0; 2324 m->trail = NULL; 2325 2326 /* front */ 2327 if (front_len) { 2328 if (front_len > PAGE_CACHE_SIZE) { 2329 m->front.iov_base = __vmalloc(front_len, flags, 2330 PAGE_KERNEL); 2331 m->front_is_vmalloc = true; 2332 } else { 2333 m->front.iov_base = kmalloc(front_len, flags); 2334 } 2335 if (m->front.iov_base == NULL) { 2336 pr_err("msg_new can't allocate %d bytes\n", 2337 front_len); 2338 goto out2; 2339 } 2340 } else { 2341 m->front.iov_base = NULL; 2342 } 2343 m->front.iov_len = front_len; 2344 2345 dout("ceph_msg_new %p front %d\n", m, front_len); 2346 return m; 2347 2348 out2: 2349 ceph_msg_put(m); 2350 out: 2351 pr_err("msg_new can't create type %d front %d\n", type, front_len); 2352 return NULL; 2353 } 2354 EXPORT_SYMBOL(ceph_msg_new); 2355 2356 /* 2357 * Allocate "middle" portion of a message, if it is needed and wasn't 2358 * allocated by alloc_msg. This allows us to read a small fixed-size 2359 * per-type header in the front and then gracefully fail (i.e., 2360 * propagate the error to the caller based on info in the front) when 2361 * the middle is too large. 2362 */ 2363 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) 2364 { 2365 int type = le16_to_cpu(msg->hdr.type); 2366 int middle_len = le32_to_cpu(msg->hdr.middle_len); 2367 2368 dout("alloc_middle %p type %d %s middle_len %d\n", msg, type, 2369 ceph_msg_type_name(type), middle_len); 2370 BUG_ON(!middle_len); 2371 BUG_ON(msg->middle); 2372 2373 msg->middle = ceph_buffer_new(middle_len, GFP_NOFS); 2374 if (!msg->middle) 2375 return -ENOMEM; 2376 return 0; 2377 } 2378 2379 /* 2380 * Generic message allocator, for incoming messages. 2381 */ 2382 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 2383 struct ceph_msg_header *hdr, 2384 int *skip) 2385 { 2386 int type = le16_to_cpu(hdr->type); 2387 int front_len = le32_to_cpu(hdr->front_len); 2388 int middle_len = le32_to_cpu(hdr->middle_len); 2389 struct ceph_msg *msg = NULL; 2390 int ret; 2391 2392 if (con->ops->alloc_msg) { 2393 mutex_unlock(&con->mutex); 2394 msg = con->ops->alloc_msg(con, hdr, skip); 2395 mutex_lock(&con->mutex); 2396 if (!msg || *skip) 2397 return NULL; 2398 } 2399 if (!msg) { 2400 *skip = 0; 2401 msg = ceph_msg_new(type, front_len, GFP_NOFS); 2402 if (!msg) { 2403 pr_err("unable to allocate msg type %d len %d\n", 2404 type, front_len); 2405 return NULL; 2406 } 2407 msg->page_alignment = le16_to_cpu(hdr->data_off); 2408 } 2409 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2410 2411 if (middle_len && !msg->middle) { 2412 ret = ceph_alloc_middle(con, msg); 2413 if (ret < 0) { 2414 ceph_msg_put(msg); 2415 return NULL; 2416 } 2417 } 2418 2419 return msg; 2420 } 2421 2422 2423 /* 2424 * Free a generically kmalloc'd message. 2425 */ 2426 void ceph_msg_kfree(struct ceph_msg *m) 2427 { 2428 dout("msg_kfree %p\n", m); 2429 if (m->front_is_vmalloc) 2430 vfree(m->front.iov_base); 2431 else 2432 kfree(m->front.iov_base); 2433 kfree(m); 2434 } 2435 2436 /* 2437 * Drop a msg ref. Destroy as needed. 2438 */ 2439 void ceph_msg_last_put(struct kref *kref) 2440 { 2441 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 2442 2443 dout("ceph_msg_put last one on %p\n", m); 2444 WARN_ON(!list_empty(&m->list_head)); 2445 2446 /* drop middle, data, if any */ 2447 if (m->middle) { 2448 ceph_buffer_put(m->middle); 2449 m->middle = NULL; 2450 } 2451 m->nr_pages = 0; 2452 m->pages = NULL; 2453 2454 if (m->pagelist) { 2455 ceph_pagelist_release(m->pagelist); 2456 kfree(m->pagelist); 2457 m->pagelist = NULL; 2458 } 2459 2460 m->trail = NULL; 2461 2462 if (m->pool) 2463 ceph_msgpool_put(m->pool, m); 2464 else 2465 ceph_msg_kfree(m); 2466 } 2467 EXPORT_SYMBOL(ceph_msg_last_put); 2468 2469 void ceph_msg_dump(struct ceph_msg *msg) 2470 { 2471 pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, 2472 msg->front_max, msg->nr_pages); 2473 print_hex_dump(KERN_DEBUG, "header: ", 2474 DUMP_PREFIX_OFFSET, 16, 1, 2475 &msg->hdr, sizeof(msg->hdr), true); 2476 print_hex_dump(KERN_DEBUG, " front: ", 2477 DUMP_PREFIX_OFFSET, 16, 1, 2478 msg->front.iov_base, msg->front.iov_len, true); 2479 if (msg->middle) 2480 print_hex_dump(KERN_DEBUG, "middle: ", 2481 DUMP_PREFIX_OFFSET, 16, 1, 2482 msg->middle->vec.iov_base, 2483 msg->middle->vec.iov_len, true); 2484 print_hex_dump(KERN_DEBUG, "footer: ", 2485 DUMP_PREFIX_OFFSET, 16, 1, 2486 &msg->footer, sizeof(msg->footer), true); 2487 } 2488 EXPORT_SYMBOL(ceph_msg_dump); 2489