1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/crc32c.h> 4 #include <linux/ctype.h> 5 #include <linux/highmem.h> 6 #include <linux/inet.h> 7 #include <linux/kthread.h> 8 #include <linux/net.h> 9 #include <linux/slab.h> 10 #include <linux/socket.h> 11 #include <linux/string.h> 12 #include <linux/bio.h> 13 #include <linux/blkdev.h> 14 #include <linux/dns_resolver.h> 15 #include <net/tcp.h> 16 17 #include <linux/ceph/libceph.h> 18 #include <linux/ceph/messenger.h> 19 #include <linux/ceph/decode.h> 20 #include <linux/ceph/pagelist.h> 21 #include <linux/export.h> 22 23 /* 24 * Ceph uses the messenger to exchange ceph_msg messages with other 25 * hosts in the system. The messenger provides ordered and reliable 26 * delivery. We tolerate TCP disconnects by reconnecting (with 27 * exponential backoff) in the case of a fault (disconnection, bad 28 * crc, protocol error). Acks allow sent messages to be discarded by 29 * the sender. 30 */ 31 32 /* static tag bytes (protocol control messages) */ 33 static char tag_msg = CEPH_MSGR_TAG_MSG; 34 static char tag_ack = CEPH_MSGR_TAG_ACK; 35 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; 36 37 #ifdef CONFIG_LOCKDEP 38 static struct lock_class_key socket_class; 39 #endif 40 41 /* 42 * When skipping (ignoring) a block of input we read it into a "skip 43 * buffer," which is this many bytes in size. 44 */ 45 #define SKIP_BUF_SIZE 1024 46 47 static void queue_con(struct ceph_connection *con); 48 static void con_work(struct work_struct *); 49 static void ceph_fault(struct ceph_connection *con); 50 51 /* 52 * Nicely render a sockaddr as a string. An array of formatted 53 * strings is used, to approximate reentrancy. 54 */ 55 #define ADDR_STR_COUNT_LOG 5 /* log2(# address strings in array) */ 56 #define ADDR_STR_COUNT (1 << ADDR_STR_COUNT_LOG) 57 #define ADDR_STR_COUNT_MASK (ADDR_STR_COUNT - 1) 58 #define MAX_ADDR_STR_LEN 64 /* 54 is enough */ 59 60 static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN]; 61 static atomic_t addr_str_seq = ATOMIC_INIT(0); 62 63 static struct page *zero_page; /* used in certain error cases */ 64 65 const char *ceph_pr_addr(const struct sockaddr_storage *ss) 66 { 67 int i; 68 char *s; 69 struct sockaddr_in *in4 = (struct sockaddr_in *) ss; 70 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss; 71 72 i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK; 73 s = addr_str[i]; 74 75 switch (ss->ss_family) { 76 case AF_INET: 77 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%hu", &in4->sin_addr, 78 ntohs(in4->sin_port)); 79 break; 80 81 case AF_INET6: 82 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%hu", &in6->sin6_addr, 83 ntohs(in6->sin6_port)); 84 break; 85 86 default: 87 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)", 88 ss->ss_family); 89 } 90 91 return s; 92 } 93 EXPORT_SYMBOL(ceph_pr_addr); 94 95 static void encode_my_addr(struct ceph_messenger *msgr) 96 { 97 memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); 98 ceph_encode_addr(&msgr->my_enc_addr); 99 } 100 101 /* 102 * work queue for all reading and writing to/from the socket. 103 */ 104 static struct workqueue_struct *ceph_msgr_wq; 105 106 void _ceph_msgr_exit(void) 107 { 108 if (ceph_msgr_wq) { 109 destroy_workqueue(ceph_msgr_wq); 110 ceph_msgr_wq = NULL; 111 } 112 113 BUG_ON(zero_page == NULL); 114 kunmap(zero_page); 115 page_cache_release(zero_page); 116 zero_page = NULL; 117 } 118 119 int ceph_msgr_init(void) 120 { 121 BUG_ON(zero_page != NULL); 122 zero_page = ZERO_PAGE(0); 123 page_cache_get(zero_page); 124 125 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); 126 if (ceph_msgr_wq) 127 return 0; 128 129 pr_err("msgr_init failed to create workqueue\n"); 130 _ceph_msgr_exit(); 131 132 return -ENOMEM; 133 } 134 EXPORT_SYMBOL(ceph_msgr_init); 135 136 void ceph_msgr_exit(void) 137 { 138 BUG_ON(ceph_msgr_wq == NULL); 139 140 _ceph_msgr_exit(); 141 } 142 EXPORT_SYMBOL(ceph_msgr_exit); 143 144 void ceph_msgr_flush(void) 145 { 146 flush_workqueue(ceph_msgr_wq); 147 } 148 EXPORT_SYMBOL(ceph_msgr_flush); 149 150 151 /* 152 * socket callback functions 153 */ 154 155 /* data available on socket, or listen socket received a connect */ 156 static void ceph_data_ready(struct sock *sk, int count_unused) 157 { 158 struct ceph_connection *con = sk->sk_user_data; 159 160 if (sk->sk_state != TCP_CLOSE_WAIT) { 161 dout("ceph_data_ready on %p state = %lu, queueing work\n", 162 con, con->state); 163 queue_con(con); 164 } 165 } 166 167 /* socket has buffer space for writing */ 168 static void ceph_write_space(struct sock *sk) 169 { 170 struct ceph_connection *con = sk->sk_user_data; 171 172 /* only queue to workqueue if there is data we want to write, 173 * and there is sufficient space in the socket buffer to accept 174 * more data. clear SOCK_NOSPACE so that ceph_write_space() 175 * doesn't get called again until try_write() fills the socket 176 * buffer. See net/ipv4/tcp_input.c:tcp_check_space() 177 * and net/core/stream.c:sk_stream_write_space(). 178 */ 179 if (test_bit(WRITE_PENDING, &con->state)) { 180 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { 181 dout("ceph_write_space %p queueing write work\n", con); 182 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 183 queue_con(con); 184 } 185 } else { 186 dout("ceph_write_space %p nothing to write\n", con); 187 } 188 } 189 190 /* socket's state has changed */ 191 static void ceph_state_change(struct sock *sk) 192 { 193 struct ceph_connection *con = sk->sk_user_data; 194 195 dout("ceph_state_change %p state = %lu sk_state = %u\n", 196 con, con->state, sk->sk_state); 197 198 if (test_bit(CLOSED, &con->state)) 199 return; 200 201 switch (sk->sk_state) { 202 case TCP_CLOSE: 203 dout("ceph_state_change TCP_CLOSE\n"); 204 case TCP_CLOSE_WAIT: 205 dout("ceph_state_change TCP_CLOSE_WAIT\n"); 206 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { 207 if (test_bit(CONNECTING, &con->state)) 208 con->error_msg = "connection failed"; 209 else 210 con->error_msg = "socket closed"; 211 queue_con(con); 212 } 213 break; 214 case TCP_ESTABLISHED: 215 dout("ceph_state_change TCP_ESTABLISHED\n"); 216 queue_con(con); 217 break; 218 default: /* Everything else is uninteresting */ 219 break; 220 } 221 } 222 223 /* 224 * set up socket callbacks 225 */ 226 static void set_sock_callbacks(struct socket *sock, 227 struct ceph_connection *con) 228 { 229 struct sock *sk = sock->sk; 230 sk->sk_user_data = con; 231 sk->sk_data_ready = ceph_data_ready; 232 sk->sk_write_space = ceph_write_space; 233 sk->sk_state_change = ceph_state_change; 234 } 235 236 237 /* 238 * socket helpers 239 */ 240 241 /* 242 * initiate connection to a remote socket. 243 */ 244 static int ceph_tcp_connect(struct ceph_connection *con) 245 { 246 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; 247 struct socket *sock; 248 int ret; 249 250 BUG_ON(con->sock); 251 ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, 252 IPPROTO_TCP, &sock); 253 if (ret) 254 return ret; 255 sock->sk->sk_allocation = GFP_NOFS; 256 257 #ifdef CONFIG_LOCKDEP 258 lockdep_set_class(&sock->sk->sk_lock, &socket_class); 259 #endif 260 261 set_sock_callbacks(sock, con); 262 263 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); 264 265 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), 266 O_NONBLOCK); 267 if (ret == -EINPROGRESS) { 268 dout("connect %s EINPROGRESS sk_state = %u\n", 269 ceph_pr_addr(&con->peer_addr.in_addr), 270 sock->sk->sk_state); 271 } else if (ret < 0) { 272 pr_err("connect %s error %d\n", 273 ceph_pr_addr(&con->peer_addr.in_addr), ret); 274 sock_release(sock); 275 con->error_msg = "connect error"; 276 277 return ret; 278 } 279 con->sock = sock; 280 281 return 0; 282 } 283 284 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 285 { 286 struct kvec iov = {buf, len}; 287 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 288 int r; 289 290 r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); 291 if (r == -EAGAIN) 292 r = 0; 293 return r; 294 } 295 296 /* 297 * write something. @more is true if caller will be sending more data 298 * shortly. 299 */ 300 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, 301 size_t kvlen, size_t len, int more) 302 { 303 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 304 int r; 305 306 if (more) 307 msg.msg_flags |= MSG_MORE; 308 else 309 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ 310 311 r = kernel_sendmsg(sock, &msg, iov, kvlen, len); 312 if (r == -EAGAIN) 313 r = 0; 314 return r; 315 } 316 317 static int ceph_tcp_sendpage(struct socket *sock, struct page *page, 318 int offset, size_t size, int more) 319 { 320 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR); 321 int ret; 322 323 ret = kernel_sendpage(sock, page, offset, size, flags); 324 if (ret == -EAGAIN) 325 ret = 0; 326 327 return ret; 328 } 329 330 331 /* 332 * Shutdown/close the socket for the given connection. 333 */ 334 static int con_close_socket(struct ceph_connection *con) 335 { 336 int rc; 337 338 dout("con_close_socket on %p sock %p\n", con, con->sock); 339 if (!con->sock) 340 return 0; 341 set_bit(SOCK_CLOSED, &con->state); 342 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); 343 sock_release(con->sock); 344 con->sock = NULL; 345 clear_bit(SOCK_CLOSED, &con->state); 346 return rc; 347 } 348 349 /* 350 * Reset a connection. Discard all incoming and outgoing messages 351 * and clear *_seq state. 352 */ 353 static void ceph_msg_remove(struct ceph_msg *msg) 354 { 355 list_del_init(&msg->list_head); 356 ceph_msg_put(msg); 357 } 358 static void ceph_msg_remove_list(struct list_head *head) 359 { 360 while (!list_empty(head)) { 361 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 362 list_head); 363 ceph_msg_remove(msg); 364 } 365 } 366 367 static void reset_connection(struct ceph_connection *con) 368 { 369 /* reset connection, out_queue, msg_ and connect_seq */ 370 /* discard existing out_queue and msg_seq */ 371 ceph_msg_remove_list(&con->out_queue); 372 ceph_msg_remove_list(&con->out_sent); 373 374 if (con->in_msg) { 375 ceph_msg_put(con->in_msg); 376 con->in_msg = NULL; 377 } 378 379 con->connect_seq = 0; 380 con->out_seq = 0; 381 if (con->out_msg) { 382 ceph_msg_put(con->out_msg); 383 con->out_msg = NULL; 384 } 385 con->in_seq = 0; 386 con->in_seq_acked = 0; 387 } 388 389 /* 390 * mark a peer down. drop any open connections. 391 */ 392 void ceph_con_close(struct ceph_connection *con) 393 { 394 dout("con_close %p peer %s\n", con, 395 ceph_pr_addr(&con->peer_addr.in_addr)); 396 set_bit(CLOSED, &con->state); /* in case there's queued work */ 397 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 398 clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ 399 clear_bit(KEEPALIVE_PENDING, &con->state); 400 clear_bit(WRITE_PENDING, &con->state); 401 mutex_lock(&con->mutex); 402 reset_connection(con); 403 con->peer_global_seq = 0; 404 cancel_delayed_work(&con->work); 405 mutex_unlock(&con->mutex); 406 queue_con(con); 407 } 408 EXPORT_SYMBOL(ceph_con_close); 409 410 /* 411 * Reopen a closed connection, with a new peer address. 412 */ 413 void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) 414 { 415 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 416 set_bit(OPENING, &con->state); 417 clear_bit(CLOSED, &con->state); 418 memcpy(&con->peer_addr, addr, sizeof(*addr)); 419 con->delay = 0; /* reset backoff memory */ 420 queue_con(con); 421 } 422 EXPORT_SYMBOL(ceph_con_open); 423 424 /* 425 * return true if this connection ever successfully opened 426 */ 427 bool ceph_con_opened(struct ceph_connection *con) 428 { 429 return con->connect_seq > 0; 430 } 431 432 /* 433 * generic get/put 434 */ 435 struct ceph_connection *ceph_con_get(struct ceph_connection *con) 436 { 437 int nref = __atomic_add_unless(&con->nref, 1, 0); 438 439 dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1); 440 441 return nref ? con : NULL; 442 } 443 444 void ceph_con_put(struct ceph_connection *con) 445 { 446 int nref = atomic_dec_return(&con->nref); 447 448 BUG_ON(nref < 0); 449 if (nref == 0) { 450 BUG_ON(con->sock); 451 kfree(con); 452 } 453 dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref); 454 } 455 456 /* 457 * initialize a new connection. 458 */ 459 void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) 460 { 461 dout("con_init %p\n", con); 462 memset(con, 0, sizeof(*con)); 463 atomic_set(&con->nref, 1); 464 con->msgr = msgr; 465 mutex_init(&con->mutex); 466 INIT_LIST_HEAD(&con->out_queue); 467 INIT_LIST_HEAD(&con->out_sent); 468 INIT_DELAYED_WORK(&con->work, con_work); 469 } 470 EXPORT_SYMBOL(ceph_con_init); 471 472 473 /* 474 * We maintain a global counter to order connection attempts. Get 475 * a unique seq greater than @gt. 476 */ 477 static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) 478 { 479 u32 ret; 480 481 spin_lock(&msgr->global_seq_lock); 482 if (msgr->global_seq < gt) 483 msgr->global_seq = gt; 484 ret = ++msgr->global_seq; 485 spin_unlock(&msgr->global_seq_lock); 486 return ret; 487 } 488 489 static void ceph_con_out_kvec_reset(struct ceph_connection *con) 490 { 491 con->out_kvec_left = 0; 492 con->out_kvec_bytes = 0; 493 con->out_kvec_cur = &con->out_kvec[0]; 494 } 495 496 static void ceph_con_out_kvec_add(struct ceph_connection *con, 497 size_t size, void *data) 498 { 499 int index; 500 501 index = con->out_kvec_left; 502 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); 503 504 con->out_kvec[index].iov_len = size; 505 con->out_kvec[index].iov_base = data; 506 con->out_kvec_left++; 507 con->out_kvec_bytes += size; 508 } 509 510 /* 511 * Prepare footer for currently outgoing message, and finish things 512 * off. Assumes out_kvec* are already valid.. we just add on to the end. 513 */ 514 static void prepare_write_message_footer(struct ceph_connection *con) 515 { 516 struct ceph_msg *m = con->out_msg; 517 int v = con->out_kvec_left; 518 519 dout("prepare_write_message_footer %p\n", con); 520 con->out_kvec_is_msg = true; 521 con->out_kvec[v].iov_base = &m->footer; 522 con->out_kvec[v].iov_len = sizeof(m->footer); 523 con->out_kvec_bytes += sizeof(m->footer); 524 con->out_kvec_left++; 525 con->out_more = m->more_to_follow; 526 con->out_msg_done = true; 527 } 528 529 /* 530 * Prepare headers for the next outgoing message. 531 */ 532 static void prepare_write_message(struct ceph_connection *con) 533 { 534 struct ceph_msg *m; 535 u32 crc; 536 537 ceph_con_out_kvec_reset(con); 538 con->out_kvec_is_msg = true; 539 con->out_msg_done = false; 540 541 /* Sneak an ack in there first? If we can get it into the same 542 * TCP packet that's a good thing. */ 543 if (con->in_seq > con->in_seq_acked) { 544 con->in_seq_acked = con->in_seq; 545 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 546 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 547 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), 548 &con->out_temp_ack); 549 } 550 551 m = list_first_entry(&con->out_queue, struct ceph_msg, list_head); 552 con->out_msg = m; 553 554 /* put message on sent list */ 555 ceph_msg_get(m); 556 list_move_tail(&m->list_head, &con->out_sent); 557 558 /* 559 * only assign outgoing seq # if we haven't sent this message 560 * yet. if it is requeued, resend with it's original seq. 561 */ 562 if (m->needs_out_seq) { 563 m->hdr.seq = cpu_to_le64(++con->out_seq); 564 m->needs_out_seq = false; 565 } 566 #ifdef CONFIG_BLOCK 567 else 568 m->bio_iter = NULL; 569 #endif 570 571 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 572 m, con->out_seq, le16_to_cpu(m->hdr.type), 573 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 574 le32_to_cpu(m->hdr.data_len), 575 m->nr_pages); 576 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 577 578 /* tag + hdr + front + middle */ 579 ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 580 ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); 581 ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 582 583 if (m->middle) 584 ceph_con_out_kvec_add(con, m->middle->vec.iov_len, 585 m->middle->vec.iov_base); 586 587 /* fill in crc (except data pages), footer */ 588 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 589 con->out_msg->hdr.crc = cpu_to_le32(crc); 590 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; 591 592 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 593 con->out_msg->footer.front_crc = cpu_to_le32(crc); 594 if (m->middle) { 595 crc = crc32c(0, m->middle->vec.iov_base, 596 m->middle->vec.iov_len); 597 con->out_msg->footer.middle_crc = cpu_to_le32(crc); 598 } else 599 con->out_msg->footer.middle_crc = 0; 600 con->out_msg->footer.data_crc = 0; 601 dout("prepare_write_message front_crc %u data_crc %u\n", 602 le32_to_cpu(con->out_msg->footer.front_crc), 603 le32_to_cpu(con->out_msg->footer.middle_crc)); 604 605 /* is there a data payload? */ 606 if (le32_to_cpu(m->hdr.data_len) > 0) { 607 /* initialize page iterator */ 608 con->out_msg_pos.page = 0; 609 if (m->pages) 610 con->out_msg_pos.page_pos = m->page_alignment; 611 else 612 con->out_msg_pos.page_pos = 0; 613 con->out_msg_pos.data_pos = 0; 614 con->out_msg_pos.did_page_crc = false; 615 con->out_more = 1; /* data + footer will follow */ 616 } else { 617 /* no, queue up footer too and be done */ 618 prepare_write_message_footer(con); 619 } 620 621 set_bit(WRITE_PENDING, &con->state); 622 } 623 624 /* 625 * Prepare an ack. 626 */ 627 static void prepare_write_ack(struct ceph_connection *con) 628 { 629 dout("prepare_write_ack %p %llu -> %llu\n", con, 630 con->in_seq_acked, con->in_seq); 631 con->in_seq_acked = con->in_seq; 632 633 ceph_con_out_kvec_reset(con); 634 635 ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 636 637 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 638 ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack), 639 &con->out_temp_ack); 640 641 con->out_more = 1; /* more will follow.. eventually.. */ 642 set_bit(WRITE_PENDING, &con->state); 643 } 644 645 /* 646 * Prepare to write keepalive byte. 647 */ 648 static void prepare_write_keepalive(struct ceph_connection *con) 649 { 650 dout("prepare_write_keepalive %p\n", con); 651 ceph_con_out_kvec_reset(con); 652 ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); 653 set_bit(WRITE_PENDING, &con->state); 654 } 655 656 /* 657 * Connection negotiation. 658 */ 659 660 static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection *con, 661 int *auth_proto) 662 { 663 struct ceph_auth_handshake *auth; 664 665 if (!con->ops->get_authorizer) { 666 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 667 con->out_connect.authorizer_len = 0; 668 669 return NULL; 670 } 671 672 /* Can't hold the mutex while getting authorizer */ 673 674 mutex_unlock(&con->mutex); 675 676 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); 677 678 mutex_lock(&con->mutex); 679 680 if (IS_ERR(auth)) 681 return auth; 682 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) 683 return ERR_PTR(-EAGAIN); 684 685 con->auth_reply_buf = auth->authorizer_reply_buf; 686 con->auth_reply_buf_len = auth->authorizer_reply_buf_len; 687 688 689 return auth; 690 } 691 692 /* 693 * We connected to a peer and are saying hello. 694 */ 695 static void prepare_write_banner(struct ceph_connection *con) 696 { 697 ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 698 ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), 699 &con->msgr->my_enc_addr); 700 701 con->out_more = 0; 702 set_bit(WRITE_PENDING, &con->state); 703 } 704 705 static int prepare_write_connect(struct ceph_connection *con) 706 { 707 unsigned int global_seq = get_global_seq(con->msgr, 0); 708 int proto; 709 int auth_proto; 710 struct ceph_auth_handshake *auth; 711 712 switch (con->peer_name.type) { 713 case CEPH_ENTITY_TYPE_MON: 714 proto = CEPH_MONC_PROTOCOL; 715 break; 716 case CEPH_ENTITY_TYPE_OSD: 717 proto = CEPH_OSDC_PROTOCOL; 718 break; 719 case CEPH_ENTITY_TYPE_MDS: 720 proto = CEPH_MDSC_PROTOCOL; 721 break; 722 default: 723 BUG(); 724 } 725 726 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 727 con->connect_seq, global_seq, proto); 728 729 con->out_connect.features = cpu_to_le64(con->msgr->supported_features); 730 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 731 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 732 con->out_connect.global_seq = cpu_to_le32(global_seq); 733 con->out_connect.protocol_version = cpu_to_le32(proto); 734 con->out_connect.flags = 0; 735 736 auth_proto = CEPH_AUTH_UNKNOWN; 737 auth = get_connect_authorizer(con, &auth_proto); 738 if (IS_ERR(auth)) 739 return PTR_ERR(auth); 740 741 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); 742 con->out_connect.authorizer_len = auth ? 743 cpu_to_le32(auth->authorizer_buf_len) : 0; 744 745 ceph_con_out_kvec_add(con, sizeof (con->out_connect), 746 &con->out_connect); 747 if (auth && auth->authorizer_buf_len) 748 ceph_con_out_kvec_add(con, auth->authorizer_buf_len, 749 auth->authorizer_buf); 750 751 con->out_more = 0; 752 set_bit(WRITE_PENDING, &con->state); 753 754 return 0; 755 } 756 757 /* 758 * write as much of pending kvecs to the socket as we can. 759 * 1 -> done 760 * 0 -> socket full, but more to do 761 * <0 -> error 762 */ 763 static int write_partial_kvec(struct ceph_connection *con) 764 { 765 int ret; 766 767 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); 768 while (con->out_kvec_bytes > 0) { 769 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, 770 con->out_kvec_left, con->out_kvec_bytes, 771 con->out_more); 772 if (ret <= 0) 773 goto out; 774 con->out_kvec_bytes -= ret; 775 if (con->out_kvec_bytes == 0) 776 break; /* done */ 777 778 /* account for full iov entries consumed */ 779 while (ret >= con->out_kvec_cur->iov_len) { 780 BUG_ON(!con->out_kvec_left); 781 ret -= con->out_kvec_cur->iov_len; 782 con->out_kvec_cur++; 783 con->out_kvec_left--; 784 } 785 /* and for a partially-consumed entry */ 786 if (ret) { 787 con->out_kvec_cur->iov_len -= ret; 788 con->out_kvec_cur->iov_base += ret; 789 } 790 } 791 con->out_kvec_left = 0; 792 con->out_kvec_is_msg = false; 793 ret = 1; 794 out: 795 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 796 con->out_kvec_bytes, con->out_kvec_left, ret); 797 return ret; /* done! */ 798 } 799 800 #ifdef CONFIG_BLOCK 801 static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) 802 { 803 if (!bio) { 804 *iter = NULL; 805 *seg = 0; 806 return; 807 } 808 *iter = bio; 809 *seg = bio->bi_idx; 810 } 811 812 static void iter_bio_next(struct bio **bio_iter, int *seg) 813 { 814 if (*bio_iter == NULL) 815 return; 816 817 BUG_ON(*seg >= (*bio_iter)->bi_vcnt); 818 819 (*seg)++; 820 if (*seg == (*bio_iter)->bi_vcnt) 821 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 822 } 823 #endif 824 825 /* 826 * Write as much message data payload as we can. If we finish, queue 827 * up the footer. 828 * 1 -> done, footer is now queued in out_kvec[]. 829 * 0 -> socket full, but more to do 830 * <0 -> error 831 */ 832 static int write_partial_msg_pages(struct ceph_connection *con) 833 { 834 struct ceph_msg *msg = con->out_msg; 835 unsigned int data_len = le32_to_cpu(msg->hdr.data_len); 836 size_t len; 837 bool do_datacrc = !con->msgr->nocrc; 838 int ret; 839 int total_max_write; 840 int in_trail = 0; 841 size_t trail_len = (msg->trail ? msg->trail->length : 0); 842 843 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 844 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 845 con->out_msg_pos.page_pos); 846 847 #ifdef CONFIG_BLOCK 848 if (msg->bio && !msg->bio_iter) 849 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); 850 #endif 851 852 while (data_len > con->out_msg_pos.data_pos) { 853 struct page *page = NULL; 854 int max_write = PAGE_SIZE; 855 int bio_offset = 0; 856 857 total_max_write = data_len - trail_len - 858 con->out_msg_pos.data_pos; 859 860 /* 861 * if we are calculating the data crc (the default), we need 862 * to map the page. if our pages[] has been revoked, use the 863 * zero page. 864 */ 865 866 /* have we reached the trail part of the data? */ 867 if (con->out_msg_pos.data_pos >= data_len - trail_len) { 868 in_trail = 1; 869 870 total_max_write = data_len - con->out_msg_pos.data_pos; 871 872 page = list_first_entry(&msg->trail->head, 873 struct page, lru); 874 max_write = PAGE_SIZE; 875 } else if (msg->pages) { 876 page = msg->pages[con->out_msg_pos.page]; 877 } else if (msg->pagelist) { 878 page = list_first_entry(&msg->pagelist->head, 879 struct page, lru); 880 #ifdef CONFIG_BLOCK 881 } else if (msg->bio) { 882 struct bio_vec *bv; 883 884 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); 885 page = bv->bv_page; 886 bio_offset = bv->bv_offset; 887 max_write = bv->bv_len; 888 #endif 889 } else { 890 page = zero_page; 891 } 892 len = min_t(int, max_write - con->out_msg_pos.page_pos, 893 total_max_write); 894 895 if (do_datacrc && !con->out_msg_pos.did_page_crc) { 896 void *base; 897 u32 crc; 898 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 899 char *kaddr; 900 901 kaddr = kmap(page); 902 BUG_ON(kaddr == NULL); 903 base = kaddr + con->out_msg_pos.page_pos + bio_offset; 904 crc = crc32c(tmpcrc, base, len); 905 con->out_msg->footer.data_crc = cpu_to_le32(crc); 906 con->out_msg_pos.did_page_crc = true; 907 } 908 ret = ceph_tcp_sendpage(con->sock, page, 909 con->out_msg_pos.page_pos + bio_offset, 910 len, 1); 911 912 if (do_datacrc) 913 kunmap(page); 914 915 if (ret <= 0) 916 goto out; 917 918 con->out_msg_pos.data_pos += ret; 919 con->out_msg_pos.page_pos += ret; 920 if (ret == len) { 921 con->out_msg_pos.page_pos = 0; 922 con->out_msg_pos.page++; 923 con->out_msg_pos.did_page_crc = false; 924 if (in_trail) 925 list_move_tail(&page->lru, 926 &msg->trail->head); 927 else if (msg->pagelist) 928 list_move_tail(&page->lru, 929 &msg->pagelist->head); 930 #ifdef CONFIG_BLOCK 931 else if (msg->bio) 932 iter_bio_next(&msg->bio_iter, &msg->bio_seg); 933 #endif 934 } 935 } 936 937 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 938 939 /* prepare and queue up footer, too */ 940 if (!do_datacrc) 941 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 942 ceph_con_out_kvec_reset(con); 943 prepare_write_message_footer(con); 944 ret = 1; 945 out: 946 return ret; 947 } 948 949 /* 950 * write some zeros 951 */ 952 static int write_partial_skip(struct ceph_connection *con) 953 { 954 int ret; 955 956 while (con->out_skip > 0) { 957 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); 958 959 ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, 1); 960 if (ret <= 0) 961 goto out; 962 con->out_skip -= ret; 963 } 964 ret = 1; 965 out: 966 return ret; 967 } 968 969 /* 970 * Prepare to read connection handshake, or an ack. 971 */ 972 static void prepare_read_banner(struct ceph_connection *con) 973 { 974 dout("prepare_read_banner %p\n", con); 975 con->in_base_pos = 0; 976 } 977 978 static void prepare_read_connect(struct ceph_connection *con) 979 { 980 dout("prepare_read_connect %p\n", con); 981 con->in_base_pos = 0; 982 } 983 984 static void prepare_read_ack(struct ceph_connection *con) 985 { 986 dout("prepare_read_ack %p\n", con); 987 con->in_base_pos = 0; 988 } 989 990 static void prepare_read_tag(struct ceph_connection *con) 991 { 992 dout("prepare_read_tag %p\n", con); 993 con->in_base_pos = 0; 994 con->in_tag = CEPH_MSGR_TAG_READY; 995 } 996 997 /* 998 * Prepare to read a message. 999 */ 1000 static int prepare_read_message(struct ceph_connection *con) 1001 { 1002 dout("prepare_read_message %p\n", con); 1003 BUG_ON(con->in_msg != NULL); 1004 con->in_base_pos = 0; 1005 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; 1006 return 0; 1007 } 1008 1009 1010 static int read_partial(struct ceph_connection *con, 1011 int end, int size, void *object) 1012 { 1013 while (con->in_base_pos < end) { 1014 int left = end - con->in_base_pos; 1015 int have = size - left; 1016 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 1017 if (ret <= 0) 1018 return ret; 1019 con->in_base_pos += ret; 1020 } 1021 return 1; 1022 } 1023 1024 1025 /* 1026 * Read all or part of the connect-side handshake on a new connection 1027 */ 1028 static int read_partial_banner(struct ceph_connection *con) 1029 { 1030 int size; 1031 int end; 1032 int ret; 1033 1034 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 1035 1036 /* peer's banner */ 1037 size = strlen(CEPH_BANNER); 1038 end = size; 1039 ret = read_partial(con, end, size, con->in_banner); 1040 if (ret <= 0) 1041 goto out; 1042 1043 size = sizeof (con->actual_peer_addr); 1044 end += size; 1045 ret = read_partial(con, end, size, &con->actual_peer_addr); 1046 if (ret <= 0) 1047 goto out; 1048 1049 size = sizeof (con->peer_addr_for_me); 1050 end += size; 1051 ret = read_partial(con, end, size, &con->peer_addr_for_me); 1052 if (ret <= 0) 1053 goto out; 1054 1055 out: 1056 return ret; 1057 } 1058 1059 static int read_partial_connect(struct ceph_connection *con) 1060 { 1061 int size; 1062 int end; 1063 int ret; 1064 1065 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 1066 1067 size = sizeof (con->in_reply); 1068 end = size; 1069 ret = read_partial(con, end, size, &con->in_reply); 1070 if (ret <= 0) 1071 goto out; 1072 1073 size = le32_to_cpu(con->in_reply.authorizer_len); 1074 end += size; 1075 ret = read_partial(con, end, size, con->auth_reply_buf); 1076 if (ret <= 0) 1077 goto out; 1078 1079 dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", 1080 con, (int)con->in_reply.tag, 1081 le32_to_cpu(con->in_reply.connect_seq), 1082 le32_to_cpu(con->in_reply.global_seq)); 1083 out: 1084 return ret; 1085 1086 } 1087 1088 /* 1089 * Verify the hello banner looks okay. 1090 */ 1091 static int verify_hello(struct ceph_connection *con) 1092 { 1093 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { 1094 pr_err("connect to %s got bad banner\n", 1095 ceph_pr_addr(&con->peer_addr.in_addr)); 1096 con->error_msg = "protocol error, bad banner"; 1097 return -1; 1098 } 1099 return 0; 1100 } 1101 1102 static bool addr_is_blank(struct sockaddr_storage *ss) 1103 { 1104 switch (ss->ss_family) { 1105 case AF_INET: 1106 return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0; 1107 case AF_INET6: 1108 return 1109 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 && 1110 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 && 1111 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 && 1112 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0; 1113 } 1114 return false; 1115 } 1116 1117 static int addr_port(struct sockaddr_storage *ss) 1118 { 1119 switch (ss->ss_family) { 1120 case AF_INET: 1121 return ntohs(((struct sockaddr_in *)ss)->sin_port); 1122 case AF_INET6: 1123 return ntohs(((struct sockaddr_in6 *)ss)->sin6_port); 1124 } 1125 return 0; 1126 } 1127 1128 static void addr_set_port(struct sockaddr_storage *ss, int p) 1129 { 1130 switch (ss->ss_family) { 1131 case AF_INET: 1132 ((struct sockaddr_in *)ss)->sin_port = htons(p); 1133 break; 1134 case AF_INET6: 1135 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p); 1136 break; 1137 } 1138 } 1139 1140 /* 1141 * Unlike other *_pton function semantics, zero indicates success. 1142 */ 1143 static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss, 1144 char delim, const char **ipend) 1145 { 1146 struct sockaddr_in *in4 = (struct sockaddr_in *) ss; 1147 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) ss; 1148 1149 memset(ss, 0, sizeof(*ss)); 1150 1151 if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) { 1152 ss->ss_family = AF_INET; 1153 return 0; 1154 } 1155 1156 if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) { 1157 ss->ss_family = AF_INET6; 1158 return 0; 1159 } 1160 1161 return -EINVAL; 1162 } 1163 1164 /* 1165 * Extract hostname string and resolve using kernel DNS facility. 1166 */ 1167 #ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER 1168 static int ceph_dns_resolve_name(const char *name, size_t namelen, 1169 struct sockaddr_storage *ss, char delim, const char **ipend) 1170 { 1171 const char *end, *delim_p; 1172 char *colon_p, *ip_addr = NULL; 1173 int ip_len, ret; 1174 1175 /* 1176 * The end of the hostname occurs immediately preceding the delimiter or 1177 * the port marker (':') where the delimiter takes precedence. 1178 */ 1179 delim_p = memchr(name, delim, namelen); 1180 colon_p = memchr(name, ':', namelen); 1181 1182 if (delim_p && colon_p) 1183 end = delim_p < colon_p ? delim_p : colon_p; 1184 else if (!delim_p && colon_p) 1185 end = colon_p; 1186 else { 1187 end = delim_p; 1188 if (!end) /* case: hostname:/ */ 1189 end = name + namelen; 1190 } 1191 1192 if (end <= name) 1193 return -EINVAL; 1194 1195 /* do dns_resolve upcall */ 1196 ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL); 1197 if (ip_len > 0) 1198 ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL); 1199 else 1200 ret = -ESRCH; 1201 1202 kfree(ip_addr); 1203 1204 *ipend = end; 1205 1206 pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name, 1207 ret, ret ? "failed" : ceph_pr_addr(ss)); 1208 1209 return ret; 1210 } 1211 #else 1212 static inline int ceph_dns_resolve_name(const char *name, size_t namelen, 1213 struct sockaddr_storage *ss, char delim, const char **ipend) 1214 { 1215 return -EINVAL; 1216 } 1217 #endif 1218 1219 /* 1220 * Parse a server name (IP or hostname). If a valid IP address is not found 1221 * then try to extract a hostname to resolve using userspace DNS upcall. 1222 */ 1223 static int ceph_parse_server_name(const char *name, size_t namelen, 1224 struct sockaddr_storage *ss, char delim, const char **ipend) 1225 { 1226 int ret; 1227 1228 ret = ceph_pton(name, namelen, ss, delim, ipend); 1229 if (ret) 1230 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend); 1231 1232 return ret; 1233 } 1234 1235 /* 1236 * Parse an ip[:port] list into an addr array. Use the default 1237 * monitor port if a port isn't specified. 1238 */ 1239 int ceph_parse_ips(const char *c, const char *end, 1240 struct ceph_entity_addr *addr, 1241 int max_count, int *count) 1242 { 1243 int i, ret = -EINVAL; 1244 const char *p = c; 1245 1246 dout("parse_ips on '%.*s'\n", (int)(end-c), c); 1247 for (i = 0; i < max_count; i++) { 1248 const char *ipend; 1249 struct sockaddr_storage *ss = &addr[i].in_addr; 1250 int port; 1251 char delim = ','; 1252 1253 if (*p == '[') { 1254 delim = ']'; 1255 p++; 1256 } 1257 1258 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend); 1259 if (ret) 1260 goto bad; 1261 ret = -EINVAL; 1262 1263 p = ipend; 1264 1265 if (delim == ']') { 1266 if (*p != ']') { 1267 dout("missing matching ']'\n"); 1268 goto bad; 1269 } 1270 p++; 1271 } 1272 1273 /* port? */ 1274 if (p < end && *p == ':') { 1275 port = 0; 1276 p++; 1277 while (p < end && *p >= '0' && *p <= '9') { 1278 port = (port * 10) + (*p - '0'); 1279 p++; 1280 } 1281 if (port > 65535 || port == 0) 1282 goto bad; 1283 } else { 1284 port = CEPH_MON_PORT; 1285 } 1286 1287 addr_set_port(ss, port); 1288 1289 dout("parse_ips got %s\n", ceph_pr_addr(ss)); 1290 1291 if (p == end) 1292 break; 1293 if (*p != ',') 1294 goto bad; 1295 p++; 1296 } 1297 1298 if (p != end) 1299 goto bad; 1300 1301 if (count) 1302 *count = i + 1; 1303 return 0; 1304 1305 bad: 1306 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); 1307 return ret; 1308 } 1309 EXPORT_SYMBOL(ceph_parse_ips); 1310 1311 static int process_banner(struct ceph_connection *con) 1312 { 1313 dout("process_banner on %p\n", con); 1314 1315 if (verify_hello(con) < 0) 1316 return -1; 1317 1318 ceph_decode_addr(&con->actual_peer_addr); 1319 ceph_decode_addr(&con->peer_addr_for_me); 1320 1321 /* 1322 * Make sure the other end is who we wanted. note that the other 1323 * end may not yet know their ip address, so if it's 0.0.0.0, give 1324 * them the benefit of the doubt. 1325 */ 1326 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 1327 sizeof(con->peer_addr)) != 0 && 1328 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1329 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1330 pr_warning("wrong peer, want %s/%d, got %s/%d\n", 1331 ceph_pr_addr(&con->peer_addr.in_addr), 1332 (int)le32_to_cpu(con->peer_addr.nonce), 1333 ceph_pr_addr(&con->actual_peer_addr.in_addr), 1334 (int)le32_to_cpu(con->actual_peer_addr.nonce)); 1335 con->error_msg = "wrong peer at address"; 1336 return -1; 1337 } 1338 1339 /* 1340 * did we learn our address? 1341 */ 1342 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) { 1343 int port = addr_port(&con->msgr->inst.addr.in_addr); 1344 1345 memcpy(&con->msgr->inst.addr.in_addr, 1346 &con->peer_addr_for_me.in_addr, 1347 sizeof(con->peer_addr_for_me.in_addr)); 1348 addr_set_port(&con->msgr->inst.addr.in_addr, port); 1349 encode_my_addr(con->msgr); 1350 dout("process_banner learned my addr is %s\n", 1351 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); 1352 } 1353 1354 set_bit(NEGOTIATING, &con->state); 1355 prepare_read_connect(con); 1356 return 0; 1357 } 1358 1359 static void fail_protocol(struct ceph_connection *con) 1360 { 1361 reset_connection(con); 1362 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1363 1364 mutex_unlock(&con->mutex); 1365 if (con->ops->bad_proto) 1366 con->ops->bad_proto(con); 1367 mutex_lock(&con->mutex); 1368 } 1369 1370 static int process_connect(struct ceph_connection *con) 1371 { 1372 u64 sup_feat = con->msgr->supported_features; 1373 u64 req_feat = con->msgr->required_features; 1374 u64 server_feat = le64_to_cpu(con->in_reply.features); 1375 int ret; 1376 1377 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1378 1379 switch (con->in_reply.tag) { 1380 case CEPH_MSGR_TAG_FEATURES: 1381 pr_err("%s%lld %s feature set mismatch," 1382 " my %llx < server's %llx, missing %llx\n", 1383 ENTITY_NAME(con->peer_name), 1384 ceph_pr_addr(&con->peer_addr.in_addr), 1385 sup_feat, server_feat, server_feat & ~sup_feat); 1386 con->error_msg = "missing required protocol features"; 1387 fail_protocol(con); 1388 return -1; 1389 1390 case CEPH_MSGR_TAG_BADPROTOVER: 1391 pr_err("%s%lld %s protocol version mismatch," 1392 " my %d != server's %d\n", 1393 ENTITY_NAME(con->peer_name), 1394 ceph_pr_addr(&con->peer_addr.in_addr), 1395 le32_to_cpu(con->out_connect.protocol_version), 1396 le32_to_cpu(con->in_reply.protocol_version)); 1397 con->error_msg = "protocol version mismatch"; 1398 fail_protocol(con); 1399 return -1; 1400 1401 case CEPH_MSGR_TAG_BADAUTHORIZER: 1402 con->auth_retry++; 1403 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, 1404 con->auth_retry); 1405 if (con->auth_retry == 2) { 1406 con->error_msg = "connect authorization failure"; 1407 return -1; 1408 } 1409 con->auth_retry = 1; 1410 ceph_con_out_kvec_reset(con); 1411 ret = prepare_write_connect(con); 1412 if (ret < 0) 1413 return ret; 1414 prepare_read_connect(con); 1415 break; 1416 1417 case CEPH_MSGR_TAG_RESETSESSION: 1418 /* 1419 * If we connected with a large connect_seq but the peer 1420 * has no record of a session with us (no connection, or 1421 * connect_seq == 0), they will send RESETSESION to indicate 1422 * that they must have reset their session, and may have 1423 * dropped messages. 1424 */ 1425 dout("process_connect got RESET peer seq %u\n", 1426 le32_to_cpu(con->in_connect.connect_seq)); 1427 pr_err("%s%lld %s connection reset\n", 1428 ENTITY_NAME(con->peer_name), 1429 ceph_pr_addr(&con->peer_addr.in_addr)); 1430 reset_connection(con); 1431 ceph_con_out_kvec_reset(con); 1432 ret = prepare_write_connect(con); 1433 if (ret < 0) 1434 return ret; 1435 prepare_read_connect(con); 1436 1437 /* Tell ceph about it. */ 1438 mutex_unlock(&con->mutex); 1439 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); 1440 if (con->ops->peer_reset) 1441 con->ops->peer_reset(con); 1442 mutex_lock(&con->mutex); 1443 if (test_bit(CLOSED, &con->state) || 1444 test_bit(OPENING, &con->state)) 1445 return -EAGAIN; 1446 break; 1447 1448 case CEPH_MSGR_TAG_RETRY_SESSION: 1449 /* 1450 * If we sent a smaller connect_seq than the peer has, try 1451 * again with a larger value. 1452 */ 1453 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n", 1454 le32_to_cpu(con->out_connect.connect_seq), 1455 le32_to_cpu(con->in_connect.connect_seq)); 1456 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1457 ceph_con_out_kvec_reset(con); 1458 ret = prepare_write_connect(con); 1459 if (ret < 0) 1460 return ret; 1461 prepare_read_connect(con); 1462 break; 1463 1464 case CEPH_MSGR_TAG_RETRY_GLOBAL: 1465 /* 1466 * If we sent a smaller global_seq than the peer has, try 1467 * again with a larger value. 1468 */ 1469 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", 1470 con->peer_global_seq, 1471 le32_to_cpu(con->in_connect.global_seq)); 1472 get_global_seq(con->msgr, 1473 le32_to_cpu(con->in_connect.global_seq)); 1474 ceph_con_out_kvec_reset(con); 1475 ret = prepare_write_connect(con); 1476 if (ret < 0) 1477 return ret; 1478 prepare_read_connect(con); 1479 break; 1480 1481 case CEPH_MSGR_TAG_READY: 1482 if (req_feat & ~server_feat) { 1483 pr_err("%s%lld %s protocol feature mismatch," 1484 " my required %llx > server's %llx, need %llx\n", 1485 ENTITY_NAME(con->peer_name), 1486 ceph_pr_addr(&con->peer_addr.in_addr), 1487 req_feat, server_feat, req_feat & ~server_feat); 1488 con->error_msg = "missing required protocol features"; 1489 fail_protocol(con); 1490 return -1; 1491 } 1492 clear_bit(CONNECTING, &con->state); 1493 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1494 con->connect_seq++; 1495 con->peer_features = server_feat; 1496 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1497 con->peer_global_seq, 1498 le32_to_cpu(con->in_reply.connect_seq), 1499 con->connect_seq); 1500 WARN_ON(con->connect_seq != 1501 le32_to_cpu(con->in_reply.connect_seq)); 1502 1503 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1504 set_bit(LOSSYTX, &con->state); 1505 1506 prepare_read_tag(con); 1507 break; 1508 1509 case CEPH_MSGR_TAG_WAIT: 1510 /* 1511 * If there is a connection race (we are opening 1512 * connections to each other), one of us may just have 1513 * to WAIT. This shouldn't happen if we are the 1514 * client. 1515 */ 1516 pr_err("process_connect got WAIT as client\n"); 1517 con->error_msg = "protocol error, got WAIT as client"; 1518 return -1; 1519 1520 default: 1521 pr_err("connect protocol error, will retry\n"); 1522 con->error_msg = "protocol error, garbage tag during connect"; 1523 return -1; 1524 } 1525 return 0; 1526 } 1527 1528 1529 /* 1530 * read (part of) an ack 1531 */ 1532 static int read_partial_ack(struct ceph_connection *con) 1533 { 1534 int size = sizeof (con->in_temp_ack); 1535 int end = size; 1536 1537 return read_partial(con, end, size, &con->in_temp_ack); 1538 } 1539 1540 1541 /* 1542 * We can finally discard anything that's been acked. 1543 */ 1544 static void process_ack(struct ceph_connection *con) 1545 { 1546 struct ceph_msg *m; 1547 u64 ack = le64_to_cpu(con->in_temp_ack); 1548 u64 seq; 1549 1550 while (!list_empty(&con->out_sent)) { 1551 m = list_first_entry(&con->out_sent, struct ceph_msg, 1552 list_head); 1553 seq = le64_to_cpu(m->hdr.seq); 1554 if (seq > ack) 1555 break; 1556 dout("got ack for seq %llu type %d at %p\n", seq, 1557 le16_to_cpu(m->hdr.type), m); 1558 m->ack_stamp = jiffies; 1559 ceph_msg_remove(m); 1560 } 1561 prepare_read_tag(con); 1562 } 1563 1564 1565 1566 1567 static int read_partial_message_section(struct ceph_connection *con, 1568 struct kvec *section, 1569 unsigned int sec_len, u32 *crc) 1570 { 1571 int ret, left; 1572 1573 BUG_ON(!section); 1574 1575 while (section->iov_len < sec_len) { 1576 BUG_ON(section->iov_base == NULL); 1577 left = sec_len - section->iov_len; 1578 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + 1579 section->iov_len, left); 1580 if (ret <= 0) 1581 return ret; 1582 section->iov_len += ret; 1583 } 1584 if (section->iov_len == sec_len) 1585 *crc = crc32c(0, section->iov_base, section->iov_len); 1586 1587 return 1; 1588 } 1589 1590 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1591 struct ceph_msg_header *hdr, 1592 int *skip); 1593 1594 1595 static int read_partial_message_pages(struct ceph_connection *con, 1596 struct page **pages, 1597 unsigned int data_len, bool do_datacrc) 1598 { 1599 void *p; 1600 int ret; 1601 int left; 1602 1603 left = min((int)(data_len - con->in_msg_pos.data_pos), 1604 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1605 /* (page) data */ 1606 BUG_ON(pages == NULL); 1607 p = kmap(pages[con->in_msg_pos.page]); 1608 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1609 left); 1610 if (ret > 0 && do_datacrc) 1611 con->in_data_crc = 1612 crc32c(con->in_data_crc, 1613 p + con->in_msg_pos.page_pos, ret); 1614 kunmap(pages[con->in_msg_pos.page]); 1615 if (ret <= 0) 1616 return ret; 1617 con->in_msg_pos.data_pos += ret; 1618 con->in_msg_pos.page_pos += ret; 1619 if (con->in_msg_pos.page_pos == PAGE_SIZE) { 1620 con->in_msg_pos.page_pos = 0; 1621 con->in_msg_pos.page++; 1622 } 1623 1624 return ret; 1625 } 1626 1627 #ifdef CONFIG_BLOCK 1628 static int read_partial_message_bio(struct ceph_connection *con, 1629 struct bio **bio_iter, int *bio_seg, 1630 unsigned int data_len, bool do_datacrc) 1631 { 1632 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg); 1633 void *p; 1634 int ret, left; 1635 1636 if (IS_ERR(bv)) 1637 return PTR_ERR(bv); 1638 1639 left = min((int)(data_len - con->in_msg_pos.data_pos), 1640 (int)(bv->bv_len - con->in_msg_pos.page_pos)); 1641 1642 p = kmap(bv->bv_page) + bv->bv_offset; 1643 1644 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1645 left); 1646 if (ret > 0 && do_datacrc) 1647 con->in_data_crc = 1648 crc32c(con->in_data_crc, 1649 p + con->in_msg_pos.page_pos, ret); 1650 kunmap(bv->bv_page); 1651 if (ret <= 0) 1652 return ret; 1653 con->in_msg_pos.data_pos += ret; 1654 con->in_msg_pos.page_pos += ret; 1655 if (con->in_msg_pos.page_pos == bv->bv_len) { 1656 con->in_msg_pos.page_pos = 0; 1657 iter_bio_next(bio_iter, bio_seg); 1658 } 1659 1660 return ret; 1661 } 1662 #endif 1663 1664 /* 1665 * read (part of) a message. 1666 */ 1667 static int read_partial_message(struct ceph_connection *con) 1668 { 1669 struct ceph_msg *m = con->in_msg; 1670 int size; 1671 int end; 1672 int ret; 1673 unsigned int front_len, middle_len, data_len; 1674 bool do_datacrc = !con->msgr->nocrc; 1675 int skip; 1676 u64 seq; 1677 u32 crc; 1678 1679 dout("read_partial_message con %p msg %p\n", con, m); 1680 1681 /* header */ 1682 size = sizeof (con->in_hdr); 1683 end = size; 1684 ret = read_partial(con, end, size, &con->in_hdr); 1685 if (ret <= 0) 1686 return ret; 1687 1688 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); 1689 if (cpu_to_le32(crc) != con->in_hdr.crc) { 1690 pr_err("read_partial_message bad hdr " 1691 " crc %u != expected %u\n", 1692 crc, con->in_hdr.crc); 1693 return -EBADMSG; 1694 } 1695 1696 front_len = le32_to_cpu(con->in_hdr.front_len); 1697 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1698 return -EIO; 1699 middle_len = le32_to_cpu(con->in_hdr.middle_len); 1700 if (middle_len > CEPH_MSG_MAX_DATA_LEN) 1701 return -EIO; 1702 data_len = le32_to_cpu(con->in_hdr.data_len); 1703 if (data_len > CEPH_MSG_MAX_DATA_LEN) 1704 return -EIO; 1705 1706 /* verify seq# */ 1707 seq = le64_to_cpu(con->in_hdr.seq); 1708 if ((s64)seq - (s64)con->in_seq < 1) { 1709 pr_info("skipping %s%lld %s seq %lld expected %lld\n", 1710 ENTITY_NAME(con->peer_name), 1711 ceph_pr_addr(&con->peer_addr.in_addr), 1712 seq, con->in_seq + 1); 1713 con->in_base_pos = -front_len - middle_len - data_len - 1714 sizeof(m->footer); 1715 con->in_tag = CEPH_MSGR_TAG_READY; 1716 return 0; 1717 } else if ((s64)seq - (s64)con->in_seq > 1) { 1718 pr_err("read_partial_message bad seq %lld expected %lld\n", 1719 seq, con->in_seq + 1); 1720 con->error_msg = "bad message sequence # for incoming message"; 1721 return -EBADMSG; 1722 } 1723 1724 /* allocate message? */ 1725 if (!con->in_msg) { 1726 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1727 con->in_hdr.front_len, con->in_hdr.data_len); 1728 skip = 0; 1729 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1730 if (skip) { 1731 /* skip this message */ 1732 dout("alloc_msg said skip message\n"); 1733 BUG_ON(con->in_msg); 1734 con->in_base_pos = -front_len - middle_len - data_len - 1735 sizeof(m->footer); 1736 con->in_tag = CEPH_MSGR_TAG_READY; 1737 con->in_seq++; 1738 return 0; 1739 } 1740 if (!con->in_msg) { 1741 con->error_msg = 1742 "error allocating memory for incoming message"; 1743 return -ENOMEM; 1744 } 1745 m = con->in_msg; 1746 m->front.iov_len = 0; /* haven't read it yet */ 1747 if (m->middle) 1748 m->middle->vec.iov_len = 0; 1749 1750 con->in_msg_pos.page = 0; 1751 if (m->pages) 1752 con->in_msg_pos.page_pos = m->page_alignment; 1753 else 1754 con->in_msg_pos.page_pos = 0; 1755 con->in_msg_pos.data_pos = 0; 1756 } 1757 1758 /* front */ 1759 ret = read_partial_message_section(con, &m->front, front_len, 1760 &con->in_front_crc); 1761 if (ret <= 0) 1762 return ret; 1763 1764 /* middle */ 1765 if (m->middle) { 1766 ret = read_partial_message_section(con, &m->middle->vec, 1767 middle_len, 1768 &con->in_middle_crc); 1769 if (ret <= 0) 1770 return ret; 1771 } 1772 #ifdef CONFIG_BLOCK 1773 if (m->bio && !m->bio_iter) 1774 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); 1775 #endif 1776 1777 /* (page) data */ 1778 while (con->in_msg_pos.data_pos < data_len) { 1779 if (m->pages) { 1780 ret = read_partial_message_pages(con, m->pages, 1781 data_len, do_datacrc); 1782 if (ret <= 0) 1783 return ret; 1784 #ifdef CONFIG_BLOCK 1785 } else if (m->bio) { 1786 1787 ret = read_partial_message_bio(con, 1788 &m->bio_iter, &m->bio_seg, 1789 data_len, do_datacrc); 1790 if (ret <= 0) 1791 return ret; 1792 #endif 1793 } else { 1794 BUG_ON(1); 1795 } 1796 } 1797 1798 /* footer */ 1799 size = sizeof (m->footer); 1800 end += size; 1801 ret = read_partial(con, end, size, &m->footer); 1802 if (ret <= 0) 1803 return ret; 1804 1805 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1806 m, front_len, m->footer.front_crc, middle_len, 1807 m->footer.middle_crc, data_len, m->footer.data_crc); 1808 1809 /* crc ok? */ 1810 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 1811 pr_err("read_partial_message %p front crc %u != exp. %u\n", 1812 m, con->in_front_crc, m->footer.front_crc); 1813 return -EBADMSG; 1814 } 1815 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 1816 pr_err("read_partial_message %p middle crc %u != exp %u\n", 1817 m, con->in_middle_crc, m->footer.middle_crc); 1818 return -EBADMSG; 1819 } 1820 if (do_datacrc && 1821 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 1822 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 1823 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 1824 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 1825 return -EBADMSG; 1826 } 1827 1828 return 1; /* done! */ 1829 } 1830 1831 /* 1832 * Process message. This happens in the worker thread. The callback should 1833 * be careful not to do anything that waits on other incoming messages or it 1834 * may deadlock. 1835 */ 1836 static void process_message(struct ceph_connection *con) 1837 { 1838 struct ceph_msg *msg; 1839 1840 msg = con->in_msg; 1841 con->in_msg = NULL; 1842 1843 /* if first message, set peer_name */ 1844 if (con->peer_name.type == 0) 1845 con->peer_name = msg->hdr.src; 1846 1847 con->in_seq++; 1848 mutex_unlock(&con->mutex); 1849 1850 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1851 msg, le64_to_cpu(msg->hdr.seq), 1852 ENTITY_NAME(msg->hdr.src), 1853 le16_to_cpu(msg->hdr.type), 1854 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1855 le32_to_cpu(msg->hdr.front_len), 1856 le32_to_cpu(msg->hdr.data_len), 1857 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 1858 con->ops->dispatch(con, msg); 1859 1860 mutex_lock(&con->mutex); 1861 prepare_read_tag(con); 1862 } 1863 1864 1865 /* 1866 * Write something to the socket. Called in a worker thread when the 1867 * socket appears to be writeable and we have something ready to send. 1868 */ 1869 static int try_write(struct ceph_connection *con) 1870 { 1871 int ret = 1; 1872 1873 dout("try_write start %p state %lu nref %d\n", con, con->state, 1874 atomic_read(&con->nref)); 1875 1876 more: 1877 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1878 1879 /* open the socket first? */ 1880 if (con->sock == NULL) { 1881 ceph_con_out_kvec_reset(con); 1882 prepare_write_banner(con); 1883 ret = prepare_write_connect(con); 1884 if (ret < 0) 1885 goto out; 1886 prepare_read_banner(con); 1887 set_bit(CONNECTING, &con->state); 1888 clear_bit(NEGOTIATING, &con->state); 1889 1890 BUG_ON(con->in_msg); 1891 con->in_tag = CEPH_MSGR_TAG_READY; 1892 dout("try_write initiating connect on %p new state %lu\n", 1893 con, con->state); 1894 ret = ceph_tcp_connect(con); 1895 if (ret < 0) { 1896 con->error_msg = "connect error"; 1897 goto out; 1898 } 1899 } 1900 1901 more_kvec: 1902 /* kvec data queued? */ 1903 if (con->out_skip) { 1904 ret = write_partial_skip(con); 1905 if (ret <= 0) 1906 goto out; 1907 } 1908 if (con->out_kvec_left) { 1909 ret = write_partial_kvec(con); 1910 if (ret <= 0) 1911 goto out; 1912 } 1913 1914 /* msg pages? */ 1915 if (con->out_msg) { 1916 if (con->out_msg_done) { 1917 ceph_msg_put(con->out_msg); 1918 con->out_msg = NULL; /* we're done with this one */ 1919 goto do_next; 1920 } 1921 1922 ret = write_partial_msg_pages(con); 1923 if (ret == 1) 1924 goto more_kvec; /* we need to send the footer, too! */ 1925 if (ret == 0) 1926 goto out; 1927 if (ret < 0) { 1928 dout("try_write write_partial_msg_pages err %d\n", 1929 ret); 1930 goto out; 1931 } 1932 } 1933 1934 do_next: 1935 if (!test_bit(CONNECTING, &con->state)) { 1936 /* is anything else pending? */ 1937 if (!list_empty(&con->out_queue)) { 1938 prepare_write_message(con); 1939 goto more; 1940 } 1941 if (con->in_seq > con->in_seq_acked) { 1942 prepare_write_ack(con); 1943 goto more; 1944 } 1945 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { 1946 prepare_write_keepalive(con); 1947 goto more; 1948 } 1949 } 1950 1951 /* Nothing to do! */ 1952 clear_bit(WRITE_PENDING, &con->state); 1953 dout("try_write nothing else to write.\n"); 1954 ret = 0; 1955 out: 1956 dout("try_write done on %p ret %d\n", con, ret); 1957 return ret; 1958 } 1959 1960 1961 1962 /* 1963 * Read what we can from the socket. 1964 */ 1965 static int try_read(struct ceph_connection *con) 1966 { 1967 int ret = -1; 1968 1969 if (!con->sock) 1970 return 0; 1971 1972 if (test_bit(STANDBY, &con->state)) 1973 return 0; 1974 1975 dout("try_read start on %p\n", con); 1976 1977 more: 1978 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1979 con->in_base_pos); 1980 1981 /* 1982 * process_connect and process_message drop and re-take 1983 * con->mutex. make sure we handle a racing close or reopen. 1984 */ 1985 if (test_bit(CLOSED, &con->state) || 1986 test_bit(OPENING, &con->state)) { 1987 ret = -EAGAIN; 1988 goto out; 1989 } 1990 1991 if (test_bit(CONNECTING, &con->state)) { 1992 if (!test_bit(NEGOTIATING, &con->state)) { 1993 dout("try_read connecting\n"); 1994 ret = read_partial_banner(con); 1995 if (ret <= 0) 1996 goto out; 1997 ret = process_banner(con); 1998 if (ret < 0) 1999 goto out; 2000 } 2001 ret = read_partial_connect(con); 2002 if (ret <= 0) 2003 goto out; 2004 ret = process_connect(con); 2005 if (ret < 0) 2006 goto out; 2007 goto more; 2008 } 2009 2010 if (con->in_base_pos < 0) { 2011 /* 2012 * skipping + discarding content. 2013 * 2014 * FIXME: there must be a better way to do this! 2015 */ 2016 static char buf[SKIP_BUF_SIZE]; 2017 int skip = min((int) sizeof (buf), -con->in_base_pos); 2018 2019 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); 2020 ret = ceph_tcp_recvmsg(con->sock, buf, skip); 2021 if (ret <= 0) 2022 goto out; 2023 con->in_base_pos += ret; 2024 if (con->in_base_pos) 2025 goto more; 2026 } 2027 if (con->in_tag == CEPH_MSGR_TAG_READY) { 2028 /* 2029 * what's next? 2030 */ 2031 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); 2032 if (ret <= 0) 2033 goto out; 2034 dout("try_read got tag %d\n", (int)con->in_tag); 2035 switch (con->in_tag) { 2036 case CEPH_MSGR_TAG_MSG: 2037 prepare_read_message(con); 2038 break; 2039 case CEPH_MSGR_TAG_ACK: 2040 prepare_read_ack(con); 2041 break; 2042 case CEPH_MSGR_TAG_CLOSE: 2043 set_bit(CLOSED, &con->state); /* fixme */ 2044 goto out; 2045 default: 2046 goto bad_tag; 2047 } 2048 } 2049 if (con->in_tag == CEPH_MSGR_TAG_MSG) { 2050 ret = read_partial_message(con); 2051 if (ret <= 0) { 2052 switch (ret) { 2053 case -EBADMSG: 2054 con->error_msg = "bad crc"; 2055 ret = -EIO; 2056 break; 2057 case -EIO: 2058 con->error_msg = "io error"; 2059 break; 2060 } 2061 goto out; 2062 } 2063 if (con->in_tag == CEPH_MSGR_TAG_READY) 2064 goto more; 2065 process_message(con); 2066 goto more; 2067 } 2068 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 2069 ret = read_partial_ack(con); 2070 if (ret <= 0) 2071 goto out; 2072 process_ack(con); 2073 goto more; 2074 } 2075 2076 out: 2077 dout("try_read done on %p ret %d\n", con, ret); 2078 return ret; 2079 2080 bad_tag: 2081 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); 2082 con->error_msg = "protocol error, garbage tag"; 2083 ret = -1; 2084 goto out; 2085 } 2086 2087 2088 /* 2089 * Atomically queue work on a connection. Bump @con reference to 2090 * avoid races with connection teardown. 2091 */ 2092 static void queue_con(struct ceph_connection *con) 2093 { 2094 if (test_bit(DEAD, &con->state)) { 2095 dout("queue_con %p ignoring: DEAD\n", 2096 con); 2097 return; 2098 } 2099 2100 if (!con->ops->get(con)) { 2101 dout("queue_con %p ref count 0\n", con); 2102 return; 2103 } 2104 2105 if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { 2106 dout("queue_con %p - already queued\n", con); 2107 con->ops->put(con); 2108 } else { 2109 dout("queue_con %p\n", con); 2110 } 2111 } 2112 2113 /* 2114 * Do some work on a connection. Drop a connection ref when we're done. 2115 */ 2116 static void con_work(struct work_struct *work) 2117 { 2118 struct ceph_connection *con = container_of(work, struct ceph_connection, 2119 work.work); 2120 int ret; 2121 2122 mutex_lock(&con->mutex); 2123 restart: 2124 if (test_and_clear_bit(BACKOFF, &con->state)) { 2125 dout("con_work %p backing off\n", con); 2126 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2127 round_jiffies_relative(con->delay))) { 2128 dout("con_work %p backoff %lu\n", con, con->delay); 2129 mutex_unlock(&con->mutex); 2130 return; 2131 } else { 2132 con->ops->put(con); 2133 dout("con_work %p FAILED to back off %lu\n", con, 2134 con->delay); 2135 } 2136 } 2137 2138 if (test_bit(STANDBY, &con->state)) { 2139 dout("con_work %p STANDBY\n", con); 2140 goto done; 2141 } 2142 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 2143 dout("con_work CLOSED\n"); 2144 con_close_socket(con); 2145 goto done; 2146 } 2147 if (test_and_clear_bit(OPENING, &con->state)) { 2148 /* reopen w/ new peer */ 2149 dout("con_work OPENING\n"); 2150 con_close_socket(con); 2151 } 2152 2153 if (test_and_clear_bit(SOCK_CLOSED, &con->state)) 2154 goto fault; 2155 2156 ret = try_read(con); 2157 if (ret == -EAGAIN) 2158 goto restart; 2159 if (ret < 0) 2160 goto fault; 2161 2162 ret = try_write(con); 2163 if (ret == -EAGAIN) 2164 goto restart; 2165 if (ret < 0) 2166 goto fault; 2167 2168 done: 2169 mutex_unlock(&con->mutex); 2170 done_unlocked: 2171 con->ops->put(con); 2172 return; 2173 2174 fault: 2175 mutex_unlock(&con->mutex); 2176 ceph_fault(con); /* error/fault path */ 2177 goto done_unlocked; 2178 } 2179 2180 2181 /* 2182 * Generic error/fault handler. A retry mechanism is used with 2183 * exponential backoff 2184 */ 2185 static void ceph_fault(struct ceph_connection *con) 2186 { 2187 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2188 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2189 dout("fault %p state %lu to peer %s\n", 2190 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2191 2192 if (test_bit(LOSSYTX, &con->state)) { 2193 dout("fault on LOSSYTX channel\n"); 2194 goto out; 2195 } 2196 2197 mutex_lock(&con->mutex); 2198 if (test_bit(CLOSED, &con->state)) 2199 goto out_unlock; 2200 2201 con_close_socket(con); 2202 2203 if (con->in_msg) { 2204 ceph_msg_put(con->in_msg); 2205 con->in_msg = NULL; 2206 } 2207 2208 /* Requeue anything that hasn't been acked */ 2209 list_splice_init(&con->out_sent, &con->out_queue); 2210 2211 /* If there are no messages queued or keepalive pending, place 2212 * the connection in a STANDBY state */ 2213 if (list_empty(&con->out_queue) && 2214 !test_bit(KEEPALIVE_PENDING, &con->state)) { 2215 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2216 clear_bit(WRITE_PENDING, &con->state); 2217 set_bit(STANDBY, &con->state); 2218 } else { 2219 /* retry after a delay. */ 2220 if (con->delay == 0) 2221 con->delay = BASE_DELAY_INTERVAL; 2222 else if (con->delay < MAX_DELAY_INTERVAL) 2223 con->delay *= 2; 2224 con->ops->get(con); 2225 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2226 round_jiffies_relative(con->delay))) { 2227 dout("fault queued %p delay %lu\n", con, con->delay); 2228 } else { 2229 con->ops->put(con); 2230 dout("fault failed to queue %p delay %lu, backoff\n", 2231 con, con->delay); 2232 /* 2233 * In many cases we see a socket state change 2234 * while con_work is running and end up 2235 * queuing (non-delayed) work, such that we 2236 * can't backoff with a delay. Set a flag so 2237 * that when con_work restarts we schedule the 2238 * delay then. 2239 */ 2240 set_bit(BACKOFF, &con->state); 2241 } 2242 } 2243 2244 out_unlock: 2245 mutex_unlock(&con->mutex); 2246 out: 2247 /* 2248 * in case we faulted due to authentication, invalidate our 2249 * current tickets so that we can get new ones. 2250 */ 2251 if (con->auth_retry && con->ops->invalidate_authorizer) { 2252 dout("calling invalidate_authorizer()\n"); 2253 con->ops->invalidate_authorizer(con); 2254 } 2255 2256 if (con->ops->fault) 2257 con->ops->fault(con); 2258 } 2259 2260 2261 2262 /* 2263 * create a new messenger instance 2264 */ 2265 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, 2266 u32 supported_features, 2267 u32 required_features) 2268 { 2269 struct ceph_messenger *msgr; 2270 2271 msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); 2272 if (msgr == NULL) 2273 return ERR_PTR(-ENOMEM); 2274 2275 msgr->supported_features = supported_features; 2276 msgr->required_features = required_features; 2277 2278 spin_lock_init(&msgr->global_seq_lock); 2279 2280 if (myaddr) 2281 msgr->inst.addr = *myaddr; 2282 2283 /* select a random nonce */ 2284 msgr->inst.addr.type = 0; 2285 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2286 encode_my_addr(msgr); 2287 2288 dout("messenger_create %p\n", msgr); 2289 return msgr; 2290 } 2291 EXPORT_SYMBOL(ceph_messenger_create); 2292 2293 void ceph_messenger_destroy(struct ceph_messenger *msgr) 2294 { 2295 dout("destroy %p\n", msgr); 2296 kfree(msgr); 2297 dout("destroyed messenger %p\n", msgr); 2298 } 2299 EXPORT_SYMBOL(ceph_messenger_destroy); 2300 2301 static void clear_standby(struct ceph_connection *con) 2302 { 2303 /* come back from STANDBY? */ 2304 if (test_and_clear_bit(STANDBY, &con->state)) { 2305 mutex_lock(&con->mutex); 2306 dout("clear_standby %p and ++connect_seq\n", con); 2307 con->connect_seq++; 2308 WARN_ON(test_bit(WRITE_PENDING, &con->state)); 2309 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); 2310 mutex_unlock(&con->mutex); 2311 } 2312 } 2313 2314 /* 2315 * Queue up an outgoing message on the given connection. 2316 */ 2317 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 2318 { 2319 if (test_bit(CLOSED, &con->state)) { 2320 dout("con_send %p closed, dropping %p\n", con, msg); 2321 ceph_msg_put(msg); 2322 return; 2323 } 2324 2325 /* set src+dst */ 2326 msg->hdr.src = con->msgr->inst.name; 2327 2328 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 2329 2330 msg->needs_out_seq = true; 2331 2332 /* queue */ 2333 mutex_lock(&con->mutex); 2334 BUG_ON(!list_empty(&msg->list_head)); 2335 list_add_tail(&msg->list_head, &con->out_queue); 2336 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2337 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), 2338 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 2339 le32_to_cpu(msg->hdr.front_len), 2340 le32_to_cpu(msg->hdr.middle_len), 2341 le32_to_cpu(msg->hdr.data_len)); 2342 mutex_unlock(&con->mutex); 2343 2344 /* if there wasn't anything waiting to send before, queue 2345 * new work */ 2346 clear_standby(con); 2347 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2348 queue_con(con); 2349 } 2350 EXPORT_SYMBOL(ceph_con_send); 2351 2352 /* 2353 * Revoke a message that was previously queued for send 2354 */ 2355 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 2356 { 2357 mutex_lock(&con->mutex); 2358 if (!list_empty(&msg->list_head)) { 2359 dout("con_revoke %p msg %p - was on queue\n", con, msg); 2360 list_del_init(&msg->list_head); 2361 ceph_msg_put(msg); 2362 msg->hdr.seq = 0; 2363 } 2364 if (con->out_msg == msg) { 2365 dout("con_revoke %p msg %p - was sending\n", con, msg); 2366 con->out_msg = NULL; 2367 if (con->out_kvec_is_msg) { 2368 con->out_skip = con->out_kvec_bytes; 2369 con->out_kvec_is_msg = false; 2370 } 2371 ceph_msg_put(msg); 2372 msg->hdr.seq = 0; 2373 } 2374 mutex_unlock(&con->mutex); 2375 } 2376 2377 /* 2378 * Revoke a message that we may be reading data into 2379 */ 2380 void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) 2381 { 2382 mutex_lock(&con->mutex); 2383 if (con->in_msg && con->in_msg == msg) { 2384 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); 2385 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); 2386 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); 2387 2388 /* skip rest of message */ 2389 dout("con_revoke_pages %p msg %p revoked\n", con, msg); 2390 con->in_base_pos = con->in_base_pos - 2391 sizeof(struct ceph_msg_header) - 2392 front_len - 2393 middle_len - 2394 data_len - 2395 sizeof(struct ceph_msg_footer); 2396 ceph_msg_put(con->in_msg); 2397 con->in_msg = NULL; 2398 con->in_tag = CEPH_MSGR_TAG_READY; 2399 con->in_seq++; 2400 } else { 2401 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2402 con, con->in_msg, msg); 2403 } 2404 mutex_unlock(&con->mutex); 2405 } 2406 2407 /* 2408 * Queue a keepalive byte to ensure the tcp connection is alive. 2409 */ 2410 void ceph_con_keepalive(struct ceph_connection *con) 2411 { 2412 dout("con_keepalive %p\n", con); 2413 clear_standby(con); 2414 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2415 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2416 queue_con(con); 2417 } 2418 EXPORT_SYMBOL(ceph_con_keepalive); 2419 2420 2421 /* 2422 * construct a new message with given type, size 2423 * the new msg has a ref count of 1. 2424 */ 2425 struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, 2426 bool can_fail) 2427 { 2428 struct ceph_msg *m; 2429 2430 m = kmalloc(sizeof(*m), flags); 2431 if (m == NULL) 2432 goto out; 2433 kref_init(&m->kref); 2434 INIT_LIST_HEAD(&m->list_head); 2435 2436 m->hdr.tid = 0; 2437 m->hdr.type = cpu_to_le16(type); 2438 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 2439 m->hdr.version = 0; 2440 m->hdr.front_len = cpu_to_le32(front_len); 2441 m->hdr.middle_len = 0; 2442 m->hdr.data_len = 0; 2443 m->hdr.data_off = 0; 2444 m->hdr.reserved = 0; 2445 m->footer.front_crc = 0; 2446 m->footer.middle_crc = 0; 2447 m->footer.data_crc = 0; 2448 m->footer.flags = 0; 2449 m->front_max = front_len; 2450 m->front_is_vmalloc = false; 2451 m->more_to_follow = false; 2452 m->ack_stamp = 0; 2453 m->pool = NULL; 2454 2455 /* middle */ 2456 m->middle = NULL; 2457 2458 /* data */ 2459 m->nr_pages = 0; 2460 m->page_alignment = 0; 2461 m->pages = NULL; 2462 m->pagelist = NULL; 2463 m->bio = NULL; 2464 m->bio_iter = NULL; 2465 m->bio_seg = 0; 2466 m->trail = NULL; 2467 2468 /* front */ 2469 if (front_len) { 2470 if (front_len > PAGE_CACHE_SIZE) { 2471 m->front.iov_base = __vmalloc(front_len, flags, 2472 PAGE_KERNEL); 2473 m->front_is_vmalloc = true; 2474 } else { 2475 m->front.iov_base = kmalloc(front_len, flags); 2476 } 2477 if (m->front.iov_base == NULL) { 2478 dout("ceph_msg_new can't allocate %d bytes\n", 2479 front_len); 2480 goto out2; 2481 } 2482 } else { 2483 m->front.iov_base = NULL; 2484 } 2485 m->front.iov_len = front_len; 2486 2487 dout("ceph_msg_new %p front %d\n", m, front_len); 2488 return m; 2489 2490 out2: 2491 ceph_msg_put(m); 2492 out: 2493 if (!can_fail) { 2494 pr_err("msg_new can't create type %d front %d\n", type, 2495 front_len); 2496 WARN_ON(1); 2497 } else { 2498 dout("msg_new can't create type %d front %d\n", type, 2499 front_len); 2500 } 2501 return NULL; 2502 } 2503 EXPORT_SYMBOL(ceph_msg_new); 2504 2505 /* 2506 * Allocate "middle" portion of a message, if it is needed and wasn't 2507 * allocated by alloc_msg. This allows us to read a small fixed-size 2508 * per-type header in the front and then gracefully fail (i.e., 2509 * propagate the error to the caller based on info in the front) when 2510 * the middle is too large. 2511 */ 2512 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) 2513 { 2514 int type = le16_to_cpu(msg->hdr.type); 2515 int middle_len = le32_to_cpu(msg->hdr.middle_len); 2516 2517 dout("alloc_middle %p type %d %s middle_len %d\n", msg, type, 2518 ceph_msg_type_name(type), middle_len); 2519 BUG_ON(!middle_len); 2520 BUG_ON(msg->middle); 2521 2522 msg->middle = ceph_buffer_new(middle_len, GFP_NOFS); 2523 if (!msg->middle) 2524 return -ENOMEM; 2525 return 0; 2526 } 2527 2528 /* 2529 * Generic message allocator, for incoming messages. 2530 */ 2531 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 2532 struct ceph_msg_header *hdr, 2533 int *skip) 2534 { 2535 int type = le16_to_cpu(hdr->type); 2536 int front_len = le32_to_cpu(hdr->front_len); 2537 int middle_len = le32_to_cpu(hdr->middle_len); 2538 struct ceph_msg *msg = NULL; 2539 int ret; 2540 2541 if (con->ops->alloc_msg) { 2542 mutex_unlock(&con->mutex); 2543 msg = con->ops->alloc_msg(con, hdr, skip); 2544 mutex_lock(&con->mutex); 2545 if (!msg || *skip) 2546 return NULL; 2547 } 2548 if (!msg) { 2549 *skip = 0; 2550 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 2551 if (!msg) { 2552 pr_err("unable to allocate msg type %d len %d\n", 2553 type, front_len); 2554 return NULL; 2555 } 2556 msg->page_alignment = le16_to_cpu(hdr->data_off); 2557 } 2558 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2559 2560 if (middle_len && !msg->middle) { 2561 ret = ceph_alloc_middle(con, msg); 2562 if (ret < 0) { 2563 ceph_msg_put(msg); 2564 return NULL; 2565 } 2566 } 2567 2568 return msg; 2569 } 2570 2571 2572 /* 2573 * Free a generically kmalloc'd message. 2574 */ 2575 void ceph_msg_kfree(struct ceph_msg *m) 2576 { 2577 dout("msg_kfree %p\n", m); 2578 if (m->front_is_vmalloc) 2579 vfree(m->front.iov_base); 2580 else 2581 kfree(m->front.iov_base); 2582 kfree(m); 2583 } 2584 2585 /* 2586 * Drop a msg ref. Destroy as needed. 2587 */ 2588 void ceph_msg_last_put(struct kref *kref) 2589 { 2590 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 2591 2592 dout("ceph_msg_put last one on %p\n", m); 2593 WARN_ON(!list_empty(&m->list_head)); 2594 2595 /* drop middle, data, if any */ 2596 if (m->middle) { 2597 ceph_buffer_put(m->middle); 2598 m->middle = NULL; 2599 } 2600 m->nr_pages = 0; 2601 m->pages = NULL; 2602 2603 if (m->pagelist) { 2604 ceph_pagelist_release(m->pagelist); 2605 kfree(m->pagelist); 2606 m->pagelist = NULL; 2607 } 2608 2609 m->trail = NULL; 2610 2611 if (m->pool) 2612 ceph_msgpool_put(m->pool, m); 2613 else 2614 ceph_msg_kfree(m); 2615 } 2616 EXPORT_SYMBOL(ceph_msg_last_put); 2617 2618 void ceph_msg_dump(struct ceph_msg *msg) 2619 { 2620 pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, 2621 msg->front_max, msg->nr_pages); 2622 print_hex_dump(KERN_DEBUG, "header: ", 2623 DUMP_PREFIX_OFFSET, 16, 1, 2624 &msg->hdr, sizeof(msg->hdr), true); 2625 print_hex_dump(KERN_DEBUG, " front: ", 2626 DUMP_PREFIX_OFFSET, 16, 1, 2627 msg->front.iov_base, msg->front.iov_len, true); 2628 if (msg->middle) 2629 print_hex_dump(KERN_DEBUG, "middle: ", 2630 DUMP_PREFIX_OFFSET, 16, 1, 2631 msg->middle->vec.iov_base, 2632 msg->middle->vec.iov_len, true); 2633 print_hex_dump(KERN_DEBUG, "footer: ", 2634 DUMP_PREFIX_OFFSET, 16, 1, 2635 &msg->footer, sizeof(msg->footer), true); 2636 } 2637 EXPORT_SYMBOL(ceph_msg_dump); 2638