1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/crc32c.h> 4 #include <linux/ctype.h> 5 #include <linux/highmem.h> 6 #include <linux/inet.h> 7 #include <linux/kthread.h> 8 #include <linux/net.h> 9 #include <linux/slab.h> 10 #include <linux/socket.h> 11 #include <linux/string.h> 12 #include <linux/bio.h> 13 #include <linux/blkdev.h> 14 #include <linux/dns_resolver.h> 15 #include <net/tcp.h> 16 17 #include <linux/ceph/libceph.h> 18 #include <linux/ceph/messenger.h> 19 #include <linux/ceph/decode.h> 20 #include <linux/ceph/pagelist.h> 21 #include <linux/export.h> 22 23 /* 24 * Ceph uses the messenger to exchange ceph_msg messages with other 25 * hosts in the system. The messenger provides ordered and reliable 26 * delivery. We tolerate TCP disconnects by reconnecting (with 27 * exponential backoff) in the case of a fault (disconnection, bad 28 * crc, protocol error). Acks allow sent messages to be discarded by 29 * the sender. 30 */ 31 32 /* static tag bytes (protocol control messages) */ 33 static char tag_msg = CEPH_MSGR_TAG_MSG; 34 static char tag_ack = CEPH_MSGR_TAG_ACK; 35 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; 36 37 #ifdef CONFIG_LOCKDEP 38 static struct lock_class_key socket_class; 39 #endif 40 41 /* 42 * When skipping (ignoring) a block of input we read it into a "skip 43 * buffer," which is this many bytes in size. 44 */ 45 #define SKIP_BUF_SIZE 1024 46 47 static void queue_con(struct ceph_connection *con); 48 static void con_work(struct work_struct *); 49 static void ceph_fault(struct ceph_connection *con); 50 51 /* 52 * Nicely render a sockaddr as a string. An array of formatted 53 * strings is used, to approximate reentrancy. 54 */ 55 #define ADDR_STR_COUNT_LOG 5 /* log2(# address strings in array) */ 56 #define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG) 57 #define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1) 58 #define MAX_ADDR_STR_LEN 64 /* 54 is enough */ 59 60 static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN]; 61 static atomic_t addr_str_seq = ATOMIC_INIT(0); 62 63 static struct page *zero_page; /* used in certain error cases */ 64 65 const char *ceph_pr_addr(const struct sockaddr_storage *ss) 66 { 67 int i; 68 char *s; 69 struct sockaddr_in *in4 = (struct sockaddr_in *) ss; 70 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss; 71 72 i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK; 73 s = addr_str[i]; 74 75 switch (ss->ss_family) { 76 case AF_INET: 77 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr, 78 ntohs(in4->sin_port)); 79 break; 80 81 case AF_INET6: 82 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr, 83 ntohs(in6->sin6_port)); 84 break; 85 86 default: 87 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)", 88 ss->ss_family); 89 } 90 91 return s; 92 } 93 EXPORT_SYMBOL(ceph_pr_addr); 94 95 static void encode_my_addr(struct ceph_messenger *msgr) 96 { 97 memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); 98 ceph_encode_addr(&msgr->my_enc_addr); 99 } 100 101 /* 102 * work queue for all reading and writing to/from the socket. 103 */ 104 static struct workqueue_struct *ceph_msgr_wq; 105 106 void _ceph_msgr_exit(void) 107 { 108 if (ceph_msgr_wq) { 109 destroy_workqueue(ceph_msgr_wq); 110 ceph_msgr_wq = NULL; 111 } 112 113 BUG_ON(zero_page == NULL); 114 kunmap(zero_page); 115 page_cache_release(zero_page); 116 zero_page = NULL; 117 } 118 119 int ceph_msgr_init(void) 120 { 121 BUG_ON(zero_page != NULL); 122 zero_page = ZERO_PAGE(0); 123 page_cache_get(zero_page); 124 125 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); 126 if (ceph_msgr_wq) 127 return 0; 128 129 pr_err("msgr_init failed to create workqueue\n"); 130 _ceph_msgr_exit(); 131 132 return -ENOMEM; 133 } 134 EXPORT_SYMBOL(ceph_msgr_init); 135 136 void ceph_msgr_exit(void) 137 { 138 BUG_ON(ceph_msgr_wq == NULL); 139 140 _ceph_msgr_exit(); 141 } 142 EXPORT_SYMBOL(ceph_msgr_exit); 143 144 void ceph_msgr_flush(void) 145 { 146 flush_workqueue(ceph_msgr_wq); 147 } 148 EXPORT_SYMBOL(ceph_msgr_flush); 149 150 151 /* 152 * socket callback functions 153 */ 154 155 /* data available on socket, or listen socket received a connect */ 156 static void ceph_data_ready(struct sock *sk, int count_unused) 157 { 158 struct ceph_connection *con = sk->sk_user_data; 159 160 if (sk->sk_state != TCP_CLOSE_WAIT) { 161 dout("ceph_data_ready on %p state = %lu, queueing work\n", 162 con, con->state); 163 queue_con(con); 164 } 165 } 166 167 /* socket has buffer space for writing */ 168 static void ceph_write_space(struct sock *sk) 169 { 170 struct ceph_connection *con = sk->sk_user_data; 171 172 /* only queue to workqueue if there is data we want to write, 173 * and there is sufficient space in the socket buffer to accept 174 * more data. clear SOCK_NOSPACE so that ceph_write_space() 175 * doesn't get called again until try_write() fills the socket 176 * buffer. See net/ipv4/tcp_input.c:tcp_check_space() 177 * and net/core/stream.c:sk_stream_write_space(). 178 */ 179 if (test_bit(WRITE_PENDING, &con->state)) { 180 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { 181 dout("ceph_write_space %p queueing write work\n", con); 182 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 183 queue_con(con); 184 } 185 } else { 186 dout("ceph_write_space %p nothing to write\n", con); 187 } 188 } 189 190 /* socket's state has changed */ 191 static void ceph_state_change(struct sock *sk) 192 { 193 struct ceph_connection *con = sk->sk_user_data; 194 195 dout("ceph_state_change %p state = %lu sk_state = %u\n", 196 con, con->state, sk->sk_state); 197 198 if (test_bit(CLOSED, &con->state)) 199 return; 200 201 switch (sk->sk_state) { 202 case TCP_CLOSE: 203 dout("ceph_state_change TCP_CLOSE\n"); 204 case TCP_CLOSE_WAIT: 205 dout("ceph_state_change TCP_CLOSE_WAIT\n"); 206 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { 207 if (test_bit(CONNECTING, &con->state)) 208 con->error_msg = "connection failed"; 209 else 210 con->error_msg = "socket closed"; 211 queue_con(con); 212 } 213 break; 214 case TCP_ESTABLISHED: 215 dout("ceph_state_change TCP_ESTABLISHED\n"); 216 queue_con(con); 217 break; 218 default: /* Everything else is uninteresting */ 219 break; 220 } 221 } 222 223 /* 224 * set up socket callbacks 225 */ 226 static void set_sock_callbacks(struct socket *sock, 227 struct ceph_connection *con) 228 { 229 struct sock *sk = sock->sk; 230 sk->sk_user_data = con; 231 sk->sk_data_ready = ceph_data_ready; 232 sk->sk_write_space = ceph_write_space; 233 sk->sk_state_change = ceph_state_change; 234 } 235 236 237 /* 238 * socket helpers 239 */ 240 241 /* 242 * initiate connection to a remote socket. 243 */ 244 static int ceph_tcp_connect(struct ceph_connection *con) 245 { 246 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; 247 struct socket *sock; 248 int ret; 249 250 BUG_ON(con->sock); 251 ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, 252 IPPROTO_TCP, &sock); 253 if (ret) 254 return ret; 255 sock->sk->sk_allocation = GFP_NOFS; 256 257 #ifdef CONFIG_LOCKDEP 258 lockdep_set_class(&sock->sk->sk_lock, &socket_class); 259 #endif 260 261 set_sock_callbacks(sock, con); 262 263 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); 264 265 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), 266 O_NONBLOCK); 267 if (ret == -EINPROGRESS) { 268 dout("connect %s EINPROGRESS sk_state = %u\n", 269 ceph_pr_addr(&con->peer_addr.in_addr), 270 sock->sk->sk_state); 271 } else if (ret < 0) { 272 pr_err("connect %s error %d\n", 273 ceph_pr_addr(&con->peer_addr.in_addr), ret); 274 sock_release(sock); 275 con->error_msg = "connect error"; 276 277 return ret; 278 } 279 con->sock = sock; 280 281 return 0; 282 } 283 284 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 285 { 286 struct kvec iov = {buf, len}; 287 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 288 int r; 289 290 r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); 291 if (r == -EAGAIN) 292 r = 0; 293 return r; 294 } 295 296 /* 297 * write something. @more is true if caller will be sending more data 298 * shortly. 299 */ 300 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, 301 size_t kvlen, size_t len, int more) 302 { 303 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 304 int r; 305 306 if (more) 307 msg.msg_flags |= MSG_MORE; 308 else 309 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ 310 311 r = kernel_sendmsg(sock, &msg, iov, kvlen, len); 312 if (r == -EAGAIN) 313 r = 0; 314 return r; 315 } 316 317 static int ceph_tcp_sendpage(struct socket *sock, struct page *page, 318 int offset, size_t size, int more) 319 { 320 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR); 321 int ret; 322 323 ret = kernel_sendpage(sock, page, offset, size, flags); 324 if (ret == -EAGAIN) 325 ret = 0; 326 327 return ret; 328 } 329 330 331 /* 332 * Shutdown/close the socket for the given connection. 333 */ 334 static int con_close_socket(struct ceph_connection *con) 335 { 336 int rc; 337 338 dout("con_close_socket on %p sock %p\n", con, con->sock); 339 if (!con->sock) 340 return 0; 341 set_bit(SOCK_CLOSED, &con->state); 342 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); 343 sock_release(con->sock); 344 con->sock = NULL; 345 clear_bit(SOCK_CLOSED, &con->state); 346 return rc; 347 } 348 349 /* 350 * Reset a connection. Discard all incoming and outgoing messages 351 * and clear *_seq state. 352 */ 353 static void ceph_msg_remove(struct ceph_msg *msg) 354 { 355 list_del_init(&msg->list_head); 356 ceph_msg_put(msg); 357 } 358 static void ceph_msg_remove_list(struct list_head *head) 359 { 360 while (!list_empty(head)) { 361 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 362 list_head); 363 ceph_msg_remove(msg); 364 } 365 } 366 367 static void reset_connection(struct ceph_connection *con) 368 { 369 /* reset connection, out_queue, msg_ and connect_seq */ 370 /* discard existing out_queue and msg_seq */ 371 ceph_msg_remove_list(&con->out_queue); 372 ceph_msg_remove_list(&con->out_sent); 373 374 if (con->in_msg) { 375 ceph_msg_put(con->in_msg); 376 con->in_msg = NULL; 377 } 378 379 con->connect_seq = 0; 380 con->out_seq = 0; 381 if (con->out_msg) { 382 ceph_msg_put(con->out_msg); 383 con->out_msg = NULL; 384 } 385 con->in_seq = 0; 386 con->in_seq_acked = 0; 387 } 388 389 /* 390 * mark a peer down. drop any open connections. 391 */ 392 void ceph_con_close(struct ceph_connection *con) 393 { 394 dout("con_close %p peer %s\n", con, 395 ceph_pr_addr(&con->peer_addr.in_addr)); 396 set_bit(CLOSED, &con->state); /* in case there's queued work */ 397 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 398 clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ 399 clear_bit(KEEPALIVE_PENDING, &con->state); 400 clear_bit(WRITE_PENDING, &con->state); 401 mutex_lock(&con->mutex); 402 reset_connection(con); 403 con->peer_global_seq = 0; 404 cancel_delayed_work(&con->work); 405 mutex_unlock(&con->mutex); 406 queue_con(con); 407 } 408 EXPORT_SYMBOL(ceph_con_close); 409 410 /* 411 * Reopen a closed connection, with a new peer address. 412 */ 413 void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) 414 { 415 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 416 set_bit(OPENING, &con->state); 417 clear_bit(CLOSED, &con->state); 418 memcpy(&con->peer_addr, addr, sizeof(*addr)); 419 con->delay = 0; /* reset backoff memory */ 420 queue_con(con); 421 } 422 EXPORT_SYMBOL(ceph_con_open); 423 424 /* 425 * return true if this connection ever successfully opened 426 */ 427 bool ceph_con_opened(struct ceph_connection *con) 428 { 429 return con->connect_seq > 0; 430 } 431 432 /* 433 * generic get/put 434 */ 435 struct ceph_connection *ceph_con_get(struct ceph_connection *con) 436 { 437 int nref = __atomic_add_unless(&con->nref, 1, 0); 438 439 dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1); 440 441 return nref ? con : NULL; 442 } 443 444 void ceph_con_put(struct ceph_connection *con) 445 { 446 int nref = atomic_dec_return(&con->nref); 447 448 BUG_ON(nref < 0); 449 if (nref == 0) { 450 BUG_ON(con->sock); 451 kfree(con); 452 } 453 dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref); 454 } 455 456 /* 457 * initialize a new connection. 458 */ 459 void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) 460 { 461 dout("con_init %p\n", con); 462 memset(con, 0, sizeof(*con)); 463 atomic_set(&con->nref, 1); 464 con->msgr = msgr; 465 mutex_init(&con->mutex); 466 INIT_LIST_HEAD(&con->out_queue); 467 INIT_LIST_HEAD(&con->out_sent); 468 INIT_DELAYED_WORK(&con->work, con_work); 469 } 470 EXPORT_SYMBOL(ceph_con_init); 471 472 473 /* 474 * We maintain a global counter to order connection attempts. Get 475 * a unique seq greater than @gt. 476 */ 477 static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) 478 { 479 u32 ret; 480 481 spin_lock(&msgr->global_seq_lock); 482 if (msgr->global_seq < gt) 483 msgr->global_seq = gt; 484 ret = ++msgr->global_seq; 485 spin_unlock(&msgr->global_seq_lock); 486 return ret; 487 } 488 489 static void ceph_con_out_kvec_reset(struct ceph_connection *con) 490 { 491 con->out_kvec_left = 0; 492 con->out_kvec_bytes = 0; 493 con->out_kvec_cur = &con->out_kvec[0]; 494 } 495 496 static void ceph_con_out_kvec_add(struct ceph_connection *con, 497 size_t size, void *data) 498 { 499 int index; 500 501 index = con->out_kvec_left; 502 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); 503 504 con->out_kvec[index].iov_len = size; 505 con->out_kvec[index].iov_base = data; 506 con->out_kvec_left++; 507 con->out_kvec_bytes += size; 508 } 509 510 /* 511 * Prepare footer for currently outgoing message, and finish things 512 * off. Assumes out_kvec* are already valid.. we just add on to the end. 513 */ 514 static void prepare_write_message_footer(struct ceph_connection *con) 515 { 516 struct ceph_msg *m = con->out_msg; 517 int v = con->out_kvec_left; 518 519 dout("prepare_write_message_footer %p\n", con); 520 con->out_kvec_is_msg = true; 521 con->out_kvec[v].iov_base = &m->footer; 522 con->out_kvec[v].iov_len = sizeof(m->footer); 523 con->out_kvec_bytes += sizeof(m->footer); 524 con->out_kvec_left++; 525 con->out_more = m->more_to_follow; 526 con->out_msg_done = true; 527 } 528 529 /* 530 * Prepare headers for the next outgoing message. 531 */ 532 static void prepare_write_message(struct ceph_connection *con) 533 { 534 struct ceph_msg *m; 535 u32 crc; 536 537 ceph_con_out_kvec_reset(con); 538 con->out_kvec_is_msg = true; 539 con->out_msg_done = false; 540 541 /* Sneak an ack in there first? If we can get it into the same 542 * TCP packet that's a good thing. */ 543 if (con->in_seq > con->in_seq_acked) { 544 con->in_seq_acked = con->in_seq; 545 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 546 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 547 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), 548 &con->out_temp_ack); 549 } 550 551 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); 552 con->out_msg = m; 553 554 /* put message on sent list */ 555 ceph_msg_get(m); 556 list_move_tail(&m->list_head, &con->out_sent); 557 558 /* 559 * only assign outgoing seq # if we haven't sent this message 560 * yet. if it is requeued, resend with it's original seq. 561 */ 562 if (m->needs_out_seq) { 563 m->hdr.seq = cpu_to_le64(++con->out_seq); 564 m->needs_out_seq = false; 565 } 566 567 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 568 m, con->out_seq, le16_to_cpu(m->hdr.type), 569 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 570 le32_to_cpu(m->hdr.data_len), 571 m->nr_pages); 572 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 573 574 /* tag + hdr + front + middle */ 575 ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 576 ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); 577 ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 578 579 if (m->middle) 580 ceph_con_out_kvec_add(con, m->middle->vec.iov_len, 581 m->middle->vec.iov_base); 582 583 /* fill in crc (except data pages), footer */ 584 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 585 con->out_msg->hdr.crc = cpu_to_le32(crc); 586 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; 587 588 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 589 con->out_msg->footer.front_crc = cpu_to_le32(crc); 590 if (m->middle) { 591 crc = crc32c(0, m->middle->vec.iov_base, 592 m->middle->vec.iov_len); 593 con->out_msg->footer.middle_crc = cpu_to_le32(crc); 594 } else 595 con->out_msg->footer.middle_crc = 0; 596 con->out_msg->footer.data_crc = 0; 597 dout("prepare_write_message front_crc %u data_crc %u\n", 598 le32_to_cpu(con->out_msg->footer.front_crc), 599 le32_to_cpu(con->out_msg->footer.middle_crc)); 600 601 /* is there a data payload? */ 602 if (le32_to_cpu(m->hdr.data_len) > 0) { 603 /* initialize page iterator */ 604 con->out_msg_pos.page = 0; 605 if (m->pages) 606 con->out_msg_pos.page_pos = m->page_alignment; 607 else 608 con->out_msg_pos.page_pos = 0; 609 con->out_msg_pos.data_pos = 0; 610 con->out_msg_pos.did_page_crc = false; 611 con->out_more = 1; /* data + footer will follow */ 612 } else { 613 /* no, queue up footer too and be done */ 614 prepare_write_message_footer(con); 615 } 616 617 set_bit(WRITE_PENDING, &con->state); 618 } 619 620 /* 621 * Prepare an ack. 622 */ 623 static void prepare_write_ack(struct ceph_connection *con) 624 { 625 dout("prepare_write_ack %p %llu -> %llu\n", con, 626 con->in_seq_acked, con->in_seq); 627 con->in_seq_acked = con->in_seq; 628 629 ceph_con_out_kvec_reset(con); 630 631 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 632 633 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 634 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), 635 &con->out_temp_ack); 636 637 con->out_more = 1; /* more will follow.. eventually.. */ 638 set_bit(WRITE_PENDING, &con->state); 639 } 640 641 /* 642 * Prepare to write keepalive byte. 643 */ 644 static void prepare_write_keepalive(struct ceph_connection *con) 645 { 646 dout("prepare_write_keepalive %p\n", con); 647 ceph_con_out_kvec_reset(con); 648 ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); 649 set_bit(WRITE_PENDING, &con->state); 650 } 651 652 /* 653 * Connection negotiation. 654 */ 655 656 static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con, 657 int *auth_proto) 658 { 659 struct ceph_auth_handshake *auth; 660 661 if (!con->ops->get_authorizer) { 662 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 663 con->out_connect.authorizer_len = 0; 664 665 return NULL; 666 } 667 668 /* Can't hold the mutex while getting authorizer */ 669 670 mutex_unlock(&con->mutex); 671 672 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); 673 674 mutex_lock(&con->mutex); 675 676 if (IS_ERR(auth)) 677 return auth; 678 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) 679 return ERR_PTR(-EAGAIN); 680 681 con->auth_reply_buf = auth->authorizer_reply_buf; 682 con->auth_reply_buf_len = auth->authorizer_reply_buf_len; 683 684 685 return auth; 686 } 687 688 /* 689 * We connected to a peer and are saying hello. 690 */ 691 static void prepare_write_banner(struct ceph_connection *con) 692 { 693 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 694 ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), 695 &con->msgr->my_enc_addr); 696 697 con->out_more = 0; 698 set_bit(WRITE_PENDING, &con->state); 699 } 700 701 static int prepare_write_connect(struct ceph_connection *con) 702 { 703 unsigned int global_seq = get_global_seq(con->msgr, 0); 704 int proto; 705 int auth_proto; 706 struct ceph_auth_handshake *auth; 707 708 switch (con->peer_name.type) { 709 case CEPH_ENTITY_TYPE_MON: 710 proto = CEPH_MONC_PROTOCOL; 711 break; 712 case CEPH_ENTITY_TYPE_OSD: 713 proto = CEPH_OSDC_PROTOCOL; 714 break; 715 case CEPH_ENTITY_TYPE_MDS: 716 proto = CEPH_MDSC_PROTOCOL; 717 break; 718 default: 719 BUG(); 720 } 721 722 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 723 con->connect_seq, global_seq, proto); 724 725 con->out_connect.features = cpu_to_le64(con->msgr->supported_features); 726 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 727 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 728 con->out_connect.global_seq = cpu_to_le32(global_seq); 729 con->out_connect.protocol_version = cpu_to_le32(proto); 730 con->out_connect.flags = 0; 731 732 auth_proto = CEPH_AUTH_UNKNOWN; 733 auth = get_connect_authorizer(con, &auth_proto); 734 if (IS_ERR(auth)) 735 return PTR_ERR(auth); 736 737 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); 738 con->out_connect.authorizer_len = auth ? 739 cpu_to_le32(auth->authorizer_buf_len) : 0; 740 741 ceph_con_out_kvec_add(con, sizeof (con->out_connect), 742 &con->out_connect); 743 if (auth && auth->authorizer_buf_len) 744 ceph_con_out_kvec_add(con, auth->authorizer_buf_len, 745 auth->authorizer_buf); 746 747 con->out_more = 0; 748 set_bit(WRITE_PENDING, &con->state); 749 750 return 0; 751 } 752 753 /* 754 * write as much of pending kvecs to the socket as we can. 755 * 1 -> done 756 * 0 -> socket full, but more to do 757 * <0 -> error 758 */ 759 static int write_partial_kvec(struct ceph_connection *con) 760 { 761 int ret; 762 763 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); 764 while (con->out_kvec_bytes > 0) { 765 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, 766 con->out_kvec_left, con->out_kvec_bytes, 767 con->out_more); 768 if (ret <= 0) 769 goto out; 770 con->out_kvec_bytes -= ret; 771 if (con->out_kvec_bytes == 0) 772 break; /* done */ 773 774 /* account for full iov entries consumed */ 775 while (ret >= con->out_kvec_cur->iov_len) { 776 BUG_ON(!con->out_kvec_left); 777 ret -= con->out_kvec_cur->iov_len; 778 con->out_kvec_cur++; 779 con->out_kvec_left--; 780 } 781 /* and for a partially-consumed entry */ 782 if (ret) { 783 con->out_kvec_cur->iov_len -= ret; 784 con->out_kvec_cur->iov_base += ret; 785 } 786 } 787 con->out_kvec_left = 0; 788 con->out_kvec_is_msg = false; 789 ret = 1; 790 out: 791 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 792 con->out_kvec_bytes, con->out_kvec_left, ret); 793 return ret; /* done! */ 794 } 795 796 #ifdef CONFIG_BLOCK 797 static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) 798 { 799 if (!bio) { 800 *iter = NULL; 801 *seg = 0; 802 return; 803 } 804 *iter = bio; 805 *seg = bio->bi_idx; 806 } 807 808 static void iter_bio_next(struct bio **bio_iter, int *seg) 809 { 810 if (*bio_iter == NULL) 811 return; 812 813 BUG_ON(*seg >= (*bio_iter)->bi_vcnt); 814 815 (*seg)++; 816 if (*seg == (*bio_iter)->bi_vcnt) 817 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 818 } 819 #endif 820 821 /* 822 * Write as much message data payload as we can. If we finish, queue 823 * up the footer. 824 * 1 -> done, footer is now queued in out_kvec[]. 825 * 0 -> socket full, but more to do 826 * <0 -> error 827 */ 828 static int write_partial_msg_pages(struct ceph_connection *con) 829 { 830 struct ceph_msg *msg = con->out_msg; 831 unsigned int data_len = le32_to_cpu(msg->hdr.data_len); 832 size_t len; 833 bool do_datacrc = !con->msgr->nocrc; 834 int ret; 835 int total_max_write; 836 int in_trail = 0; 837 size_t trail_len = (msg->trail ? msg->trail->length : 0); 838 839 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 840 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 841 con->out_msg_pos.page_pos); 842 843 #ifdef CONFIG_BLOCK 844 if (msg->bio && !msg->bio_iter) 845 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); 846 #endif 847 848 while (data_len > con->out_msg_pos.data_pos) { 849 struct page *page = NULL; 850 int max_write = PAGE_SIZE; 851 int bio_offset = 0; 852 853 total_max_write = data_len - trail_len - 854 con->out_msg_pos.data_pos; 855 856 /* 857 * if we are calculating the data crc (the default), we need 858 * to map the page. if our pages[] has been revoked, use the 859 * zero page. 860 */ 861 862 /* have we reached the trail part of the data? */ 863 if (con->out_msg_pos.data_pos >= data_len - trail_len) { 864 in_trail = 1; 865 866 total_max_write = data_len - con->out_msg_pos.data_pos; 867 868 page = list_first_entry(&msg->trail->head, 869 struct page, lru); 870 max_write = PAGE_SIZE; 871 } else if (msg->pages) { 872 page = msg->pages[con->out_msg_pos.page]; 873 } else if (msg->pagelist) { 874 page = list_first_entry(&msg->pagelist->head, 875 struct page, lru); 876 #ifdef CONFIG_BLOCK 877 } else if (msg->bio) { 878 struct bio_vec *bv; 879 880 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); 881 page = bv->bv_page; 882 bio_offset = bv->bv_offset; 883 max_write = bv->bv_len; 884 #endif 885 } else { 886 page = zero_page; 887 } 888 len = min_t(int, max_write - con->out_msg_pos.page_pos, 889 total_max_write); 890 891 if (do_datacrc && !con->out_msg_pos.did_page_crc) { 892 void *base; 893 u32 crc; 894 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 895 char *kaddr; 896 897 kaddr = kmap(page); 898 BUG_ON(kaddr == NULL); 899 base = kaddr + con->out_msg_pos.page_pos + bio_offset; 900 crc = crc32c(tmpcrc, base, len); 901 con->out_msg->footer.data_crc = cpu_to_le32(crc); 902 con->out_msg_pos.did_page_crc = true; 903 } 904 ret = ceph_tcp_sendpage(con->sock, page, 905 con->out_msg_pos.page_pos + bio_offset, 906 len, 1); 907 908 if (do_datacrc) 909 kunmap(page); 910 911 if (ret <= 0) 912 goto out; 913 914 con->out_msg_pos.data_pos += ret; 915 con->out_msg_pos.page_pos += ret; 916 if (ret == len) { 917 con->out_msg_pos.page_pos = 0; 918 con->out_msg_pos.page++; 919 con->out_msg_pos.did_page_crc = false; 920 if (in_trail) 921 list_move_tail(&page->lru, 922 &msg->trail->head); 923 else if (msg->pagelist) 924 list_move_tail(&page->lru, 925 &msg->pagelist->head); 926 #ifdef CONFIG_BLOCK 927 else if (msg->bio) 928 iter_bio_next(&msg->bio_iter, &msg->bio_seg); 929 #endif 930 } 931 } 932 933 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 934 935 /* prepare and queue up footer, too */ 936 if (!do_datacrc) 937 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 938 ceph_con_out_kvec_reset(con); 939 prepare_write_message_footer(con); 940 ret = 1; 941 out: 942 return ret; 943 } 944 945 /* 946 * write some zeros 947 */ 948 static int write_partial_skip(struct ceph_connection *con) 949 { 950 int ret; 951 952 while (con->out_skip > 0) { 953 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); 954 955 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1); 956 if (ret <= 0) 957 goto out; 958 con->out_skip -= ret; 959 } 960 ret = 1; 961 out: 962 return ret; 963 } 964 965 /* 966 * Prepare to read connection handshake, or an ack. 967 */ 968 static void prepare_read_banner(struct ceph_connection *con) 969 { 970 dout("prepare_read_banner %p\n", con); 971 con->in_base_pos = 0; 972 } 973 974 static void prepare_read_connect(struct ceph_connection *con) 975 { 976 dout("prepare_read_connect %p\n", con); 977 con->in_base_pos = 0; 978 } 979 980 static void prepare_read_ack(struct ceph_connection *con) 981 { 982 dout("prepare_read_ack %p\n", con); 983 con->in_base_pos = 0; 984 } 985 986 static void prepare_read_tag(struct ceph_connection *con) 987 { 988 dout("prepare_read_tag %p\n", con); 989 con->in_base_pos = 0; 990 con->in_tag = CEPH_MSGR_TAG_READY; 991 } 992 993 /* 994 * Prepare to read a message. 995 */ 996 static int prepare_read_message(struct ceph_connection *con) 997 { 998 dout("prepare_read_message %p\n", con); 999 BUG_ON(con->in_msg != NULL); 1000 con->in_base_pos = 0; 1001 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; 1002 return 0; 1003 } 1004 1005 1006 static int read_partial(struct ceph_connection *con, 1007 int end, int size, void *object) 1008 { 1009 while (con->in_base_pos < end) { 1010 int left = end - con->in_base_pos; 1011 int have = size - left; 1012 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 1013 if (ret <= 0) 1014 return ret; 1015 con->in_base_pos += ret; 1016 } 1017 return 1; 1018 } 1019 1020 1021 /* 1022 * Read all or part of the connect-side handshake on a new connection 1023 */ 1024 static int read_partial_banner(struct ceph_connection *con) 1025 { 1026 int size; 1027 int end; 1028 int ret; 1029 1030 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 1031 1032 /* peer's banner */ 1033 size = strlen(CEPH_BANNER); 1034 end = size; 1035 ret = read_partial(con, end, size, con->in_banner); 1036 if (ret <= 0) 1037 goto out; 1038 1039 size = sizeof (con->actual_peer_addr); 1040 end += size; 1041 ret = read_partial(con, end, size, &con->actual_peer_addr); 1042 if (ret <= 0) 1043 goto out; 1044 1045 size = sizeof (con->peer_addr_for_me); 1046 end += size; 1047 ret = read_partial(con, end, size, &con->peer_addr_for_me); 1048 if (ret <= 0) 1049 goto out; 1050 1051 out: 1052 return ret; 1053 } 1054 1055 static int read_partial_connect(struct ceph_connection *con) 1056 { 1057 int size; 1058 int end; 1059 int ret; 1060 1061 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 1062 1063 size = sizeof (con->in_reply); 1064 end = size; 1065 ret = read_partial(con, end, size, &con->in_reply); 1066 if (ret <= 0) 1067 goto out; 1068 1069 size = le32_to_cpu(con->in_reply.authorizer_len); 1070 end += size; 1071 ret = read_partial(con, end, size, con->auth_reply_buf); 1072 if (ret <= 0) 1073 goto out; 1074 1075 dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", 1076 con, (int)con->in_reply.tag, 1077 le32_to_cpu(con->in_reply.connect_seq), 1078 le32_to_cpu(con->in_reply.global_seq)); 1079 out: 1080 return ret; 1081 1082 } 1083 1084 /* 1085 * Verify the hello banner looks okay. 1086 */ 1087 static int verify_hello(struct ceph_connection *con) 1088 { 1089 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { 1090 pr_err("connect to %s got bad banner\n", 1091 ceph_pr_addr(&con->peer_addr.in_addr)); 1092 con->error_msg = "protocol error, bad banner"; 1093 return -1; 1094 } 1095 return 0; 1096 } 1097 1098 static bool addr_is_blank(struct sockaddr_storage *ss) 1099 { 1100 switch (ss->ss_family) { 1101 case AF_INET: 1102 return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0; 1103 case AF_INET6: 1104 return 1105 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 && 1106 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 && 1107 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 && 1108 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0; 1109 } 1110 return false; 1111 } 1112 1113 static int addr_port(struct sockaddr_storage *ss) 1114 { 1115 switch (ss->ss_family) { 1116 case AF_INET: 1117 return ntohs(((struct sockaddr_in *)ss)->sin_port); 1118 case AF_INET6: 1119 return ntohs(((struct sockaddr_in6 *)ss)->sin6_port); 1120 } 1121 return 0; 1122 } 1123 1124 static void addr_set_port(struct sockaddr_storage *ss, int p) 1125 { 1126 switch (ss->ss_family) { 1127 case AF_INET: 1128 ((struct sockaddr_in *)ss)->sin_port = htons(p); 1129 break; 1130 case AF_INET6: 1131 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p); 1132 break; 1133 } 1134 } 1135 1136 /* 1137 * Unlike other *_pton function semantics, zero indicates success. 1138 */ 1139 static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss, 1140 char delim, const char **ipend) 1141 { 1142 struct sockaddr_in *in4 = (struct sockaddr_in *) ss; 1143 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss; 1144 1145 memset(ss, 0, sizeof(*ss)); 1146 1147 if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) { 1148 ss->ss_family = AF_INET; 1149 return 0; 1150 } 1151 1152 if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) { 1153 ss->ss_family = AF_INET6; 1154 return 0; 1155 } 1156 1157 return -EINVAL; 1158 } 1159 1160 /* 1161 * Extract hostname string and resolve using kernel DNS facility. 1162 */ 1163 #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER 1164 static int ceph_dns_resolve_name(const char *name, size_t namelen, 1165 struct sockaddr_storage *ss, char delim, const char **ipend) 1166 { 1167 const char *end, *delim_p; 1168 char *colon_p, *ip_addr = NULL; 1169 int ip_len, ret; 1170 1171 /* 1172 * The end of the hostname occurs immediately preceding the delimiter or 1173 * the port marker (':') where the delimiter takes precedence. 1174 */ 1175 delim_p = memchr(name, delim, namelen); 1176 colon_p = memchr(name, ':', namelen); 1177 1178 if (delim_p && colon_p) 1179 end = delim_p < colon_p ? delim_p : colon_p; 1180 else if (!delim_p && colon_p) 1181 end = colon_p; 1182 else { 1183 end = delim_p; 1184 if (!end) /* case: hostname:/ */ 1185 end = name + namelen; 1186 } 1187 1188 if (end <= name) 1189 return -EINVAL; 1190 1191 /* do dns_resolve upcall */ 1192 ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL); 1193 if (ip_len > 0) 1194 ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL); 1195 else 1196 ret = -ESRCH; 1197 1198 kfree(ip_addr); 1199 1200 *ipend = end; 1201 1202 pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name, 1203 ret, ret ? "failed" : ceph_pr_addr(ss)); 1204 1205 return ret; 1206 } 1207 #else 1208 static inline int ceph_dns_resolve_name(const char *name, size_t namelen, 1209 struct sockaddr_storage *ss, char delim, const char **ipend) 1210 { 1211 return -EINVAL; 1212 } 1213 #endif 1214 1215 /* 1216 * Parse a server name (IP or hostname). If a valid IP address is not found 1217 * then try to extract a hostname to resolve using userspace DNS upcall. 1218 */ 1219 static int ceph_parse_server_name(const char *name, size_t namelen, 1220 struct sockaddr_storage *ss, char delim, const char **ipend) 1221 { 1222 int ret; 1223 1224 ret = ceph_pton(name, namelen, ss, delim, ipend); 1225 if (ret) 1226 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend); 1227 1228 return ret; 1229 } 1230 1231 /* 1232 * Parse an ip[:port] list into an addr array. Use the default 1233 * monitor port if a port isn't specified. 1234 */ 1235 int ceph_parse_ips(const char *c, const char *end, 1236 struct ceph_entity_addr *addr, 1237 int max_count, int *count) 1238 { 1239 int i, ret = -EINVAL; 1240 const char *p = c; 1241 1242 dout("parse_ips on '%.*s'\n", (int)(end-c), c); 1243 for (i = 0; i < max_count; i++) { 1244 const char *ipend; 1245 struct sockaddr_storage *ss = &addr[i].in_addr; 1246 int port; 1247 char delim = ','; 1248 1249 if (*p == '[') { 1250 delim = ']'; 1251 p++; 1252 } 1253 1254 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend); 1255 if (ret) 1256 goto bad; 1257 ret = -EINVAL; 1258 1259 p = ipend; 1260 1261 if (delim == ']') { 1262 if (*p != ']') { 1263 dout("missing matching ']'\n"); 1264 goto bad; 1265 } 1266 p++; 1267 } 1268 1269 /* port? */ 1270 if (p < end && *p == ':') { 1271 port = 0; 1272 p++; 1273 while (p < end && *p >= '0' && *p <= '9') { 1274 port = (port * 10) + (*p - '0'); 1275 p++; 1276 } 1277 if (port > 65535 || port == 0) 1278 goto bad; 1279 } else { 1280 port = CEPH_MON_PORT; 1281 } 1282 1283 addr_set_port(ss, port); 1284 1285 dout("parse_ips got %s\n", ceph_pr_addr(ss)); 1286 1287 if (p == end) 1288 break; 1289 if (*p != ',') 1290 goto bad; 1291 p++; 1292 } 1293 1294 if (p != end) 1295 goto bad; 1296 1297 if (count) 1298 *count = i + 1; 1299 return 0; 1300 1301 bad: 1302 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); 1303 return ret; 1304 } 1305 EXPORT_SYMBOL(ceph_parse_ips); 1306 1307 static int process_banner(struct ceph_connection *con) 1308 { 1309 dout("process_banner on %p\n", con); 1310 1311 if (verify_hello(con) < 0) 1312 return -1; 1313 1314 ceph_decode_addr(&con->actual_peer_addr); 1315 ceph_decode_addr(&con->peer_addr_for_me); 1316 1317 /* 1318 * Make sure the other end is who we wanted. note that the other 1319 * end may not yet know their ip address, so if it's 0.0.0.0, give 1320 * them the benefit of the doubt. 1321 */ 1322 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 1323 sizeof(con->peer_addr)) != 0 && 1324 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1325 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1326 pr_warning("wrong peer, want %s/%d, got %s/%d\n", 1327 ceph_pr_addr(&con->peer_addr.in_addr), 1328 (int)le32_to_cpu(con->peer_addr.nonce), 1329 ceph_pr_addr(&con->actual_peer_addr.in_addr), 1330 (int)le32_to_cpu(con->actual_peer_addr.nonce)); 1331 con->error_msg = "wrong peer at address"; 1332 return -1; 1333 } 1334 1335 /* 1336 * did we learn our address? 1337 */ 1338 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) { 1339 int port = addr_port(&con->msgr->inst.addr.in_addr); 1340 1341 memcpy(&con->msgr->inst.addr.in_addr, 1342 &con->peer_addr_for_me.in_addr, 1343 sizeof(con->peer_addr_for_me.in_addr)); 1344 addr_set_port(&con->msgr->inst.addr.in_addr, port); 1345 encode_my_addr(con->msgr); 1346 dout("process_banner learned my addr is %s\n", 1347 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); 1348 } 1349 1350 set_bit(NEGOTIATING, &con->state); 1351 prepare_read_connect(con); 1352 return 0; 1353 } 1354 1355 static void fail_protocol(struct ceph_connection *con) 1356 { 1357 reset_connection(con); 1358 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1359 1360 mutex_unlock(&con->mutex); 1361 if (con->ops->bad_proto) 1362 con->ops->bad_proto(con); 1363 mutex_lock(&con->mutex); 1364 } 1365 1366 static int process_connect(struct ceph_connection *con) 1367 { 1368 u64 sup_feat = con->msgr->supported_features; 1369 u64 req_feat = con->msgr->required_features; 1370 u64 server_feat = le64_to_cpu(con->in_reply.features); 1371 int ret; 1372 1373 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1374 1375 switch (con->in_reply.tag) { 1376 case CEPH_MSGR_TAG_FEATURES: 1377 pr_err("%s%lld %s feature set mismatch," 1378 " my %llx < server's %llx, missing %llx\n", 1379 ENTITY_NAME(con->peer_name), 1380 ceph_pr_addr(&con->peer_addr.in_addr), 1381 sup_feat, server_feat, server_feat & ~sup_feat); 1382 con->error_msg = "missing required protocol features"; 1383 fail_protocol(con); 1384 return -1; 1385 1386 case CEPH_MSGR_TAG_BADPROTOVER: 1387 pr_err("%s%lld %s protocol version mismatch," 1388 " my %d != server's %d\n", 1389 ENTITY_NAME(con->peer_name), 1390 ceph_pr_addr(&con->peer_addr.in_addr), 1391 le32_to_cpu(con->out_connect.protocol_version), 1392 le32_to_cpu(con->in_reply.protocol_version)); 1393 con->error_msg = "protocol version mismatch"; 1394 fail_protocol(con); 1395 return -1; 1396 1397 case CEPH_MSGR_TAG_BADAUTHORIZER: 1398 con->auth_retry++; 1399 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, 1400 con->auth_retry); 1401 if (con->auth_retry == 2) { 1402 con->error_msg = "connect authorization failure"; 1403 return -1; 1404 } 1405 con->auth_retry = 1; 1406 ceph_con_out_kvec_reset(con); 1407 ret = prepare_write_connect(con); 1408 if (ret < 0) 1409 return ret; 1410 prepare_read_connect(con); 1411 break; 1412 1413 case CEPH_MSGR_TAG_RESETSESSION: 1414 /* 1415 * If we connected with a large connect_seq but the peer 1416 * has no record of a session with us (no connection, or 1417 * connect_seq == 0), they will send RESETSESION to indicate 1418 * that they must have reset their session, and may have 1419 * dropped messages. 1420 */ 1421 dout("process_connect got RESET peer seq %u\n", 1422 le32_to_cpu(con->in_connect.connect_seq)); 1423 pr_err("%s%lld %s connection reset\n", 1424 ENTITY_NAME(con->peer_name), 1425 ceph_pr_addr(&con->peer_addr.in_addr)); 1426 reset_connection(con); 1427 ceph_con_out_kvec_reset(con); 1428 ret = prepare_write_connect(con); 1429 if (ret < 0) 1430 return ret; 1431 prepare_read_connect(con); 1432 1433 /* Tell ceph about it. */ 1434 mutex_unlock(&con->mutex); 1435 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); 1436 if (con->ops->peer_reset) 1437 con->ops->peer_reset(con); 1438 mutex_lock(&con->mutex); 1439 if (test_bit(CLOSED, &con->state) || 1440 test_bit(OPENING, &con->state)) 1441 return -EAGAIN; 1442 break; 1443 1444 case CEPH_MSGR_TAG_RETRY_SESSION: 1445 /* 1446 * If we sent a smaller connect_seq than the peer has, try 1447 * again with a larger value. 1448 */ 1449 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n", 1450 le32_to_cpu(con->out_connect.connect_seq), 1451 le32_to_cpu(con->in_connect.connect_seq)); 1452 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1453 ceph_con_out_kvec_reset(con); 1454 ret = prepare_write_connect(con); 1455 if (ret < 0) 1456 return ret; 1457 prepare_read_connect(con); 1458 break; 1459 1460 case CEPH_MSGR_TAG_RETRY_GLOBAL: 1461 /* 1462 * If we sent a smaller global_seq than the peer has, try 1463 * again with a larger value. 1464 */ 1465 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", 1466 con->peer_global_seq, 1467 le32_to_cpu(con->in_connect.global_seq)); 1468 get_global_seq(con->msgr, 1469 le32_to_cpu(con->in_connect.global_seq)); 1470 ceph_con_out_kvec_reset(con); 1471 ret = prepare_write_connect(con); 1472 if (ret < 0) 1473 return ret; 1474 prepare_read_connect(con); 1475 break; 1476 1477 case CEPH_MSGR_TAG_READY: 1478 if (req_feat & ~server_feat) { 1479 pr_err("%s%lld %s protocol feature mismatch," 1480 " my required %llx > server's %llx, need %llx\n", 1481 ENTITY_NAME(con->peer_name), 1482 ceph_pr_addr(&con->peer_addr.in_addr), 1483 req_feat, server_feat, req_feat & ~server_feat); 1484 con->error_msg = "missing required protocol features"; 1485 fail_protocol(con); 1486 return -1; 1487 } 1488 clear_bit(CONNECTING, &con->state); 1489 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1490 con->connect_seq++; 1491 con->peer_features = server_feat; 1492 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1493 con->peer_global_seq, 1494 le32_to_cpu(con->in_reply.connect_seq), 1495 con->connect_seq); 1496 WARN_ON(con->connect_seq != 1497 le32_to_cpu(con->in_reply.connect_seq)); 1498 1499 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1500 set_bit(LOSSYTX, &con->state); 1501 1502 prepare_read_tag(con); 1503 break; 1504 1505 case CEPH_MSGR_TAG_WAIT: 1506 /* 1507 * If there is a connection race (we are opening 1508 * connections to each other), one of us may just have 1509 * to WAIT. This shouldn't happen if we are the 1510 * client. 1511 */ 1512 pr_err("process_connect got WAIT as client\n"); 1513 con->error_msg = "protocol error, got WAIT as client"; 1514 return -1; 1515 1516 default: 1517 pr_err("connect protocol error, will retry\n"); 1518 con->error_msg = "protocol error, garbage tag during connect"; 1519 return -1; 1520 } 1521 return 0; 1522 } 1523 1524 1525 /* 1526 * read (part of) an ack 1527 */ 1528 static int read_partial_ack(struct ceph_connection *con) 1529 { 1530 int size = sizeof (con->in_temp_ack); 1531 int end = size; 1532 1533 return read_partial(con, end, size, &con->in_temp_ack); 1534 } 1535 1536 1537 /* 1538 * We can finally discard anything that's been acked. 1539 */ 1540 static void process_ack(struct ceph_connection *con) 1541 { 1542 struct ceph_msg *m; 1543 u64 ack = le64_to_cpu(con->in_temp_ack); 1544 u64 seq; 1545 1546 while (!list_empty(&con->out_sent)) { 1547 m = list_first_entry(&con->out_sent, struct ceph_msg, 1548 list_head); 1549 seq = le64_to_cpu(m->hdr.seq); 1550 if (seq > ack) 1551 break; 1552 dout("got ack for seq %llu type %d at %p\n", seq, 1553 le16_to_cpu(m->hdr.type), m); 1554 m->ack_stamp = jiffies; 1555 ceph_msg_remove(m); 1556 } 1557 prepare_read_tag(con); 1558 } 1559 1560 1561 1562 1563 static int read_partial_message_section(struct ceph_connection *con, 1564 struct kvec *section, 1565 unsigned int sec_len, u32 *crc) 1566 { 1567 int ret, left; 1568 1569 BUG_ON(!section); 1570 1571 while (section->iov_len < sec_len) { 1572 BUG_ON(section->iov_base == NULL); 1573 left = sec_len - section->iov_len; 1574 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + 1575 section->iov_len, left); 1576 if (ret <= 0) 1577 return ret; 1578 section->iov_len += ret; 1579 } 1580 if (section->iov_len == sec_len) 1581 *crc = crc32c(0, section->iov_base, section->iov_len); 1582 1583 return 1; 1584 } 1585 1586 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1587 struct ceph_msg_header *hdr, 1588 int *skip); 1589 1590 1591 static int read_partial_message_pages(struct ceph_connection *con, 1592 struct page **pages, 1593 unsigned int data_len, bool do_datacrc) 1594 { 1595 void *p; 1596 int ret; 1597 int left; 1598 1599 left = min((int)(data_len - con->in_msg_pos.data_pos), 1600 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1601 /* (page) data */ 1602 BUG_ON(pages == NULL); 1603 p = kmap(pages[con->in_msg_pos.page]); 1604 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1605 left); 1606 if (ret > 0 && do_datacrc) 1607 con->in_data_crc = 1608 crc32c(con->in_data_crc, 1609 p + con->in_msg_pos.page_pos, ret); 1610 kunmap(pages[con->in_msg_pos.page]); 1611 if (ret <= 0) 1612 return ret; 1613 con->in_msg_pos.data_pos += ret; 1614 con->in_msg_pos.page_pos += ret; 1615 if (con->in_msg_pos.page_pos == PAGE_SIZE) { 1616 con->in_msg_pos.page_pos = 0; 1617 con->in_msg_pos.page++; 1618 } 1619 1620 return ret; 1621 } 1622 1623 #ifdef CONFIG_BLOCK 1624 static int read_partial_message_bio(struct ceph_connection *con, 1625 struct bio **bio_iter, int *bio_seg, 1626 unsigned int data_len, bool do_datacrc) 1627 { 1628 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg); 1629 void *p; 1630 int ret, left; 1631 1632 if (IS_ERR(bv)) 1633 return PTR_ERR(bv); 1634 1635 left = min((int)(data_len - con->in_msg_pos.data_pos), 1636 (int)(bv->bv_len - con->in_msg_pos.page_pos)); 1637 1638 p = kmap(bv->bv_page) + bv->bv_offset; 1639 1640 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1641 left); 1642 if (ret > 0 && do_datacrc) 1643 con->in_data_crc = 1644 crc32c(con->in_data_crc, 1645 p + con->in_msg_pos.page_pos, ret); 1646 kunmap(bv->bv_page); 1647 if (ret <= 0) 1648 return ret; 1649 con->in_msg_pos.data_pos += ret; 1650 con->in_msg_pos.page_pos += ret; 1651 if (con->in_msg_pos.page_pos == bv->bv_len) { 1652 con->in_msg_pos.page_pos = 0; 1653 iter_bio_next(bio_iter, bio_seg); 1654 } 1655 1656 return ret; 1657 } 1658 #endif 1659 1660 /* 1661 * read (part of) a message. 1662 */ 1663 static int read_partial_message(struct ceph_connection *con) 1664 { 1665 struct ceph_msg *m = con->in_msg; 1666 int size; 1667 int end; 1668 int ret; 1669 unsigned int front_len, middle_len, data_len; 1670 bool do_datacrc = !con->msgr->nocrc; 1671 int skip; 1672 u64 seq; 1673 u32 crc; 1674 1675 dout("read_partial_message con %p msg %p\n", con, m); 1676 1677 /* header */ 1678 size = sizeof (con->in_hdr); 1679 end = size; 1680 ret = read_partial(con, end, size, &con->in_hdr); 1681 if (ret <= 0) 1682 return ret; 1683 1684 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); 1685 if (cpu_to_le32(crc) != con->in_hdr.crc) { 1686 pr_err("read_partial_message bad hdr " 1687 " crc %u != expected %u\n", 1688 crc, con->in_hdr.crc); 1689 return -EBADMSG; 1690 } 1691 1692 front_len = le32_to_cpu(con->in_hdr.front_len); 1693 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1694 return -EIO; 1695 middle_len = le32_to_cpu(con->in_hdr.middle_len); 1696 if (middle_len > CEPH_MSG_MAX_DATA_LEN) 1697 return -EIO; 1698 data_len = le32_to_cpu(con->in_hdr.data_len); 1699 if (data_len > CEPH_MSG_MAX_DATA_LEN) 1700 return -EIO; 1701 1702 /* verify seq# */ 1703 seq = le64_to_cpu(con->in_hdr.seq); 1704 if ((s64)seq - (s64)con->in_seq < 1) { 1705 pr_info("skipping %s%lld %s seq %lld expected %lld\n", 1706 ENTITY_NAME(con->peer_name), 1707 ceph_pr_addr(&con->peer_addr.in_addr), 1708 seq, con->in_seq + 1); 1709 con->in_base_pos = -front_len - middle_len - data_len - 1710 sizeof(m->footer); 1711 con->in_tag = CEPH_MSGR_TAG_READY; 1712 return 0; 1713 } else if ((s64)seq - (s64)con->in_seq > 1) { 1714 pr_err("read_partial_message bad seq %lld expected %lld\n", 1715 seq, con->in_seq + 1); 1716 con->error_msg = "bad message sequence # for incoming message"; 1717 return -EBADMSG; 1718 } 1719 1720 /* allocate message? */ 1721 if (!con->in_msg) { 1722 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1723 con->in_hdr.front_len, con->in_hdr.data_len); 1724 skip = 0; 1725 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1726 if (skip) { 1727 /* skip this message */ 1728 dout("alloc_msg said skip message\n"); 1729 BUG_ON(con->in_msg); 1730 con->in_base_pos = -front_len - middle_len - data_len - 1731 sizeof(m->footer); 1732 con->in_tag = CEPH_MSGR_TAG_READY; 1733 con->in_seq++; 1734 return 0; 1735 } 1736 if (!con->in_msg) { 1737 con->error_msg = 1738 "error allocating memory for incoming message"; 1739 return -ENOMEM; 1740 } 1741 m = con->in_msg; 1742 m->front.iov_len = 0; /* haven't read it yet */ 1743 if (m->middle) 1744 m->middle->vec.iov_len = 0; 1745 1746 con->in_msg_pos.page = 0; 1747 if (m->pages) 1748 con->in_msg_pos.page_pos = m->page_alignment; 1749 else 1750 con->in_msg_pos.page_pos = 0; 1751 con->in_msg_pos.data_pos = 0; 1752 } 1753 1754 /* front */ 1755 ret = read_partial_message_section(con, &m->front, front_len, 1756 &con->in_front_crc); 1757 if (ret <= 0) 1758 return ret; 1759 1760 /* middle */ 1761 if (m->middle) { 1762 ret = read_partial_message_section(con, &m->middle->vec, 1763 middle_len, 1764 &con->in_middle_crc); 1765 if (ret <= 0) 1766 return ret; 1767 } 1768 #ifdef CONFIG_BLOCK 1769 if (m->bio && !m->bio_iter) 1770 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); 1771 #endif 1772 1773 /* (page) data */ 1774 while (con->in_msg_pos.data_pos < data_len) { 1775 if (m->pages) { 1776 ret = read_partial_message_pages(con, m->pages, 1777 data_len, do_datacrc); 1778 if (ret <= 0) 1779 return ret; 1780 #ifdef CONFIG_BLOCK 1781 } else if (m->bio) { 1782 1783 ret = read_partial_message_bio(con, 1784 &m->bio_iter, &m->bio_seg, 1785 data_len, do_datacrc); 1786 if (ret <= 0) 1787 return ret; 1788 #endif 1789 } else { 1790 BUG_ON(1); 1791 } 1792 } 1793 1794 /* footer */ 1795 size = sizeof (m->footer); 1796 end += size; 1797 ret = read_partial(con, end, size, &m->footer); 1798 if (ret <= 0) 1799 return ret; 1800 1801 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1802 m, front_len, m->footer.front_crc, middle_len, 1803 m->footer.middle_crc, data_len, m->footer.data_crc); 1804 1805 /* crc ok? */ 1806 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 1807 pr_err("read_partial_message %p front crc %u != exp. %u\n", 1808 m, con->in_front_crc, m->footer.front_crc); 1809 return -EBADMSG; 1810 } 1811 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 1812 pr_err("read_partial_message %p middle crc %u != exp %u\n", 1813 m, con->in_middle_crc, m->footer.middle_crc); 1814 return -EBADMSG; 1815 } 1816 if (do_datacrc && 1817 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 1818 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 1819 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 1820 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 1821 return -EBADMSG; 1822 } 1823 1824 return 1; /* done! */ 1825 } 1826 1827 /* 1828 * Process message. This happens in the worker thread. The callback should 1829 * be careful not to do anything that waits on other incoming messages or it 1830 * may deadlock. 1831 */ 1832 static void process_message(struct ceph_connection *con) 1833 { 1834 struct ceph_msg *msg; 1835 1836 msg = con->in_msg; 1837 con->in_msg = NULL; 1838 1839 /* if first message, set peer_name */ 1840 if (con->peer_name.type == 0) 1841 con->peer_name = msg->hdr.src; 1842 1843 con->in_seq++; 1844 mutex_unlock(&con->mutex); 1845 1846 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1847 msg, le64_to_cpu(msg->hdr.seq), 1848 ENTITY_NAME(msg->hdr.src), 1849 le16_to_cpu(msg->hdr.type), 1850 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1851 le32_to_cpu(msg->hdr.front_len), 1852 le32_to_cpu(msg->hdr.data_len), 1853 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 1854 con->ops->dispatch(con, msg); 1855 1856 mutex_lock(&con->mutex); 1857 prepare_read_tag(con); 1858 } 1859 1860 1861 /* 1862 * Write something to the socket. Called in a worker thread when the 1863 * socket appears to be writeable and we have something ready to send. 1864 */ 1865 static int try_write(struct ceph_connection *con) 1866 { 1867 int ret = 1; 1868 1869 dout("try_write start %p state %lu nref %d\n", con, con->state, 1870 atomic_read(&con->nref)); 1871 1872 more: 1873 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1874 1875 /* open the socket first? */ 1876 if (con->sock == NULL) { 1877 ceph_con_out_kvec_reset(con); 1878 prepare_write_banner(con); 1879 ret = prepare_write_connect(con); 1880 if (ret < 0) 1881 goto out; 1882 prepare_read_banner(con); 1883 set_bit(CONNECTING, &con->state); 1884 clear_bit(NEGOTIATING, &con->state); 1885 1886 BUG_ON(con->in_msg); 1887 con->in_tag = CEPH_MSGR_TAG_READY; 1888 dout("try_write initiating connect on %p new state %lu\n", 1889 con, con->state); 1890 ret = ceph_tcp_connect(con); 1891 if (ret < 0) { 1892 con->error_msg = "connect error"; 1893 goto out; 1894 } 1895 } 1896 1897 more_kvec: 1898 /* kvec data queued? */ 1899 if (con->out_skip) { 1900 ret = write_partial_skip(con); 1901 if (ret <= 0) 1902 goto out; 1903 } 1904 if (con->out_kvec_left) { 1905 ret = write_partial_kvec(con); 1906 if (ret <= 0) 1907 goto out; 1908 } 1909 1910 /* msg pages? */ 1911 if (con->out_msg) { 1912 if (con->out_msg_done) { 1913 ceph_msg_put(con->out_msg); 1914 con->out_msg = NULL; /* we're done with this one */ 1915 goto do_next; 1916 } 1917 1918 ret = write_partial_msg_pages(con); 1919 if (ret == 1) 1920 goto more_kvec; /* we need to send the footer, too! */ 1921 if (ret == 0) 1922 goto out; 1923 if (ret < 0) { 1924 dout("try_write write_partial_msg_pages err %d\n", 1925 ret); 1926 goto out; 1927 } 1928 } 1929 1930 do_next: 1931 if (!test_bit(CONNECTING, &con->state)) { 1932 /* is anything else pending? */ 1933 if (!list_empty(&con->out_queue)) { 1934 prepare_write_message(con); 1935 goto more; 1936 } 1937 if (con->in_seq > con->in_seq_acked) { 1938 prepare_write_ack(con); 1939 goto more; 1940 } 1941 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { 1942 prepare_write_keepalive(con); 1943 goto more; 1944 } 1945 } 1946 1947 /* Nothing to do! */ 1948 clear_bit(WRITE_PENDING, &con->state); 1949 dout("try_write nothing else to write.\n"); 1950 ret = 0; 1951 out: 1952 dout("try_write done on %p ret %d\n", con, ret); 1953 return ret; 1954 } 1955 1956 1957 1958 /* 1959 * Read what we can from the socket. 1960 */ 1961 static int try_read(struct ceph_connection *con) 1962 { 1963 int ret = -1; 1964 1965 if (!con->sock) 1966 return 0; 1967 1968 if (test_bit(STANDBY, &con->state)) 1969 return 0; 1970 1971 dout("try_read start on %p\n", con); 1972 1973 more: 1974 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1975 con->in_base_pos); 1976 1977 /* 1978 * process_connect and process_message drop and re-take 1979 * con->mutex. make sure we handle a racing close or reopen. 1980 */ 1981 if (test_bit(CLOSED, &con->state) || 1982 test_bit(OPENING, &con->state)) { 1983 ret = -EAGAIN; 1984 goto out; 1985 } 1986 1987 if (test_bit(CONNECTING, &con->state)) { 1988 if (!test_bit(NEGOTIATING, &con->state)) { 1989 dout("try_read connecting\n"); 1990 ret = read_partial_banner(con); 1991 if (ret <= 0) 1992 goto out; 1993 ret = process_banner(con); 1994 if (ret < 0) 1995 goto out; 1996 } 1997 ret = read_partial_connect(con); 1998 if (ret <= 0) 1999 goto out; 2000 ret = process_connect(con); 2001 if (ret < 0) 2002 goto out; 2003 goto more; 2004 } 2005 2006 if (con->in_base_pos < 0) { 2007 /* 2008 * skipping + discarding content. 2009 * 2010 * FIXME: there must be a better way to do this! 2011 */ 2012 static char buf[SKIP_BUF_SIZE]; 2013 int skip = min((int) sizeof (buf), -con->in_base_pos); 2014 2015 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); 2016 ret = ceph_tcp_recvmsg(con->sock, buf, skip); 2017 if (ret <= 0) 2018 goto out; 2019 con->in_base_pos += ret; 2020 if (con->in_base_pos) 2021 goto more; 2022 } 2023 if (con->in_tag == CEPH_MSGR_TAG_READY) { 2024 /* 2025 * what's next? 2026 */ 2027 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); 2028 if (ret <= 0) 2029 goto out; 2030 dout("try_read got tag %d\n", (int)con->in_tag); 2031 switch (con->in_tag) { 2032 case CEPH_MSGR_TAG_MSG: 2033 prepare_read_message(con); 2034 break; 2035 case CEPH_MSGR_TAG_ACK: 2036 prepare_read_ack(con); 2037 break; 2038 case CEPH_MSGR_TAG_CLOSE: 2039 set_bit(CLOSED, &con->state); /* fixme */ 2040 goto out; 2041 default: 2042 goto bad_tag; 2043 } 2044 } 2045 if (con->in_tag == CEPH_MSGR_TAG_MSG) { 2046 ret = read_partial_message(con); 2047 if (ret <= 0) { 2048 switch (ret) { 2049 case -EBADMSG: 2050 con->error_msg = "bad crc"; 2051 ret = -EIO; 2052 break; 2053 case -EIO: 2054 con->error_msg = "io error"; 2055 break; 2056 } 2057 goto out; 2058 } 2059 if (con->in_tag == CEPH_MSGR_TAG_READY) 2060 goto more; 2061 process_message(con); 2062 goto more; 2063 } 2064 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 2065 ret = read_partial_ack(con); 2066 if (ret <= 0) 2067 goto out; 2068 process_ack(con); 2069 goto more; 2070 } 2071 2072 out: 2073 dout("try_read done on %p ret %d\n", con, ret); 2074 return ret; 2075 2076 bad_tag: 2077 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); 2078 con->error_msg = "protocol error, garbage tag"; 2079 ret = -1; 2080 goto out; 2081 } 2082 2083 2084 /* 2085 * Atomically queue work on a connection. Bump @con reference to 2086 * avoid races with connection teardown. 2087 */ 2088 static void queue_con(struct ceph_connection *con) 2089 { 2090 if (test_bit(DEAD, &con->state)) { 2091 dout("queue_con %p ignoring: DEAD\n", 2092 con); 2093 return; 2094 } 2095 2096 if (!con->ops->get(con)) { 2097 dout("queue_con %p ref count 0\n", con); 2098 return; 2099 } 2100 2101 if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { 2102 dout("queue_con %p - already queued\n", con); 2103 con->ops->put(con); 2104 } else { 2105 dout("queue_con %p\n", con); 2106 } 2107 } 2108 2109 /* 2110 * Do some work on a connection. Drop a connection ref when we're done. 2111 */ 2112 static void con_work(struct work_struct *work) 2113 { 2114 struct ceph_connection *con = container_of(work, struct ceph_connection, 2115 work.work); 2116 int ret; 2117 2118 mutex_lock(&con->mutex); 2119 restart: 2120 if (test_and_clear_bit(BACKOFF, &con->state)) { 2121 dout("con_work %p backing off\n", con); 2122 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2123 round_jiffies_relative(con->delay))) { 2124 dout("con_work %p backoff %lu\n", con, con->delay); 2125 mutex_unlock(&con->mutex); 2126 return; 2127 } else { 2128 con->ops->put(con); 2129 dout("con_work %p FAILED to back off %lu\n", con, 2130 con->delay); 2131 } 2132 } 2133 2134 if (test_bit(STANDBY, &con->state)) { 2135 dout("con_work %p STANDBY\n", con); 2136 goto done; 2137 } 2138 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 2139 dout("con_work CLOSED\n"); 2140 con_close_socket(con); 2141 goto done; 2142 } 2143 if (test_and_clear_bit(OPENING, &con->state)) { 2144 /* reopen w/ new peer */ 2145 dout("con_work OPENING\n"); 2146 con_close_socket(con); 2147 } 2148 2149 if (test_and_clear_bit(SOCK_CLOSED, &con->state)) 2150 goto fault; 2151 2152 ret = try_read(con); 2153 if (ret == -EAGAIN) 2154 goto restart; 2155 if (ret < 0) 2156 goto fault; 2157 2158 ret = try_write(con); 2159 if (ret == -EAGAIN) 2160 goto restart; 2161 if (ret < 0) 2162 goto fault; 2163 2164 done: 2165 mutex_unlock(&con->mutex); 2166 done_unlocked: 2167 con->ops->put(con); 2168 return; 2169 2170 fault: 2171 mutex_unlock(&con->mutex); 2172 ceph_fault(con); /* error/fault path */ 2173 goto done_unlocked; 2174 } 2175 2176 2177 /* 2178 * Generic error/fault handler. A retry mechanism is used with 2179 * exponential backoff 2180 */ 2181 static void ceph_fault(struct ceph_connection *con) 2182 { 2183 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2184 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2185 dout("fault %p state %lu to peer %s\n", 2186 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2187 2188 if (test_bit(LOSSYTX, &con->state)) { 2189 dout("fault on LOSSYTX channel\n"); 2190 goto out; 2191 } 2192 2193 mutex_lock(&con->mutex); 2194 if (test_bit(CLOSED, &con->state)) 2195 goto out_unlock; 2196 2197 con_close_socket(con); 2198 2199 if (con->in_msg) { 2200 ceph_msg_put(con->in_msg); 2201 con->in_msg = NULL; 2202 } 2203 2204 /* Requeue anything that hasn't been acked */ 2205 list_splice_init(&con->out_sent, &con->out_queue); 2206 2207 /* If there are no messages queued or keepalive pending, place 2208 * the connection in a STANDBY state */ 2209 if (list_empty(&con->out_queue) && 2210 !test_bit(KEEPALIVE_PENDING, &con->state)) { 2211 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2212 clear_bit(WRITE_PENDING, &con->state); 2213 set_bit(STANDBY, &con->state); 2214 } else { 2215 /* retry after a delay. */ 2216 if (con->delay == 0) 2217 con->delay = BASE_DELAY_INTERVAL; 2218 else if (con->delay < MAX_DELAY_INTERVAL) 2219 con->delay *= 2; 2220 con->ops->get(con); 2221 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2222 round_jiffies_relative(con->delay))) { 2223 dout("fault queued %p delay %lu\n", con, con->delay); 2224 } else { 2225 con->ops->put(con); 2226 dout("fault failed to queue %p delay %lu, backoff\n", 2227 con, con->delay); 2228 /* 2229 * In many cases we see a socket state change 2230 * while con_work is running and end up 2231 * queuing (non-delayed) work, such that we 2232 * can't backoff with a delay. Set a flag so 2233 * that when con_work restarts we schedule the 2234 * delay then. 2235 */ 2236 set_bit(BACKOFF, &con->state); 2237 } 2238 } 2239 2240 out_unlock: 2241 mutex_unlock(&con->mutex); 2242 out: 2243 /* 2244 * in case we faulted due to authentication, invalidate our 2245 * current tickets so that we can get new ones. 2246 */ 2247 if (con->auth_retry && con->ops->invalidate_authorizer) { 2248 dout("calling invalidate_authorizer()\n"); 2249 con->ops->invalidate_authorizer(con); 2250 } 2251 2252 if (con->ops->fault) 2253 con->ops->fault(con); 2254 } 2255 2256 2257 2258 /* 2259 * create a new messenger instance 2260 */ 2261 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, 2262 u32 supported_features, 2263 u32 required_features) 2264 { 2265 struct ceph_messenger *msgr; 2266 2267 msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); 2268 if (msgr == NULL) 2269 return ERR_PTR(-ENOMEM); 2270 2271 msgr->supported_features = supported_features; 2272 msgr->required_features = required_features; 2273 2274 spin_lock_init(&msgr->global_seq_lock); 2275 2276 if (myaddr) 2277 msgr->inst.addr = *myaddr; 2278 2279 /* select a random nonce */ 2280 msgr->inst.addr.type = 0; 2281 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2282 encode_my_addr(msgr); 2283 2284 dout("messenger_create %p\n", msgr); 2285 return msgr; 2286 } 2287 EXPORT_SYMBOL(ceph_messenger_create); 2288 2289 void ceph_messenger_destroy(struct ceph_messenger *msgr) 2290 { 2291 dout("destroy %p\n", msgr); 2292 kfree(msgr); 2293 dout("destroyed messenger %p\n", msgr); 2294 } 2295 EXPORT_SYMBOL(ceph_messenger_destroy); 2296 2297 static void clear_standby(struct ceph_connection *con) 2298 { 2299 /* come back from STANDBY? */ 2300 if (test_and_clear_bit(STANDBY, &con->state)) { 2301 mutex_lock(&con->mutex); 2302 dout("clear_standby %p and ++connect_seq\n", con); 2303 con->connect_seq++; 2304 WARN_ON(test_bit(WRITE_PENDING, &con->state)); 2305 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); 2306 mutex_unlock(&con->mutex); 2307 } 2308 } 2309 2310 /* 2311 * Queue up an outgoing message on the given connection. 2312 */ 2313 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 2314 { 2315 if (test_bit(CLOSED, &con->state)) { 2316 dout("con_send %p closed, dropping %p\n", con, msg); 2317 ceph_msg_put(msg); 2318 return; 2319 } 2320 2321 /* set src+dst */ 2322 msg->hdr.src = con->msgr->inst.name; 2323 2324 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 2325 2326 msg->needs_out_seq = true; 2327 2328 /* queue */ 2329 mutex_lock(&con->mutex); 2330 BUG_ON(!list_empty(&msg->list_head)); 2331 list_add_tail(&msg->list_head, &con->out_queue); 2332 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2333 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), 2334 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 2335 le32_to_cpu(msg->hdr.front_len), 2336 le32_to_cpu(msg->hdr.middle_len), 2337 le32_to_cpu(msg->hdr.data_len)); 2338 mutex_unlock(&con->mutex); 2339 2340 /* if there wasn't anything waiting to send before, queue 2341 * new work */ 2342 clear_standby(con); 2343 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2344 queue_con(con); 2345 } 2346 EXPORT_SYMBOL(ceph_con_send); 2347 2348 /* 2349 * Revoke a message that was previously queued for send 2350 */ 2351 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 2352 { 2353 mutex_lock(&con->mutex); 2354 if (!list_empty(&msg->list_head)) { 2355 dout("con_revoke %p msg %p - was on queue\n", con, msg); 2356 list_del_init(&msg->list_head); 2357 ceph_msg_put(msg); 2358 msg->hdr.seq = 0; 2359 } 2360 if (con->out_msg == msg) { 2361 dout("con_revoke %p msg %p - was sending\n", con, msg); 2362 con->out_msg = NULL; 2363 if (con->out_kvec_is_msg) { 2364 con->out_skip = con->out_kvec_bytes; 2365 con->out_kvec_is_msg = false; 2366 } 2367 ceph_msg_put(msg); 2368 msg->hdr.seq = 0; 2369 } 2370 mutex_unlock(&con->mutex); 2371 } 2372 2373 /* 2374 * Revoke a message that we may be reading data into 2375 */ 2376 void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) 2377 { 2378 mutex_lock(&con->mutex); 2379 if (con->in_msg && con->in_msg == msg) { 2380 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); 2381 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); 2382 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); 2383 2384 /* skip rest of message */ 2385 dout("con_revoke_pages %p msg %p revoked\n", con, msg); 2386 con->in_base_pos = con->in_base_pos - 2387 sizeof(struct ceph_msg_header) - 2388 front_len - 2389 middle_len - 2390 data_len - 2391 sizeof(struct ceph_msg_footer); 2392 ceph_msg_put(con->in_msg); 2393 con->in_msg = NULL; 2394 con->in_tag = CEPH_MSGR_TAG_READY; 2395 con->in_seq++; 2396 } else { 2397 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2398 con, con->in_msg, msg); 2399 } 2400 mutex_unlock(&con->mutex); 2401 } 2402 2403 /* 2404 * Queue a keepalive byte to ensure the tcp connection is alive. 2405 */ 2406 void ceph_con_keepalive(struct ceph_connection *con) 2407 { 2408 dout("con_keepalive %p\n", con); 2409 clear_standby(con); 2410 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2411 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2412 queue_con(con); 2413 } 2414 EXPORT_SYMBOL(ceph_con_keepalive); 2415 2416 2417 /* 2418 * construct a new message with given type, size 2419 * the new msg has a ref count of 1. 2420 */ 2421 struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, 2422 bool can_fail) 2423 { 2424 struct ceph_msg *m; 2425 2426 m = kmalloc(sizeof(*m), flags); 2427 if (m == NULL) 2428 goto out; 2429 kref_init(&m->kref); 2430 INIT_LIST_HEAD(&m->list_head); 2431 2432 m->hdr.tid = 0; 2433 m->hdr.type = cpu_to_le16(type); 2434 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 2435 m->hdr.version = 0; 2436 m->hdr.front_len = cpu_to_le32(front_len); 2437 m->hdr.middle_len = 0; 2438 m->hdr.data_len = 0; 2439 m->hdr.data_off = 0; 2440 m->hdr.reserved = 0; 2441 m->footer.front_crc = 0; 2442 m->footer.middle_crc = 0; 2443 m->footer.data_crc = 0; 2444 m->footer.flags = 0; 2445 m->front_max = front_len; 2446 m->front_is_vmalloc = false; 2447 m->more_to_follow = false; 2448 m->ack_stamp = 0; 2449 m->pool = NULL; 2450 2451 /* middle */ 2452 m->middle = NULL; 2453 2454 /* data */ 2455 m->nr_pages = 0; 2456 m->page_alignment = 0; 2457 m->pages = NULL; 2458 m->pagelist = NULL; 2459 m->bio = NULL; 2460 m->bio_iter = NULL; 2461 m->bio_seg = 0; 2462 m->trail = NULL; 2463 2464 /* front */ 2465 if (front_len) { 2466 if (front_len > PAGE_CACHE_SIZE) { 2467 m->front.iov_base = __vmalloc(front_len, flags, 2468 PAGE_KERNEL); 2469 m->front_is_vmalloc = true; 2470 } else { 2471 m->front.iov_base = kmalloc(front_len, flags); 2472 } 2473 if (m->front.iov_base == NULL) { 2474 dout("ceph_msg_new can't allocate %d bytes\n", 2475 front_len); 2476 goto out2; 2477 } 2478 } else { 2479 m->front.iov_base = NULL; 2480 } 2481 m->front.iov_len = front_len; 2482 2483 dout("ceph_msg_new %p front %d\n", m, front_len); 2484 return m; 2485 2486 out2: 2487 ceph_msg_put(m); 2488 out: 2489 if (!can_fail) { 2490 pr_err("msg_new can't create type %d front %d\n", type, 2491 front_len); 2492 WARN_ON(1); 2493 } else { 2494 dout("msg_new can't create type %d front %d\n", type, 2495 front_len); 2496 } 2497 return NULL; 2498 } 2499 EXPORT_SYMBOL(ceph_msg_new); 2500 2501 /* 2502 * Allocate "middle" portion of a message, if it is needed and wasn't 2503 * allocated by alloc_msg. This allows us to read a small fixed-size 2504 * per-type header in the front and then gracefully fail (i.e., 2505 * propagate the error to the caller based on info in the front) when 2506 * the middle is too large. 2507 */ 2508 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) 2509 { 2510 int type = le16_to_cpu(msg->hdr.type); 2511 int middle_len = le32_to_cpu(msg->hdr.middle_len); 2512 2513 dout("alloc_middle %p type %d %s middle_len %d\n", msg, type, 2514 ceph_msg_type_name(type), middle_len); 2515 BUG_ON(!middle_len); 2516 BUG_ON(msg->middle); 2517 2518 msg->middle = ceph_buffer_new(middle_len, GFP_NOFS); 2519 if (!msg->middle) 2520 return -ENOMEM; 2521 return 0; 2522 } 2523 2524 /* 2525 * Generic message allocator, for incoming messages. 2526 */ 2527 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 2528 struct ceph_msg_header *hdr, 2529 int *skip) 2530 { 2531 int type = le16_to_cpu(hdr->type); 2532 int front_len = le32_to_cpu(hdr->front_len); 2533 int middle_len = le32_to_cpu(hdr->middle_len); 2534 struct ceph_msg *msg = NULL; 2535 int ret; 2536 2537 if (con->ops->alloc_msg) { 2538 mutex_unlock(&con->mutex); 2539 msg = con->ops->alloc_msg(con, hdr, skip); 2540 mutex_lock(&con->mutex); 2541 if (!msg || *skip) 2542 return NULL; 2543 } 2544 if (!msg) { 2545 *skip = 0; 2546 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 2547 if (!msg) { 2548 pr_err("unable to allocate msg type %d len %d\n", 2549 type, front_len); 2550 return NULL; 2551 } 2552 msg->page_alignment = le16_to_cpu(hdr->data_off); 2553 } 2554 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2555 2556 if (middle_len && !msg->middle) { 2557 ret = ceph_alloc_middle(con, msg); 2558 if (ret < 0) { 2559 ceph_msg_put(msg); 2560 return NULL; 2561 } 2562 } 2563 2564 return msg; 2565 } 2566 2567 2568 /* 2569 * Free a generically kmalloc'd message. 2570 */ 2571 void ceph_msg_kfree(struct ceph_msg *m) 2572 { 2573 dout("msg_kfree %p\n", m); 2574 if (m->front_is_vmalloc) 2575 vfree(m->front.iov_base); 2576 else 2577 kfree(m->front.iov_base); 2578 kfree(m); 2579 } 2580 2581 /* 2582 * Drop a msg ref. Destroy as needed. 2583 */ 2584 void ceph_msg_last_put(struct kref *kref) 2585 { 2586 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 2587 2588 dout("ceph_msg_put last one on %p\n", m); 2589 WARN_ON(!list_empty(&m->list_head)); 2590 2591 /* drop middle, data, if any */ 2592 if (m->middle) { 2593 ceph_buffer_put(m->middle); 2594 m->middle = NULL; 2595 } 2596 m->nr_pages = 0; 2597 m->pages = NULL; 2598 2599 if (m->pagelist) { 2600 ceph_pagelist_release(m->pagelist); 2601 kfree(m->pagelist); 2602 m->pagelist = NULL; 2603 } 2604 2605 m->trail = NULL; 2606 2607 if (m->pool) 2608 ceph_msgpool_put(m->pool, m); 2609 else 2610 ceph_msg_kfree(m); 2611 } 2612 EXPORT_SYMBOL(ceph_msg_last_put); 2613 2614 void ceph_msg_dump(struct ceph_msg *msg) 2615 { 2616 pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, 2617 msg->front_max, msg->nr_pages); 2618 print_hex_dump(KERN_DEBUG, "header: ", 2619 DUMP_PREFIX_OFFSET, 16, 1, 2620 &msg->hdr, sizeof(msg->hdr), true); 2621 print_hex_dump(KERN_DEBUG, " front: ", 2622 DUMP_PREFIX_OFFSET, 16, 1, 2623 msg->front.iov_base, msg->front.iov_len, true); 2624 if (msg->middle) 2625 print_hex_dump(KERN_DEBUG, "middle: ", 2626 DUMP_PREFIX_OFFSET, 16, 1, 2627 msg->middle->vec.iov_base, 2628 msg->middle->vec.iov_len, true); 2629 print_hex_dump(KERN_DEBUG, "footer: ", 2630 DUMP_PREFIX_OFFSET, 16, 1, 2631 &msg->footer, sizeof(msg->footer), true); 2632 } 2633 EXPORT_SYMBOL(ceph_msg_dump); 2634