#include <linux/ceph/ceph_debug.h>

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/dns_resolver.h>
#include <net/tcp.h>

#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */

/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;

#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif


static void queue_con(struct ceph_connection *con);
static void con_work(struct work_struct *);
static void ceph_fault(struct ceph_connection *con);

/*
 * nicely render a sockaddr as a string.
 */
#define MAX_ADDR_STR 20
#define MAX_ADDR_STR_LEN 60
static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN];
static DEFINE_SPINLOCK(addr_str_lock);
static int last_addr_str;

const char *ceph_pr_addr(const struct sockaddr_storage *ss)
{
	int i;
	char *s;
	struct sockaddr_in *in4 = (void *)ss;
	struct sockaddr_in6 *in6 = (void *)ss;

	spin_lock(&addr_str_lock);
	i = last_addr_str++;
	if (last_addr_str == MAX_ADDR_STR)
		last_addr_str = 0;
	spin_unlock(&addr_str_lock);
	s = addr_str[i];

	switch (ss->ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr,
			 (unsigned int)ntohs(in4->sin_port));
		break;

	case AF_INET6:
		snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr,
			 (unsigned int)ntohs(in6->sin6_port));
		break;

	default:
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)",
			 (int)ss->ss_family);
	}

	return s;
}
EXPORT_SYMBOL(ceph_pr_addr);

static void encode_my_addr(struct ceph_messenger *msgr)
{
	memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
	ceph_encode_addr(&msgr->my_enc_addr);
}

/*
 * work queue for all reading and writing to/from the socket.
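 *
 * There is a single workqueue for the whole messenger; per-connection
 * work items are queued on it (via queue_con()) whenever a socket
 * callback fires or new output becomes pending.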
 */
struct workqueue_struct *ceph_msgr_wq;

int ceph_msgr_init(void)
{
	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
	if (!ceph_msgr_wq) {
		pr_err("msgr_init failed to create workqueue\n");
		return -ENOMEM;
	}
	return 0;
}
EXPORT_SYMBOL(ceph_msgr_init);

void ceph_msgr_exit(void)
{
	destroy_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_exit);

void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_flush);


/*
 * socket callback functions
 */

/* data available on socket, or listen socket received a connect */
static void ceph_data_ready(struct sock *sk, int count_unused)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;
	if (sk->sk_state != TCP_CLOSE_WAIT) {
		dout("ceph_data_ready on %p state = %lu, queueing work\n",
		     con, con->state);
		queue_con(con);
	}
}

/* socket has buffer space for writing */
static void ceph_write_space(struct sock *sk)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;

	/* only queue to workqueue if there is data we want to write. */
	if (test_bit(WRITE_PENDING, &con->state)) {
		dout("ceph_write_space %p queueing write work\n", con);
		queue_con(con);
	} else {
		dout("ceph_write_space %p nothing to write\n", con);
	}

	/* since we have our own write_space, clear the SOCK_NOSPACE flag */
	clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
}

/* socket's state has changed */
static void ceph_state_change(struct sock *sk)
{
	struct ceph_connection *con =
		(struct ceph_connection *)sk->sk_user_data;

	dout("ceph_state_change %p state = %lu sk_state = %u\n",
	     con, con->state, sk->sk_state);

	if (test_bit(CLOSED, &con->state))
		return;

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("ceph_state_change TCP_CLOSE\n");
		/* fall through */
	case TCP_CLOSE_WAIT:
		dout("ceph_state_change TCP_CLOSE_WAIT\n");
		if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
			if (test_bit(CONNECTING, &con->state))
				con->error_msg = "connection failed";
			else
				con->error_msg = "socket closed";
			queue_con(con);
		}
		break;
	case TCP_ESTABLISHED:
		dout("ceph_state_change TCP_ESTABLISHED\n");
		queue_con(con);
		break;
	}
}

/*
 * set up socket callbacks
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{
	struct sock *sk = sock->sk;
	sk->sk_user_data = (void *)con;
	sk->sk_data_ready = ceph_data_ready;
	sk->sk_write_space = ceph_write_space;
	sk->sk_state_change = ceph_state_change;
}


/*
 * socket helpers
 */

/*
 * initiate connection to a remote socket.
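 *
 * The connect is issued nonblocking, so -EINPROGRESS is the expected
 * result; completion (or failure) is reported later through the
 * sk_state_change callback installed above.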
 */
static struct socket *ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
	struct socket *sock;
	int ret;

	BUG_ON(con->sock);
	ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
			       IPPROTO_TCP, &sock);
	if (ret)
		return ERR_PTR(ret);
	con->sock = sock;
	sock->sk->sk_allocation = GFP_NOFS;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));

	ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
				 O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     ceph_pr_addr(&con->peer_addr.in_addr),
		     sock->sk->sk_state);
		ret = 0;
	}
	if (ret < 0) {
		pr_err("connect %s error %d\n",
		       ceph_pr_addr(&con->peer_addr.in_addr), ret);
		sock_release(sock);
		con->sock = NULL;
		con->error_msg = "connect error";
	}

	if (ret < 0)
		return ERR_PTR(ret);
	return sock;
}

static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
	struct kvec iov = {buf, len};
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
	if (r == -EAGAIN)
		r = 0;
	return r;
}

/*
 * write something.  @more is true if caller will be sending more data
 * shortly.
 */
static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
			    size_t kvlen, size_t len, int more)
{
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	if (more)
		msg.msg_flags |= MSG_MORE;
	else
		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */

	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
	if (r == -EAGAIN)
		r = 0;
	return r;
}


/*
 * Shutdown/close the socket for the given connection.
 */
static int con_close_socket(struct ceph_connection *con)
{
	int rc;

	dout("con_close_socket on %p sock %p\n", con, con->sock);
	if (!con->sock)
		return 0;
	set_bit(SOCK_CLOSED, &con->state);
	rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
	sock_release(con->sock);
	con->sock = NULL;
	clear_bit(SOCK_CLOSED, &con->state);
	return rc;
}

/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);
	ceph_msg_put(msg);
}
static void ceph_msg_remove_list(struct list_head *head)
{
	while (!list_empty(head)) {
		struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
							list_head);
		ceph_msg_remove(msg);
	}
}

static void reset_connection(struct ceph_connection *con)
{
	/* reset connection, out_queue, msg_ and connect_seq */
	/* discard existing out_queue and msg_seq */
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);

	if (con->in_msg) {
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}

	con->connect_seq = 0;
	con->out_seq = 0;
	if (con->out_msg) {
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}
	con->in_seq = 0;
	con->in_seq_acked = 0;
}

/*
 * mark a peer down.  drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{
	dout("con_close %p peer %s\n", con,
	     ceph_pr_addr(&con->peer_addr.in_addr));
	set_bit(CLOSED, &con->state);  /* in case there's queued work */
	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
	clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
	clear_bit(KEEPALIVE_PENDING, &con->state);
	clear_bit(WRITE_PENDING, &con->state);
	mutex_lock(&con->mutex);
	reset_connection(con);
	con->peer_global_seq = 0;
	cancel_delayed_work(&con->work);
	mutex_unlock(&con->mutex);
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_close);

/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
{
	dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
	set_bit(OPENING, &con->state);
	clear_bit(CLOSED, &con->state);
	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;      /* reset backoff memory */
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);

/*
 * return true if this connection ever successfully opened
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	return con->connect_seq > 0;
}

/*
 * generic get/put
 */
struct ceph_connection *ceph_con_get(struct ceph_connection *con)
{
	dout("con_get %p nref = %d -> %d\n", con,
	     atomic_read(&con->nref), atomic_read(&con->nref) + 1);
	if (atomic_inc_not_zero(&con->nref))
		return con;
	return NULL;
}

void ceph_con_put(struct ceph_connection *con)
{
	dout("con_put %p nref = %d -> %d\n", con,
	     atomic_read(&con->nref), atomic_read(&con->nref) - 1);
	BUG_ON(atomic_read(&con->nref) == 0);
	if (atomic_dec_and_test(&con->nref)) {
		BUG_ON(con->sock);
		kfree(con);
	}
}

/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));
	atomic_set(&con->nref, 1);
	con->msgr = msgr;
	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, con_work);
}
EXPORT_SYMBOL(ceph_con_init);


/*
 * We maintain a global counter to order connection attempts.  Get
 * a unique seq greater than @gt.
 */
static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);
	return ret;
}


/*
 * Prepare footer for currently outgoing message, and finish things
 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 */
static void prepare_write_message_footer(struct ceph_connection *con, int v)
{
	struct ceph_msg *m = con->out_msg;

	dout("prepare_write_message_footer %p\n", con);
	con->out_kvec_is_msg = true;
	con->out_kvec[v].iov_base = &m->footer;
	con->out_kvec[v].iov_len = sizeof(m->footer);
	con->out_kvec_bytes += sizeof(m->footer);
	con->out_kvec_left++;
	con->out_more = m->more_to_follow;
	con->out_msg_done = true;
}

/*
 * Prepare headers for the next outgoing message.
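 *
 * The resulting out_kvec layout is: [optional ack tag + seq]
 * [tag_msg] [header] [front] [optional middle].  Data pages (if any)
 * and the footer are queued separately afterwards.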
 */
static void prepare_write_message(struct ceph_connection *con)
{
	struct ceph_msg *m;
	int v = 0;

	con->out_kvec_bytes = 0;
	con->out_kvec_is_msg = true;
	con->out_msg_done = false;

	/* Sneak an ack in there first?  If we can get it into the same
	 * TCP packet that's a good thing. */
	if (con->in_seq > con->in_seq_acked) {
		con->in_seq_acked = con->in_seq;
		con->out_kvec[v].iov_base = &tag_ack;
		con->out_kvec[v++].iov_len = 1;
		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
		con->out_kvec[v].iov_base = &con->out_temp_ack;
		con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack);
		con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
	}

	m = list_first_entry(&con->out_queue,
			     struct ceph_msg, list_head);
	con->out_msg = m;

	/* put message on sent list */
	ceph_msg_get(m);
	list_move_tail(&m->list_head, &con->out_sent);

	/*
	 * only assign outgoing seq # if we haven't sent this message
	 * yet.  if it is requeued, resend with its original seq.
	 */
	if (m->needs_out_seq) {
		m->hdr.seq = cpu_to_le64(++con->out_seq);
		m->needs_out_seq = false;
	}

	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
	     m, con->out_seq, le16_to_cpu(m->hdr.type),
	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
	     le32_to_cpu(m->hdr.data_len),
	     m->nr_pages);
	BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);

	/* tag + hdr + front + middle */
	con->out_kvec[v].iov_base = &tag_msg;
	con->out_kvec[v++].iov_len = 1;
	con->out_kvec[v].iov_base = &m->hdr;
	con->out_kvec[v++].iov_len = sizeof(m->hdr);
	con->out_kvec[v++] = m->front;
	if (m->middle)
		con->out_kvec[v++] = m->middle->vec;
	con->out_kvec_left = v;
	con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len +
		(m->middle ? m->middle->vec.iov_len : 0);
	con->out_kvec_cur = con->out_kvec;

	/* fill in crc (except data pages), footer */
	con->out_msg->hdr.crc =
		cpu_to_le32(crc32c(0, (void *)&m->hdr,
				   sizeof(m->hdr) - sizeof(m->hdr.crc)));
	con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
	con->out_msg->footer.front_crc =
		cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
	if (m->middle)
		con->out_msg->footer.middle_crc =
			cpu_to_le32(crc32c(0, m->middle->vec.iov_base,
					   m->middle->vec.iov_len));
	else
		con->out_msg->footer.middle_crc = 0;
	con->out_msg->footer.data_crc = 0;
	dout("prepare_write_message front_crc %u middle_crc %u\n",
	     le32_to_cpu(con->out_msg->footer.front_crc),
	     le32_to_cpu(con->out_msg->footer.middle_crc));

	/* is there a data payload? */
	if (le32_to_cpu(m->hdr.data_len) > 0) {
		/* initialize page iterator */
		con->out_msg_pos.page = 0;
		if (m->pages)
			con->out_msg_pos.page_pos = m->page_alignment;
		else
			con->out_msg_pos.page_pos = 0;
		con->out_msg_pos.data_pos = 0;
		con->out_msg_pos.did_page_crc = 0;
		con->out_more = 1;  /* data + footer will follow */
	} else {
		/* no, queue up footer too and be done */
		prepare_write_message_footer(con, v);
	}

	set_bit(WRITE_PENDING, &con->state);
}

/*
 * Prepare an ack.
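 *
 * On the wire an ack is just the tag byte followed by the acked
 * sequence number as a little-endian u64.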
 */
static void prepare_write_ack(struct ceph_connection *con)
{
	dout("prepare_write_ack %p %llu -> %llu\n", con,
	     con->in_seq_acked, con->in_seq);
	con->in_seq_acked = con->in_seq;

	con->out_kvec[0].iov_base = &tag_ack;
	con->out_kvec[0].iov_len = 1;
	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
	con->out_kvec[1].iov_base = &con->out_temp_ack;
	con->out_kvec[1].iov_len = sizeof(con->out_temp_ack);
	con->out_kvec_left = 2;
	con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 1;  /* more will follow.. eventually.. */
	set_bit(WRITE_PENDING, &con->state);
}

/*
 * Prepare to write keepalive byte.
 */
static void prepare_write_keepalive(struct ceph_connection *con)
{
	dout("prepare_write_keepalive %p\n", con);
	con->out_kvec[0].iov_base = &tag_keepalive;
	con->out_kvec[0].iov_len = 1;
	con->out_kvec_left = 1;
	con->out_kvec_bytes = 1;
	con->out_kvec_cur = con->out_kvec;
	set_bit(WRITE_PENDING, &con->state);
}

/*
 * Connection negotiation.
 */

static int prepare_connect_authorizer(struct ceph_connection *con)
{
	void *auth_buf;
	int auth_len = 0;
	int auth_protocol = 0;

	mutex_unlock(&con->mutex);
	if (con->ops->get_authorizer)
		con->ops->get_authorizer(con, &auth_buf, &auth_len,
					 &auth_protocol, &con->auth_reply_buf,
					 &con->auth_reply_buf_len,
					 con->auth_retry);
	mutex_lock(&con->mutex);

	if (test_bit(CLOSED, &con->state) ||
	    test_bit(OPENING, &con->state))
		return -EAGAIN;

	con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
	con->out_connect.authorizer_len = cpu_to_le32(auth_len);

	if (auth_len) {
		con->out_kvec[con->out_kvec_left].iov_base = auth_buf;
		con->out_kvec[con->out_kvec_left].iov_len = auth_len;
		con->out_kvec_left++;
		con->out_kvec_bytes += auth_len;
	}
	return 0;
}

/*
 * We connected to a peer and are saying hello.
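 *
 * The banner is the fixed CEPH_BANNER string followed by our own
 * wire-encoded entity address.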
 */
static void prepare_write_banner(struct ceph_messenger *msgr,
				 struct ceph_connection *con)
{
	int len = strlen(CEPH_BANNER);

	con->out_kvec[0].iov_base = CEPH_BANNER;
	con->out_kvec[0].iov_len = len;
	con->out_kvec[1].iov_base = &msgr->my_enc_addr;
	con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr);
	con->out_kvec_left = 2;
	con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);
}

static int prepare_write_connect(struct ceph_messenger *msgr,
				 struct ceph_connection *con,
				 int after_banner)
{
	unsigned global_seq = get_global_seq(con->msgr, 0);
	int proto;

	switch (con->peer_name.type) {
	case CEPH_ENTITY_TYPE_MON:
		proto = CEPH_MONC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_OSD:
		proto = CEPH_OSDC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_MDS:
		proto = CEPH_MDSC_PROTOCOL;
		break;
	default:
		BUG();
	}

	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
	     con->connect_seq, global_seq, proto);

	con->out_connect.features = cpu_to_le64(msgr->supported_features);
	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
	con->out_connect.global_seq = cpu_to_le32(global_seq);
	con->out_connect.protocol_version = cpu_to_le32(proto);
	con->out_connect.flags = 0;

	if (!after_banner) {
		con->out_kvec_left = 0;
		con->out_kvec_bytes = 0;
	}
	con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect;
	con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect);
	con->out_kvec_left++;
	con->out_kvec_bytes += sizeof(con->out_connect);
	con->out_kvec_cur = con->out_kvec;
	con->out_more = 0;
	set_bit(WRITE_PENDING, &con->state);

	return prepare_connect_authorizer(con);
}


/*
 * write as much of pending kvecs to the socket as we can.
 *  1 -> done
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_kvec(struct ceph_connection *con)
{
	int ret;

	dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
	while (con->out_kvec_bytes > 0) {
		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
				       con->out_kvec_left, con->out_kvec_bytes,
				       con->out_more);
		if (ret <= 0)
			goto out;
		con->out_kvec_bytes -= ret;
		if (con->out_kvec_bytes == 0)
			break;            /* done */
		while (ret > 0) {
			if (ret >= con->out_kvec_cur->iov_len) {
				ret -= con->out_kvec_cur->iov_len;
				con->out_kvec_cur++;
				con->out_kvec_left--;
			} else {
				con->out_kvec_cur->iov_len -= ret;
				con->out_kvec_cur->iov_base += ret;
				ret = 0;
				break;
			}
		}
	}
	con->out_kvec_left = 0;
	con->out_kvec_is_msg = false;
	ret = 1;
out:
	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
	     con->out_kvec_bytes, con->out_kvec_left, ret);
	return ret;  /* done! */
}

#ifdef CONFIG_BLOCK
static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
{
	if (!bio) {
		*iter = NULL;
		*seg = 0;
		return;
	}
	*iter = bio;
	*seg = bio->bi_idx;
}

static void iter_bio_next(struct bio **bio_iter, int *seg)
{
	if (*bio_iter == NULL)
		return;

	BUG_ON(*seg >= (*bio_iter)->bi_vcnt);

	(*seg)++;
	if (*seg == (*bio_iter)->bi_vcnt)
		init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
}
#endif

/*
 * Write as much message data payload as we can.  If we finish, queue
 * up the footer.
 *  1 -> done, footer is now queued in out_kvec[].
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_msg_pages(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->out_msg;
	unsigned data_len = le32_to_cpu(msg->hdr.data_len);
	size_t len;
	int crc = !con->msgr->nocrc;
	int ret;
	int total_max_write;
	int in_trail = 0;
	size_t trail_len = (msg->trail ? msg->trail->length : 0);

	dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
	     con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
	     con->out_msg_pos.page_pos);

#ifdef CONFIG_BLOCK
	if (msg->bio && !msg->bio_iter)
		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
#endif

	while (data_len > con->out_msg_pos.data_pos) {
		struct page *page = NULL;
		void *kaddr = NULL;
		int max_write = PAGE_SIZE;
		int page_shift = 0;

		total_max_write = data_len - trail_len -
			con->out_msg_pos.data_pos;

		/*
		 * if we are calculating the data crc (the default), we need
		 * to map the page.  if our pages[] has been revoked, use the
		 * zero page.
		 */

		/* have we reached the trail part of the data?
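		 * (the trail is a ceph_pagelist appended after the
		 * page/bio payload; its length was accounted for in
		 * trail_len above)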
		 */
		if (con->out_msg_pos.data_pos >= data_len - trail_len) {
			in_trail = 1;

			total_max_write = data_len - con->out_msg_pos.data_pos;

			page = list_first_entry(&msg->trail->head,
						struct page, lru);
			if (crc)
				kaddr = kmap(page);
			max_write = PAGE_SIZE;
		} else if (msg->pages) {
			page = msg->pages[con->out_msg_pos.page];
			if (crc)
				kaddr = kmap(page);
		} else if (msg->pagelist) {
			page = list_first_entry(&msg->pagelist->head,
						struct page, lru);
			if (crc)
				kaddr = kmap(page);
#ifdef CONFIG_BLOCK
		} else if (msg->bio) {
			struct bio_vec *bv;

			bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
			page = bv->bv_page;
			page_shift = bv->bv_offset;
			if (crc)
				kaddr = kmap(page) + page_shift;
			max_write = bv->bv_len;
#endif
		} else {
			page = con->msgr->zero_page;
			if (crc)
				kaddr = page_address(con->msgr->zero_page);
		}
		len = min_t(int, max_write - con->out_msg_pos.page_pos,
			    total_max_write);

		if (crc && !con->out_msg_pos.did_page_crc) {
			void *base = kaddr + con->out_msg_pos.page_pos;
			u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);

			BUG_ON(kaddr == NULL);
			con->out_msg->footer.data_crc =
				cpu_to_le32(crc32c(tmpcrc, base, len));
			con->out_msg_pos.did_page_crc = 1;
		}
		ret = kernel_sendpage(con->sock, page,
				      con->out_msg_pos.page_pos + page_shift,
				      len,
				      MSG_DONTWAIT | MSG_NOSIGNAL |
				      MSG_MORE);

		if (crc &&
		    (msg->pages || msg->pagelist || msg->bio || in_trail))
			kunmap(page);

		if (ret == -EAGAIN)
			ret = 0;
		if (ret <= 0)
			goto out;

		con->out_msg_pos.data_pos += ret;
		con->out_msg_pos.page_pos += ret;
		if (ret == len) {
			con->out_msg_pos.page_pos = 0;
			con->out_msg_pos.page++;
			con->out_msg_pos.did_page_crc = 0;
			if (in_trail)
				list_move_tail(&page->lru,
					       &msg->trail->head);
			else if (msg->pagelist)
				list_move_tail(&page->lru,
					       &msg->pagelist->head);
#ifdef CONFIG_BLOCK
			else if (msg->bio)
				iter_bio_next(&msg->bio_iter, &msg->bio_seg);
#endif
		}
	}

	dout("write_partial_msg_pages %p msg %p done\n", con, msg);

	/* prepare and queue up footer, too */
	if (!crc)
		con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
	con->out_kvec_bytes = 0;
	con->out_kvec_left = 0;
	con->out_kvec_cur = con->out_kvec;
	prepare_write_message_footer(con, 0);
	ret = 1;
out:
	return ret;
}

/*
 * write some zeros
 */
static int write_partial_skip(struct ceph_connection *con)
{
	int ret;

	while (con->out_skip > 0) {
		struct kvec iov = {
			.iov_base = page_address(con->msgr->zero_page),
			.iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE)
		};

		ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1);
		if (ret <= 0)
			goto out;
		con->out_skip -= ret;
	}
	ret = 1;
out:
	return ret;
}

/*
 * Prepare to read connection handshake, or an ack.
 */
static void prepare_read_banner(struct ceph_connection *con)
{
	dout("prepare_read_banner %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_connect(struct ceph_connection *con)
{
	dout("prepare_read_connect %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_ack(struct ceph_connection *con)
{
	dout("prepare_read_ack %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_tag(struct ceph_connection *con)
{
	dout("prepare_read_tag %p\n", con);
	con->in_base_pos = 0;
	con->in_tag = CEPH_MSGR_TAG_READY;
}

/*
 * Prepare to read a message.
 */
static int prepare_read_message(struct ceph_connection *con)
{
	dout("prepare_read_message %p\n", con);
	BUG_ON(con->in_msg != NULL);
	con->in_base_pos = 0;
	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
	return 0;
}


static int read_partial(struct ceph_connection *con,
			int *to, int size, void *object)
{
	*to += size;
	while (con->in_base_pos < *to) {
		int left = *to - con->in_base_pos;
		int have = size - left;
		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
		if (ret <= 0)
			return ret;
		con->in_base_pos += ret;
	}
	return 1;
}


/*
 * Read all or part of the connect-side handshake on a new connection
 */
static int read_partial_banner(struct ceph_connection *con)
{
	int ret, to = 0;

	dout("read_partial_banner %p at %d\n", con, con->in_base_pos);

	/* peer's banner */
	ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner);
	if (ret <= 0)
		goto out;
	ret = read_partial(con, &to, sizeof(con->actual_peer_addr),
			   &con->actual_peer_addr);
	if (ret <= 0)
		goto out;
	ret = read_partial(con, &to, sizeof(con->peer_addr_for_me),
			   &con->peer_addr_for_me);
	if (ret <= 0)
		goto out;
out:
	return ret;
}

static int read_partial_connect(struct ceph_connection *con)
{
	int ret, to = 0;

	dout("read_partial_connect %p at %d\n", con, con->in_base_pos);

	ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply);
	if (ret <= 0)
		goto out;
	ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len),
			   con->auth_reply_buf);
	if (ret <= 0)
		goto out;

	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
	     con, (int)con->in_reply.tag,
	     le32_to_cpu(con->in_reply.connect_seq),
	     le32_to_cpu(con->in_reply.global_seq));
out:
	return ret;

}

/*
 * Verify the hello banner looks okay.
 */
static int verify_hello(struct ceph_connection *con)
{
	if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
		pr_err("connect to %s got bad banner\n",
		       ceph_pr_addr(&con->peer_addr.in_addr));
		con->error_msg = "protocol error, bad banner";
		return -1;
	}
	return 0;
}

static bool addr_is_blank(struct sockaddr_storage *ss)
{
	switch (ss->ss_family) {
	case AF_INET:
		return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
	case AF_INET6:
		return
		 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
		 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
		 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
		 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
	}
	return false;
}

static int addr_port(struct sockaddr_storage *ss)
{
	switch (ss->ss_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)ss)->sin_port);
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)ss)->sin6_port);
	}
	return 0;
}

static void addr_set_port(struct sockaddr_storage *ss, int p)
{
	switch (ss->ss_family) {
	case AF_INET:
		((struct sockaddr_in *)ss)->sin_port = htons(p);
		break;
	case AF_INET6:
		((struct sockaddr_in6 *)ss)->sin6_port = htons(p);
		break;
	}
}

/*
 * Unlike other *_pton function semantics, zero indicates success.
 */
static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
		char delim, const char **ipend)
{
	struct sockaddr_in *in4 = (void *)ss;
	struct sockaddr_in6 *in6 = (void *)ss;

	memset(ss, 0, sizeof(*ss));

	if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
		ss->ss_family = AF_INET;
		return 0;
	}

	if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
		ss->ss_family = AF_INET6;
		return 0;
	}

	return -EINVAL;
}

/*
 * Extract hostname string and resolve using kernel DNS facility.
 */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct sockaddr_storage *ss, char delim, const char **ipend)
{
	const char *end, *delim_p;
	char *colon_p, *ip_addr = NULL;
	int ip_len, ret;

	/*
	 * The end of the hostname occurs immediately preceding the delimiter or
	 * the port marker (':'), where the delimiter takes precedence.
	 */
	delim_p = memchr(name, delim, namelen);
	colon_p = memchr(name, ':', namelen);

	if (delim_p && colon_p)
		end = delim_p < colon_p ? delim_p : colon_p;
	else if (!delim_p && colon_p)
		end = colon_p;
	else {
		end = delim_p;
		if (!end) /* case: hostname:/ */
			end = name + namelen;
	}

	if (end <= name)
		return -EINVAL;

	/* do dns_resolve upcall */
	ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
	if (ip_len > 0)
		ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
	else
		ret = -ESRCH;

	kfree(ip_addr);

	*ipend = end;

	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
			ret, ret ? "failed" : ceph_pr_addr(ss));
"failed" : ceph_pr_addr(ss)); 1150 1151 return ret; 1152 } 1153 #else 1154 static inline int ceph_dns_resolve_name(const char *name, size_t namelen, 1155 struct sockaddr_storage *ss, char delim, const char **ipend) 1156 { 1157 return -EINVAL; 1158 } 1159 #endif 1160 1161 /* 1162 * Parse a server name (IP or hostname). If a valid IP address is not found 1163 * then try to extract a hostname to resolve using userspace DNS upcall. 1164 */ 1165 static int ceph_parse_server_name(const char *name, size_t namelen, 1166 struct sockaddr_storage *ss, char delim, const char **ipend) 1167 { 1168 int ret; 1169 1170 ret = ceph_pton(name, namelen, ss, delim, ipend); 1171 if (ret) 1172 ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend); 1173 1174 return ret; 1175 } 1176 1177 /* 1178 * Parse an ip[:port] list into an addr array. Use the default 1179 * monitor port if a port isn't specified. 1180 */ 1181 int ceph_parse_ips(const char *c, const char *end, 1182 struct ceph_entity_addr *addr, 1183 int max_count, int *count) 1184 { 1185 int i, ret = -EINVAL; 1186 const char *p = c; 1187 1188 dout("parse_ips on '%.*s'\n", (int)(end-c), c); 1189 for (i = 0; i < max_count; i++) { 1190 const char *ipend; 1191 struct sockaddr_storage *ss = &addr[i].in_addr; 1192 int port; 1193 char delim = ','; 1194 1195 if (*p == '[') { 1196 delim = ']'; 1197 p++; 1198 } 1199 1200 ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend); 1201 if (ret) 1202 goto bad; 1203 ret = -EINVAL; 1204 1205 p = ipend; 1206 1207 if (delim == ']') { 1208 if (*p != ']') { 1209 dout("missing matching ']'\n"); 1210 goto bad; 1211 } 1212 p++; 1213 } 1214 1215 /* port? */ 1216 if (p < end && *p == ':') { 1217 port = 0; 1218 p++; 1219 while (p < end && *p >= '0' && *p <= '9') { 1220 port = (port * 10) + (*p - '0'); 1221 p++; 1222 } 1223 if (port > 65535 || port == 0) 1224 goto bad; 1225 } else { 1226 port = CEPH_MON_PORT; 1227 } 1228 1229 addr_set_port(ss, port); 1230 1231 dout("parse_ips got %s\n", ceph_pr_addr(ss)); 1232 1233 if (p == end) 1234 break; 1235 if (*p != ',') 1236 goto bad; 1237 p++; 1238 } 1239 1240 if (p != end) 1241 goto bad; 1242 1243 if (count) 1244 *count = i + 1; 1245 return 0; 1246 1247 bad: 1248 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); 1249 return ret; 1250 } 1251 EXPORT_SYMBOL(ceph_parse_ips); 1252 1253 static int process_banner(struct ceph_connection *con) 1254 { 1255 dout("process_banner on %p\n", con); 1256 1257 if (verify_hello(con) < 0) 1258 return -1; 1259 1260 ceph_decode_addr(&con->actual_peer_addr); 1261 ceph_decode_addr(&con->peer_addr_for_me); 1262 1263 /* 1264 * Make sure the other end is who we wanted. note that the other 1265 * end may not yet know their ip address, so if it's 0.0.0.0, give 1266 * them the benefit of the doubt. 1267 */ 1268 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 1269 sizeof(con->peer_addr)) != 0 && 1270 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1271 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1272 pr_warning("wrong peer, want %s/%d, got %s/%d\n", 1273 ceph_pr_addr(&con->peer_addr.in_addr), 1274 (int)le32_to_cpu(con->peer_addr.nonce), 1275 ceph_pr_addr(&con->actual_peer_addr.in_addr), 1276 (int)le32_to_cpu(con->actual_peer_addr.nonce)); 1277 con->error_msg = "wrong peer at address"; 1278 return -1; 1279 } 1280 1281 /* 1282 * did we learn our address? 
	 */
	if (addr_is_blank(&con->msgr->inst.addr.in_addr)) {
		int port = addr_port(&con->msgr->inst.addr.in_addr);

		memcpy(&con->msgr->inst.addr.in_addr,
		       &con->peer_addr_for_me.in_addr,
		       sizeof(con->peer_addr_for_me.in_addr));
		addr_set_port(&con->msgr->inst.addr.in_addr, port);
		encode_my_addr(con->msgr);
		dout("process_banner learned my addr is %s\n",
		     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
	}

	set_bit(NEGOTIATING, &con->state);
	prepare_read_connect(con);
	return 0;
}

static void fail_protocol(struct ceph_connection *con)
{
	reset_connection(con);
	set_bit(CLOSED, &con->state);  /* in case there's queued work */

	mutex_unlock(&con->mutex);
	if (con->ops->bad_proto)
		con->ops->bad_proto(con);
	mutex_lock(&con->mutex);
}

static int process_connect(struct ceph_connection *con)
{
	u64 sup_feat = con->msgr->supported_features;
	u64 req_feat = con->msgr->required_features;
	u64 server_feat = le64_to_cpu(con->in_reply.features);
	int ret;

	dout("process_connect on %p tag %d\n", con, (int)con->in_tag);

	switch (con->in_reply.tag) {
	case CEPH_MSGR_TAG_FEATURES:
		pr_err("%s%lld %s feature set mismatch,"
		       " my %llx < server's %llx, missing %llx\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr.in_addr),
		       sup_feat, server_feat, server_feat & ~sup_feat);
		con->error_msg = "missing required protocol features";
		fail_protocol(con);
		return -1;

	case CEPH_MSGR_TAG_BADPROTOVER:
		pr_err("%s%lld %s protocol version mismatch,"
		       " my %d != server's %d\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr.in_addr),
		       le32_to_cpu(con->out_connect.protocol_version),
		       le32_to_cpu(con->in_reply.protocol_version));
		con->error_msg = "protocol version mismatch";
		fail_protocol(con);
		return -1;

	case CEPH_MSGR_TAG_BADAUTHORIZER:
		con->auth_retry++;
		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
		     con->auth_retry);
		if (con->auth_retry == 2) {
			con->error_msg = "connect authorization failure";
			return -1;
		}
		con->auth_retry = 1;
		ret = prepare_write_connect(con->msgr, con, 0);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RESETSESSION:
		/*
		 * If we connected with a large connect_seq but the peer
		 * has no record of a session with us (no connection, or
		 * connect_seq == 0), they will send RESETSESSION to indicate
		 * that they must have reset their session, and may have
		 * dropped messages.
		 */
		dout("process_connect got RESET peer seq %u\n",
		     le32_to_cpu(con->in_connect.connect_seq));
		pr_err("%s%lld %s connection reset\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr.in_addr));
		reset_connection(con);
		prepare_write_connect(con->msgr, con, 0);
		prepare_read_connect(con);

		/* Tell ceph about it. */
		mutex_unlock(&con->mutex);
		pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
		if (con->ops->peer_reset)
			con->ops->peer_reset(con);
		mutex_lock(&con->mutex);
		if (test_bit(CLOSED, &con->state) ||
		    test_bit(OPENING, &con->state))
			return -EAGAIN;
		break;

	case CEPH_MSGR_TAG_RETRY_SESSION:
		/*
		 * If we sent a smaller connect_seq than the peer has, try
		 * again with a larger value.
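		 * (we simply adopt the peer's connect_seq and resend
		 * the connect message: e.g. if we sent cseq 2 but the
		 * peer answers with 5, we retry with 5)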
		 */
		dout("process_connect got RETRY my seq = %u, peer_seq = %u\n",
		     le32_to_cpu(con->out_connect.connect_seq),
		     le32_to_cpu(con->in_connect.connect_seq));
		con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
		prepare_write_connect(con->msgr, con, 0);
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RETRY_GLOBAL:
		/*
		 * If we sent a smaller global_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
		     con->peer_global_seq,
		     le32_to_cpu(con->in_connect.global_seq));
		get_global_seq(con->msgr,
			       le32_to_cpu(con->in_connect.global_seq));
		prepare_write_connect(con->msgr, con, 0);
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_READY:
		if (req_feat & ~server_feat) {
			pr_err("%s%lld %s protocol feature mismatch,"
			       " my required %llx > server's %llx, need %llx\n",
			       ENTITY_NAME(con->peer_name),
			       ceph_pr_addr(&con->peer_addr.in_addr),
			       req_feat, server_feat, req_feat & ~server_feat);
			con->error_msg = "missing required protocol features";
			fail_protocol(con);
			return -1;
		}
		clear_bit(CONNECTING, &con->state);
		con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
		con->connect_seq++;
		con->peer_features = server_feat;
		dout("process_connect got READY gseq %d cseq %d (%d)\n",
		     con->peer_global_seq,
		     le32_to_cpu(con->in_reply.connect_seq),
		     con->connect_seq);
		WARN_ON(con->connect_seq !=
			le32_to_cpu(con->in_reply.connect_seq));

		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
			set_bit(LOSSYTX, &con->state);

		prepare_read_tag(con);
		break;

	case CEPH_MSGR_TAG_WAIT:
		/*
		 * If there is a connection race (we are opening
		 * connections to each other), one of us may just have
		 * to WAIT.  This shouldn't happen if we are the
		 * client.
		 */
		pr_err("process_connect got WAIT as client\n");
		con->error_msg = "protocol error, got WAIT as client";
		return -1;

	default:
		pr_err("connect protocol error, will retry\n");
		con->error_msg = "protocol error, garbage tag during connect";
		return -1;
	}
	return 0;
}


/*
 * read (part of) an ack
 */
static int read_partial_ack(struct ceph_connection *con)
{
	int to = 0;

	return read_partial(con, &to, sizeof(con->in_temp_ack),
			    &con->in_temp_ack);
}


/*
 * We can finally discard anything that's been acked.
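 *
 * Everything on out_sent with a seq at or below the acked value has
 * been received by the peer and no longer needs to be kept for
 * retransmission.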
 */
static void process_ack(struct ceph_connection *con)
{
	struct ceph_msg *m;
	u64 ack = le64_to_cpu(con->in_temp_ack);
	u64 seq;

	while (!list_empty(&con->out_sent)) {
		m = list_first_entry(&con->out_sent, struct ceph_msg,
				     list_head);
		seq = le64_to_cpu(m->hdr.seq);
		if (seq > ack)
			break;
		dout("got ack for seq %llu type %d at %p\n", seq,
		     le16_to_cpu(m->hdr.type), m);
		m->ack_stamp = jiffies;
		ceph_msg_remove(m);
	}
	prepare_read_tag(con);
}




static int read_partial_message_section(struct ceph_connection *con,
					struct kvec *section,
					unsigned int sec_len, u32 *crc)
{
	int ret, left;

	BUG_ON(!section);

	while (section->iov_len < sec_len) {
		BUG_ON(section->iov_base == NULL);
		left = sec_len - section->iov_len;
		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
				       section->iov_len, left);
		if (ret <= 0)
			return ret;
		section->iov_len += ret;
		if (section->iov_len == sec_len)
			*crc = crc32c(0, section->iov_base,
				      section->iov_len);
	}

	return 1;
}

static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr,
				int *skip);


static int read_partial_message_pages(struct ceph_connection *con,
				      struct page **pages,
				      unsigned data_len, int datacrc)
{
	void *p;
	int ret;
	int left;

	left = min((int)(data_len - con->in_msg_pos.data_pos),
		   (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
	/* (page) data */
	BUG_ON(pages == NULL);
	p = kmap(pages[con->in_msg_pos.page]);
	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
			       left);
	if (ret > 0 && datacrc)
		con->in_data_crc =
			crc32c(con->in_data_crc,
			       p + con->in_msg_pos.page_pos, ret);
	kunmap(pages[con->in_msg_pos.page]);
	if (ret <= 0)
		return ret;
	con->in_msg_pos.data_pos += ret;
	con->in_msg_pos.page_pos += ret;
	if (con->in_msg_pos.page_pos == PAGE_SIZE) {
		con->in_msg_pos.page_pos = 0;
		con->in_msg_pos.page++;
	}

	return ret;
}

#ifdef CONFIG_BLOCK
static int read_partial_message_bio(struct ceph_connection *con,
				    struct bio **bio_iter, int *bio_seg,
				    unsigned data_len, int datacrc)
{
	struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
	void *p;
	int ret, left;

	if (IS_ERR(bv))
		return PTR_ERR(bv);

	left = min((int)(data_len - con->in_msg_pos.data_pos),
		   (int)(bv->bv_len - con->in_msg_pos.page_pos));

	p = kmap(bv->bv_page) + bv->bv_offset;

	ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
			       left);
	if (ret > 0 && datacrc)
		con->in_data_crc =
			crc32c(con->in_data_crc,
			       p + con->in_msg_pos.page_pos, ret);
	kunmap(bv->bv_page);
	if (ret <= 0)
		return ret;
	con->in_msg_pos.data_pos += ret;
	con->in_msg_pos.page_pos += ret;
	if (con->in_msg_pos.page_pos == bv->bv_len) {
		con->in_msg_pos.page_pos = 0;
		iter_bio_next(bio_iter, bio_seg);
	}

	return ret;
}
#endif

/*
 * read (part of) a message.
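 *
 * On the wire a message is: header, front, (optional) middle, data
 * pages, footer.  The header carries its own crc32c, and the footer
 * carries crc32c's for the other sections, all verified below.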
 */
static int read_partial_message(struct ceph_connection *con)
{
	struct ceph_msg *m = con->in_msg;
	int ret;
	int to, left;
	unsigned front_len, middle_len, data_len;
	int datacrc = !con->msgr->nocrc;
	int skip;
	u64 seq;

	dout("read_partial_message con %p msg %p\n", con, m);

	/* header */
	while (con->in_base_pos < sizeof(con->in_hdr)) {
		left = sizeof(con->in_hdr) - con->in_base_pos;
		ret = ceph_tcp_recvmsg(con->sock,
				       (char *)&con->in_hdr + con->in_base_pos,
				       left);
		if (ret <= 0)
			return ret;
		con->in_base_pos += ret;
		if (con->in_base_pos == sizeof(con->in_hdr)) {
			u32 crc = crc32c(0, (void *)&con->in_hdr,
				 sizeof(con->in_hdr) - sizeof(con->in_hdr.crc));
			if (crc != le32_to_cpu(con->in_hdr.crc)) {
				pr_err("read_partial_message bad hdr "
				       " crc %u != expected %u\n",
				       crc, con->in_hdr.crc);
				return -EBADMSG;
			}
		}
	}
	front_len = le32_to_cpu(con->in_hdr.front_len);
	if (front_len > CEPH_MSG_MAX_FRONT_LEN)
		return -EIO;
	middle_len = le32_to_cpu(con->in_hdr.middle_len);
	if (middle_len > CEPH_MSG_MAX_DATA_LEN)
		return -EIO;
	data_len = le32_to_cpu(con->in_hdr.data_len);
	if (data_len > CEPH_MSG_MAX_DATA_LEN)
		return -EIO;

	/* verify seq# */
	seq = le64_to_cpu(con->in_hdr.seq);
	if ((s64)seq - (s64)con->in_seq < 1) {
		pr_info("skipping %s%lld %s seq %lld expected %lld\n",
			ENTITY_NAME(con->peer_name),
			ceph_pr_addr(&con->peer_addr.in_addr),
			seq, con->in_seq + 1);
		con->in_base_pos = -front_len - middle_len - data_len -
			sizeof(m->footer);
		con->in_tag = CEPH_MSGR_TAG_READY;
		return 0;
	} else if ((s64)seq - (s64)con->in_seq > 1) {
		pr_err("read_partial_message bad seq %lld expected %lld\n",
		       seq, con->in_seq + 1);
		con->error_msg = "bad message sequence # for incoming message";
		return -EBADMSG;
	}

	/* allocate message?
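	 * ceph_alloc_msg() asks the higher layer for a message to read
	 * into; if it sets skip instead, we discard this message by
	 * leaving a negative in_base_pos for try_read() to consume.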
	 */
	if (!con->in_msg) {
		dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
		     con->in_hdr.front_len, con->in_hdr.data_len);
		skip = 0;
		con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
		if (skip) {
			/* skip this message */
			dout("alloc_msg said skip message\n");
			BUG_ON(con->in_msg);
			con->in_base_pos = -front_len - middle_len - data_len -
				sizeof(m->footer);
			con->in_tag = CEPH_MSGR_TAG_READY;
			con->in_seq++;
			return 0;
		}
		if (!con->in_msg) {
			con->error_msg =
				"error allocating memory for incoming message";
			return -ENOMEM;
		}
		m = con->in_msg;
		m->front.iov_len = 0;    /* haven't read it yet */
		if (m->middle)
			m->middle->vec.iov_len = 0;

		con->in_msg_pos.page = 0;
		if (m->pages)
			con->in_msg_pos.page_pos = m->page_alignment;
		else
			con->in_msg_pos.page_pos = 0;
		con->in_msg_pos.data_pos = 0;
	}

	/* front */
	ret = read_partial_message_section(con, &m->front, front_len,
					   &con->in_front_crc);
	if (ret <= 0)
		return ret;

	/* middle */
	if (m->middle) {
		ret = read_partial_message_section(con, &m->middle->vec,
						   middle_len,
						   &con->in_middle_crc);
		if (ret <= 0)
			return ret;
	}
#ifdef CONFIG_BLOCK
	if (m->bio && !m->bio_iter)
		init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
#endif

	/* (page) data */
	while (con->in_msg_pos.data_pos < data_len) {
		if (m->pages) {
			ret = read_partial_message_pages(con, m->pages,
							 data_len, datacrc);
			if (ret <= 0)
				return ret;
#ifdef CONFIG_BLOCK
		} else if (m->bio) {

			ret = read_partial_message_bio(con,
						 &m->bio_iter, &m->bio_seg,
						 data_len, datacrc);
			if (ret <= 0)
				return ret;
#endif
		} else {
			BUG_ON(1);
		}
	}

	/* footer */
	to = sizeof(m->hdr) + sizeof(m->footer);
	while (con->in_base_pos < to) {
		left = to - con->in_base_pos;
		ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
				       (con->in_base_pos - sizeof(m->hdr)),
				       left);
		if (ret <= 0)
			return ret;
		con->in_base_pos += ret;
	}
	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
	     m, front_len, m->footer.front_crc, middle_len,
	     m->footer.middle_crc, data_len, m->footer.data_crc);

	/* crc ok? */
	if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
		pr_err("read_partial_message %p front crc %u != exp. %u\n",
		       m, con->in_front_crc, m->footer.front_crc);
		return -EBADMSG;
	}
	if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
		pr_err("read_partial_message %p middle crc %u != exp %u\n",
		       m, con->in_middle_crc, m->footer.middle_crc);
		return -EBADMSG;
	}
	if (datacrc &&
	    (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
	    con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
		pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
		       con->in_data_crc, le32_to_cpu(m->footer.data_crc));
		return -EBADMSG;
	}

	return 1; /* done! */
}

/*
 * Process message.  This happens in the worker thread.  The callback should
 * be careful not to do anything that waits on other incoming messages or it
 * may deadlock.
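 *
 * Note that con->mutex is dropped around the dispatch callback, so
 * the handler is free to queue new messages on this same connection.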
 */
static void process_message(struct ceph_connection *con)
{
	struct ceph_msg *msg;

	msg = con->in_msg;
	con->in_msg = NULL;

	/* if first message, set peer_name */
	if (con->peer_name.type == 0)
		con->peer_name = msg->hdr.src;

	con->in_seq++;
	mutex_unlock(&con->mutex);

	dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
	     msg, le64_to_cpu(msg->hdr.seq),
	     ENTITY_NAME(msg->hdr.src),
	     le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.data_len),
	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
	con->ops->dispatch(con, msg);

	mutex_lock(&con->mutex);
	prepare_read_tag(con);
}


/*
 * Write something to the socket.  Called in a worker thread when the
 * socket appears to be writeable and we have something ready to send.
 */
static int try_write(struct ceph_connection *con)
{
	struct ceph_messenger *msgr = con->msgr;
	int ret = 1;

	dout("try_write start %p state %lu nref %d\n", con, con->state,
	     atomic_read(&con->nref));

more:
	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);

	/* open the socket first? */
	if (con->sock == NULL) {
		prepare_write_banner(msgr, con);
		prepare_write_connect(msgr, con, 1);
		prepare_read_banner(con);
		set_bit(CONNECTING, &con->state);
		clear_bit(NEGOTIATING, &con->state);

		BUG_ON(con->in_msg);
		con->in_tag = CEPH_MSGR_TAG_READY;
		dout("try_write initiating connect on %p new state %lu\n",
		     con, con->state);
		con->sock = ceph_tcp_connect(con);
		if (IS_ERR(con->sock)) {
			con->sock = NULL;
			con->error_msg = "connect error";
			ret = -1;
			goto out;
		}
	}

more_kvec:
	/* kvec data queued? */
	if (con->out_skip) {
		ret = write_partial_skip(con);
		if (ret <= 0)
			goto out;
	}
	if (con->out_kvec_left) {
		ret = write_partial_kvec(con);
		if (ret <= 0)
			goto out;
	}

	/* msg pages? */
	if (con->out_msg) {
		if (con->out_msg_done) {
			ceph_msg_put(con->out_msg);
			con->out_msg = NULL;   /* we're done with this one */
			goto do_next;
		}

		ret = write_partial_msg_pages(con);
		if (ret == 1)
			goto more_kvec;  /* we need to send the footer, too! */
		if (ret == 0)
			goto out;
		if (ret < 0) {
			dout("try_write write_partial_msg_pages err %d\n",
			     ret);
			goto out;
		}
	}

do_next:
	if (!test_bit(CONNECTING, &con->state)) {
		/* is anything else pending? */
		if (!list_empty(&con->out_queue)) {
			prepare_write_message(con);
			goto more;
		}
		if (con->in_seq > con->in_seq_acked) {
			prepare_write_ack(con);
			goto more;
		}
		if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
			prepare_write_keepalive(con);
			goto more;
		}
	}

	/* Nothing to do! */
	clear_bit(WRITE_PENDING, &con->state);
	dout("try_write nothing else to write.\n");
	ret = 0;
out:
	dout("try_write done on %p ret %d\n", con, ret);
	return ret;
}



/*
 * Read what we can from the socket.
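 *
 * This is a tag-driven state machine: in_tag says whether we are in
 * the middle of the banner/connect handshake, a message, or an ack,
 * and we loop until the socket has nothing more for us.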
 */
static int try_read(struct ceph_connection *con)
{
	int ret = -1;

	if (!con->sock)
		return 0;

	if (test_bit(STANDBY, &con->state))
		return 0;

	dout("try_read start on %p\n", con);

more:
	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
	     con->in_base_pos);

	/*
	 * process_connect and process_message drop and re-take
	 * con->mutex.  make sure we handle a racing close or reopen.
	 */
	if (test_bit(CLOSED, &con->state) ||
	    test_bit(OPENING, &con->state)) {
		ret = -EAGAIN;
		goto out;
	}

	if (test_bit(CONNECTING, &con->state)) {
		if (!test_bit(NEGOTIATING, &con->state)) {
			dout("try_read connecting\n");
			ret = read_partial_banner(con);
			if (ret <= 0)
				goto out;
			ret = process_banner(con);
			if (ret < 0)
				goto out;
		}
		ret = read_partial_connect(con);
		if (ret <= 0)
			goto out;
		ret = process_connect(con);
		if (ret < 0)
			goto out;
		goto more;
	}

	if (con->in_base_pos < 0) {
		/*
		 * skipping + discarding content.
		 *
		 * FIXME: there must be a better way to do this!
		 */
		static char buf[1024];
		int skip = min(1024, -con->in_base_pos);
		dout("skipping %d / %d bytes\n", skip, -con->in_base_pos);
		ret = ceph_tcp_recvmsg(con->sock, buf, skip);
		if (ret <= 0)
			goto out;
		con->in_base_pos += ret;
		if (con->in_base_pos)
			goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_READY) {
		/*
		 * what's next?
		 */
		ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
		if (ret <= 0)
			goto out;
		dout("try_read got tag %d\n", (int)con->in_tag);
		switch (con->in_tag) {
		case CEPH_MSGR_TAG_MSG:
			prepare_read_message(con);
			break;
		case CEPH_MSGR_TAG_ACK:
			prepare_read_ack(con);
			break;
		case CEPH_MSGR_TAG_CLOSE:
			set_bit(CLOSED, &con->state);   /* fixme */
			goto out;
		default:
			goto bad_tag;
		}
	}
	if (con->in_tag == CEPH_MSGR_TAG_MSG) {
		ret = read_partial_message(con);
		if (ret <= 0) {
			switch (ret) {
			case -EBADMSG:
				con->error_msg = "bad crc";
				ret = -EIO;
				break;
			case -EIO:
				con->error_msg = "io error";
				break;
			}
			goto out;
		}
		if (con->in_tag == CEPH_MSGR_TAG_READY)
			goto more;
		process_message(con);
		goto more;
	}
	if (con->in_tag == CEPH_MSGR_TAG_ACK) {
		ret = read_partial_ack(con);
		if (ret <= 0)
			goto out;
		process_ack(con);
		goto more;
	}

out:
	dout("try_read done on %p ret %d\n", con, ret);
	return ret;

bad_tag:
	pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag);
	con->error_msg = "protocol error, garbage tag";
	ret = -1;
	goto out;
}


/*
 * Atomically queue work on a connection.  Bump @con reference to
 * avoid races with connection teardown.
 */
static void queue_con(struct ceph_connection *con)
{
	if (test_bit(DEAD, &con->state)) {
		dout("queue_con %p ignoring: DEAD\n",
		     con);
		return;
	}

	if (!con->ops->get(con)) {
		dout("queue_con %p ref count 0\n", con);
		return;
	}

	if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
		dout("queue_con %p - already queued\n", con);
		con->ops->put(con);
	} else {
		dout("queue_con %p\n", con);
	}
}

/*
 * Do some work on a connection.  Drop a connection ref when we're done.
/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 */
static void con_work(struct work_struct *work)
{
	struct ceph_connection *con = container_of(work, struct ceph_connection,
						   work.work);
	int ret;

	mutex_lock(&con->mutex);
restart:
	if (test_and_clear_bit(BACKOFF, &con->state)) {
		dout("con_work %p backing off\n", con);
		if (queue_delayed_work(ceph_msgr_wq, &con->work,
				       round_jiffies_relative(con->delay))) {
			dout("con_work %p backoff %lu\n", con, con->delay);
			mutex_unlock(&con->mutex);
			return;
		} else {
			con->ops->put(con);
			dout("con_work %p FAILED to back off %lu\n", con,
			     con->delay);
		}
	}

	if (test_bit(STANDBY, &con->state)) {
		dout("con_work %p STANDBY\n", con);
		goto done;
	}
	if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
		dout("con_work CLOSED\n");
		con_close_socket(con);
		goto done;
	}
	if (test_and_clear_bit(OPENING, &con->state)) {
		/* reopen w/ new peer */
		dout("con_work OPENING\n");
		con_close_socket(con);
	}

	if (test_and_clear_bit(SOCK_CLOSED, &con->state))
		goto fault;

	ret = try_read(con);
	if (ret == -EAGAIN)
		goto restart;
	if (ret < 0)
		goto fault;

	ret = try_write(con);
	if (ret == -EAGAIN)
		goto restart;
	if (ret < 0)
		goto fault;

done:
	mutex_unlock(&con->mutex);
done_unlocked:
	con->ops->put(con);
	return;

fault:
	mutex_unlock(&con->mutex);
	ceph_fault(con);     /* error/fault path */
	goto done_unlocked;
}
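
/*
 * Example (illustrative, compiled out): the delay progression applied
 * by ceph_fault() below.  Pulled out as a helper only to make the
 * doubling-with-cap behaviour explicit; BASE_DELAY_INTERVAL and
 * MAX_DELAY_INTERVAL are the defines used elsewhere in this file.
 */
#if 0
static unsigned long example_next_delay(unsigned long delay)
{
	if (delay == 0)
		return BASE_DELAY_INTERVAL;	/* first fault */
	if (delay < MAX_DELAY_INTERVAL)
		return delay * 2;		/* exponential backoff */
	return delay;				/* already at/above the cap */
}
#endif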
/*
 * Generic error/fault handler.  A retry mechanism is used with
 * exponential backoff.
 */
static void ceph_fault(struct ceph_connection *con)
{
	pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
	       ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
	dout("fault %p state %lu to peer %s\n",
	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));

	if (test_bit(LOSSYTX, &con->state)) {
		dout("fault on LOSSYTX channel\n");
		goto out;
	}

	mutex_lock(&con->mutex);
	if (test_bit(CLOSED, &con->state))
		goto out_unlock;

	con_close_socket(con);

	if (con->in_msg) {
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}

	/* Requeue anything that hasn't been acked */
	list_splice_init(&con->out_sent, &con->out_queue);

	/* If there are no messages queued or keepalive pending, place
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	    !test_bit(KEEPALIVE_PENDING, &con->state)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		clear_bit(WRITE_PENDING, &con->state);
		set_bit(STANDBY, &con->state);
	} else {
		/* retry after a delay. */
		if (con->delay == 0)
			con->delay = BASE_DELAY_INTERVAL;
		else if (con->delay < MAX_DELAY_INTERVAL)
			con->delay *= 2;
		con->ops->get(con);
		if (queue_delayed_work(ceph_msgr_wq, &con->work,
				       round_jiffies_relative(con->delay))) {
			dout("fault queued %p delay %lu\n", con, con->delay);
		} else {
			con->ops->put(con);
			dout("fault failed to queue %p delay %lu, backoff\n",
			     con, con->delay);
			/*
			 * In many cases we see a socket state change
			 * while con_work is running and end up
			 * queuing (non-delayed) work, such that we
			 * can't backoff with a delay.  Set a flag so
			 * that when con_work restarts we schedule the
			 * delay then.
			 */
			set_bit(BACKOFF, &con->state);
		}
	}

out_unlock:
	mutex_unlock(&con->mutex);
out:
	/*
	 * in case we faulted due to authentication, invalidate our
	 * current tickets so that we can get new ones.
	 */
	if (con->auth_retry && con->ops->invalidate_authorizer) {
		dout("calling invalidate_authorizer()\n");
		con->ops->invalidate_authorizer(con);
	}

	if (con->ops->fault)
		con->ops->fault(con);
}



/*
 * create a new messenger instance
 */
struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
					     u32 supported_features,
					     u32 required_features)
{
	struct ceph_messenger *msgr;

	msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
	if (msgr == NULL)
		return ERR_PTR(-ENOMEM);

	msgr->supported_features = supported_features;
	msgr->required_features = required_features;

	spin_lock_init(&msgr->global_seq_lock);

	/* the zero page is needed if a request is "canceled" while the message
	 * is being written over the socket */
	msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO);
	if (!msgr->zero_page) {
		kfree(msgr);
		return ERR_PTR(-ENOMEM);
	}
	kmap(msgr->zero_page);

	if (myaddr)
		msgr->inst.addr = *myaddr;

	/* select a random nonce */
	msgr->inst.addr.type = 0;
	get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
	encode_my_addr(msgr);

	dout("messenger_create %p\n", msgr);
	return msgr;
}
EXPORT_SYMBOL(ceph_messenger_create);

void ceph_messenger_destroy(struct ceph_messenger *msgr)
{
	dout("destroy %p\n", msgr);
	kunmap(msgr->zero_page);
	__free_page(msgr->zero_page);
	kfree(msgr);
	dout("destroyed messenger %p\n", msgr);
}
EXPORT_SYMBOL(ceph_messenger_destroy);

static void clear_standby(struct ceph_connection *con)
{
	/* come back from STANDBY? */
	if (test_and_clear_bit(STANDBY, &con->state)) {
		mutex_lock(&con->mutex);
		dout("clear_standby %p and ++connect_seq\n", con);
		con->connect_seq++;
		WARN_ON(test_bit(WRITE_PENDING, &con->state));
		WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
		mutex_unlock(&con->mutex);
	}
}
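
/*
 * Example (illustrative, compiled out): the messenger create/destroy
 * lifecycle.  Feature masks of 0 are used here only to keep the sketch
 * self-contained; real callers pass their supported/required feature
 * bits.
 */
#if 0
static int example_messenger_lifecycle(struct ceph_entity_addr *myaddr)
{
	struct ceph_messenger *msgr;

	msgr = ceph_messenger_create(myaddr, 0, 0);
	if (IS_ERR(msgr))
		return PTR_ERR(msgr);

	/* ... bind connections to msgr, exchange messages ... */

	ceph_messenger_destroy(msgr);
	return 0;
}
#endif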
/*
 * Queue up an outgoing message on the given connection.
 */
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
	if (test_bit(CLOSED, &con->state)) {
		dout("con_send %p closed, dropping %p\n", con, msg);
		ceph_msg_put(msg);
		return;
	}

	/* set src+dst */
	msg->hdr.src = con->msgr->inst.name;

	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));

	msg->needs_out_seq = true;

	/* queue */
	mutex_lock(&con->mutex);
	BUG_ON(!list_empty(&msg->list_head));
	list_add_tail(&msg->list_head, &con->out_queue);
	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len));
	mutex_unlock(&con->mutex);

	/* if there wasn't anything waiting to send before, queue
	 * new work */
	clear_standby(con);
	if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);

/*
 * Revoke a message that was previously queued for send
 */
void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
{
	mutex_lock(&con->mutex);
	if (!list_empty(&msg->list_head)) {
		dout("con_revoke %p msg %p - was on queue\n", con, msg);
		list_del_init(&msg->list_head);
		ceph_msg_put(msg);
		msg->hdr.seq = 0;
	}
	if (con->out_msg == msg) {
		dout("con_revoke %p msg %p - was sending\n", con, msg);
		con->out_msg = NULL;
		if (con->out_kvec_is_msg) {
			con->out_skip = con->out_kvec_bytes;
			con->out_kvec_is_msg = false;
		}
		ceph_msg_put(msg);
		msg->hdr.seq = 0;
	}
	mutex_unlock(&con->mutex);
}

/*
 * Revoke a message that we may be reading data into
 */
void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
{
	mutex_lock(&con->mutex);
	if (con->in_msg && con->in_msg == msg) {
		unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
		unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
		unsigned data_len = le32_to_cpu(con->in_hdr.data_len);

		/* skip rest of message */
		dout("con_revoke_pages %p msg %p revoked\n", con, msg);
		con->in_base_pos = con->in_base_pos -
				sizeof(struct ceph_msg_header) -
				front_len -
				middle_len -
				data_len -
				sizeof(struct ceph_msg_footer);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
		con->in_tag = CEPH_MSGR_TAG_READY;
		con->in_seq++;
	} else {
		dout("con_revoke_pages %p msg %p pages %p no-op\n",
		     con, con->in_msg, msg);
	}
	mutex_unlock(&con->mutex);
}

/*
 * Queue a keepalive byte to ensure the tcp connection is alive.
 */
void ceph_con_keepalive(struct ceph_connection *con)
{
	dout("con_keepalive %p\n", con);
	clear_standby(con);
	if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
	    test_and_set_bit(WRITE_PENDING, &con->state) == 0)
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);
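
/*
 * Example (illustrative, compiled out): queueing a message and a
 * keepalive.  ceph_con_send() consumes the caller's reference, so a
 * caller that wants to keep the message (e.g. for resend) must take
 * an extra ref with ceph_msg_get() first.  CEPH_MSG_PING carries no
 * payload, hence front_len == 0.
 */
#if 0
static void example_ping(struct ceph_connection *con)
{
	struct ceph_msg *m;

	m = ceph_msg_new(CEPH_MSG_PING, 0, GFP_NOFS, true);
	if (m)
		ceph_con_send(con, m);	/* ref now owned by out_queue */
	ceph_con_keepalive(con);
}
#endif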
/*
 * construct a new message with given type, size.
 * the new msg has a ref count of 1.
 */
struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
{
	struct ceph_msg *m;

	m = kmalloc(sizeof(*m), flags);
	if (m == NULL)
		goto out;
	kref_init(&m->kref);
	INIT_LIST_HEAD(&m->list_head);

	m->hdr.tid = 0;
	m->hdr.type = cpu_to_le16(type);
	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
	m->hdr.version = 0;
	m->hdr.front_len = cpu_to_le32(front_len);
	m->hdr.middle_len = 0;
	m->hdr.data_len = 0;
	m->hdr.data_off = 0;
	m->hdr.reserved = 0;
	m->footer.front_crc = 0;
	m->footer.middle_crc = 0;
	m->footer.data_crc = 0;
	m->footer.flags = 0;
	m->front_max = front_len;
	m->front_is_vmalloc = false;
	m->more_to_follow = false;
	m->ack_stamp = 0;
	m->pool = NULL;

	/* middle */
	m->middle = NULL;

	/* data */
	m->nr_pages = 0;
	m->page_alignment = 0;
	m->pages = NULL;
	m->pagelist = NULL;
	m->bio = NULL;
	m->bio_iter = NULL;
	m->bio_seg = 0;
	m->trail = NULL;

	/* front */
	if (front_len) {
		if (front_len > PAGE_CACHE_SIZE) {
			m->front.iov_base = __vmalloc(front_len, flags,
						      PAGE_KERNEL);
			m->front_is_vmalloc = true;
		} else {
			m->front.iov_base = kmalloc(front_len, flags);
		}
		if (m->front.iov_base == NULL) {
			dout("ceph_msg_new can't allocate %d bytes\n",
			     front_len);
			goto out2;
		}
	} else {
		m->front.iov_base = NULL;
	}
	m->front.iov_len = front_len;

	dout("ceph_msg_new %p front %d\n", m, front_len);
	return m;

out2:
	ceph_msg_put(m);
out:
	if (!can_fail) {
		pr_err("msg_new can't create type %d front %d\n", type,
		       front_len);
		WARN_ON(1);
	} else {
		dout("msg_new can't create type %d front %d\n", type,
		     front_len);
	}
	return NULL;
}
EXPORT_SYMBOL(ceph_msg_new);

/*
 * Allocate "middle" portion of a message, if it is needed and wasn't
 * allocated by alloc_msg.  This allows us to read a small fixed-size
 * per-type header in the front and then gracefully fail (i.e.,
 * propagate the error to the caller based on info in the front) when
 * the middle is too large.
 */
static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{
	int type = le16_to_cpu(msg->hdr.type);
	int middle_len = le32_to_cpu(msg->hdr.middle_len);

	dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
	     ceph_msg_type_name(type), middle_len);
	BUG_ON(!middle_len);
	BUG_ON(msg->middle);

	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
	if (!msg->middle)
		return -ENOMEM;
	return 0;
}
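
/*
 * Example (illustrative, compiled out): filling the front payload
 * allocated by ceph_msg_new() above.  The single u64 payload and the
 * helper name are hypothetical; real callers encode their per-type
 * wire structs with the ceph_encode_* helpers from decode.h.
 */
#if 0
static struct ceph_msg *example_build_msg(int type, u64 value)
{
	struct ceph_msg *m;
	void *p;

	m = ceph_msg_new(type, sizeof(value), GFP_NOFS, true);
	if (!m)
		return NULL;
	p = m->front.iov_base;
	ceph_encode_64(&p, value);	/* iov_len was set to front_len */
	return m;
}
#endif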
/*
 * Generic message allocator, for incoming messages.
 */
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
				       struct ceph_msg_header *hdr,
				       int *skip)
{
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	int middle_len = le32_to_cpu(hdr->middle_len);
	struct ceph_msg *msg = NULL;
	int ret;

	if (con->ops->alloc_msg) {
		mutex_unlock(&con->mutex);
		msg = con->ops->alloc_msg(con, hdr, skip);
		mutex_lock(&con->mutex);
		if (!msg || *skip)
			return NULL;
	}
	if (!msg) {
		*skip = 0;
		msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!msg) {
			pr_err("unable to allocate msg type %d len %d\n",
			       type, front_len);
			return NULL;
		}
		msg->page_alignment = le16_to_cpu(hdr->data_off);
	}
	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));

	if (middle_len && !msg->middle) {
		ret = ceph_alloc_middle(con, msg);
		if (ret < 0) {
			ceph_msg_put(msg);
			return NULL;
		}
	}

	return msg;
}


/*
 * Free a generically kmalloc'd message.
 */
void ceph_msg_kfree(struct ceph_msg *m)
{
	dout("msg_kfree %p\n", m);
	if (m->front_is_vmalloc)
		vfree(m->front.iov_base);
	else
		kfree(m->front.iov_base);
	kfree(m);
}

/*
 * Drop a msg ref.  Destroy as needed.
 */
void ceph_msg_last_put(struct kref *kref)
{
	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);

	dout("ceph_msg_put last one on %p\n", m);
	WARN_ON(!list_empty(&m->list_head));

	/* drop middle, data, if any */
	if (m->middle) {
		ceph_buffer_put(m->middle);
		m->middle = NULL;
	}
	m->nr_pages = 0;
	m->pages = NULL;

	if (m->pagelist) {
		ceph_pagelist_release(m->pagelist);
		kfree(m->pagelist);
		m->pagelist = NULL;
	}

	m->trail = NULL;

	if (m->pool)
		ceph_msgpool_put(m->pool, m);
	else
		ceph_msg_kfree(m);
}
EXPORT_SYMBOL(ceph_msg_last_put);

void ceph_msg_dump(struct ceph_msg *msg)
{
	pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg,
		 msg->front_max, msg->nr_pages);
	print_hex_dump(KERN_DEBUG, "header: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->hdr, sizeof(msg->hdr), true);
	print_hex_dump(KERN_DEBUG, " front: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       msg->front.iov_base, msg->front.iov_len, true);
	if (msg->middle)
		print_hex_dump(KERN_DEBUG, "middle: ",
			       DUMP_PREFIX_OFFSET, 16, 1,
			       msg->middle->vec.iov_base,
			       msg->middle->vec.iov_len, true);
	print_hex_dump(KERN_DEBUG, "footer: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->footer, sizeof(msg->footer), true);
}
EXPORT_SYMBOL(ceph_msg_dump);
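
/*
 * Example (illustrative, compiled out): an alloc_msg hook of the kind
 * ceph_alloc_msg() above prefers over the generic allocator.  Setting
 * *skip tells the messenger to discard the incoming message rather
 * than read it into memory; this sketch never skips and simply
 * allocates from the header.
 */
#if 0
static struct ceph_msg *example_alloc_msg(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);

	*skip = 0;
	return ceph_msg_new(type, front_len, GFP_NOFS, false);
}
#endif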