1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/crc32c.h> 4 #include <linux/ctype.h> 5 #include <linux/highmem.h> 6 #include <linux/inet.h> 7 #include <linux/kthread.h> 8 #include <linux/net.h> 9 #include <linux/slab.h> 10 #include <linux/socket.h> 11 #include <linux/string.h> 12 #include <linux/bio.h> 13 #include <linux/blkdev.h> 14 #include <net/tcp.h> 15 16 #include <linux/ceph/libceph.h> 17 #include <linux/ceph/messenger.h> 18 #include <linux/ceph/decode.h> 19 #include <linux/ceph/pagelist.h> 20 21 /* 22 * Ceph uses the messenger to exchange ceph_msg messages with other 23 * hosts in the system. The messenger provides ordered and reliable 24 * delivery. We tolerate TCP disconnects by reconnecting (with 25 * exponential backoff) in the case of a fault (disconnection, bad 26 * crc, protocol error). Acks allow sent messages to be discarded by 27 * the sender. 28 */ 29 30 /* static tag bytes (protocol control messages) */ 31 static char tag_msg = CEPH_MSGR_TAG_MSG; 32 static char tag_ack = CEPH_MSGR_TAG_ACK; 33 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; 34 35 #ifdef CONFIG_LOCKDEP 36 static struct lock_class_key socket_class; 37 #endif 38 39 40 static void queue_con(struct ceph_connection *con); 41 static void con_work(struct work_struct *); 42 static void ceph_fault(struct ceph_connection *con); 43 44 /* 45 * nicely render a sockaddr as a string. 46 */ 47 #define MAX_ADDR_STR 20 48 #define MAX_ADDR_STR_LEN 60 49 static char addr_str[MAX_ADDR_STR][MAX_ADDR_STR_LEN]; 50 static DEFINE_SPINLOCK(addr_str_lock); 51 static int last_addr_str; 52 53 const char *ceph_pr_addr(const struct sockaddr_storage *ss) 54 { 55 int i; 56 char *s; 57 struct sockaddr_in *in4 = (void *)ss; 58 struct sockaddr_in6 *in6 = (void *)ss; 59 60 spin_lock(&addr_str_lock); 61 i = last_addr_str++; 62 if (last_addr_str == MAX_ADDR_STR) 63 last_addr_str = 0; 64 spin_unlock(&addr_str_lock); 65 s = addr_str[i]; 66 67 switch (ss->ss_family) { 68 case AF_INET: 69 snprintf(s, MAX_ADDR_STR_LEN, "%pI4:%u", &in4->sin_addr, 70 (unsigned int)ntohs(in4->sin_port)); 71 break; 72 73 case AF_INET6: 74 snprintf(s, MAX_ADDR_STR_LEN, "[%pI6c]:%u", &in6->sin6_addr, 75 (unsigned int)ntohs(in6->sin6_port)); 76 break; 77 78 default: 79 snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %d)", 80 (int)ss->ss_family); 81 } 82 83 return s; 84 } 85 EXPORT_SYMBOL(ceph_pr_addr); 86 87 static void encode_my_addr(struct ceph_messenger *msgr) 88 { 89 memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); 90 ceph_encode_addr(&msgr->my_enc_addr); 91 } 92 93 /* 94 * work queue for all reading and writing to/from the socket. 95 */ 96 struct workqueue_struct *ceph_msgr_wq; 97 98 int ceph_msgr_init(void) 99 { 100 ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); 101 if (!ceph_msgr_wq) { 102 pr_err("msgr_init failed to create workqueue\n"); 103 return -ENOMEM; 104 } 105 return 0; 106 } 107 EXPORT_SYMBOL(ceph_msgr_init); 108 109 void ceph_msgr_exit(void) 110 { 111 destroy_workqueue(ceph_msgr_wq); 112 } 113 EXPORT_SYMBOL(ceph_msgr_exit); 114 115 void ceph_msgr_flush(void) 116 { 117 flush_workqueue(ceph_msgr_wq); 118 } 119 EXPORT_SYMBOL(ceph_msgr_flush); 120 121 122 /* 123 * socket callback functions 124 */ 125 126 /* data available on socket, or listen socket received a connect */ 127 static void ceph_data_ready(struct sock *sk, int count_unused) 128 { 129 struct ceph_connection *con = 130 (struct ceph_connection *)sk->sk_user_data; 131 if (sk->sk_state != TCP_CLOSE_WAIT) { 132 dout("ceph_data_ready on %p state = %lu, queueing work\n", 133 con, con->state); 134 queue_con(con); 135 } 136 } 137 138 /* socket has buffer space for writing */ 139 static void ceph_write_space(struct sock *sk) 140 { 141 struct ceph_connection *con = 142 (struct ceph_connection *)sk->sk_user_data; 143 144 /* only queue to workqueue if there is data we want to write. */ 145 if (test_bit(WRITE_PENDING, &con->state)) { 146 dout("ceph_write_space %p queueing write work\n", con); 147 queue_con(con); 148 } else { 149 dout("ceph_write_space %p nothing to write\n", con); 150 } 151 152 /* since we have our own write_space, clear the SOCK_NOSPACE flag */ 153 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 154 } 155 156 /* socket's state has changed */ 157 static void ceph_state_change(struct sock *sk) 158 { 159 struct ceph_connection *con = 160 (struct ceph_connection *)sk->sk_user_data; 161 162 dout("ceph_state_change %p state = %lu sk_state = %u\n", 163 con, con->state, sk->sk_state); 164 165 if (test_bit(CLOSED, &con->state)) 166 return; 167 168 switch (sk->sk_state) { 169 case TCP_CLOSE: 170 dout("ceph_state_change TCP_CLOSE\n"); 171 case TCP_CLOSE_WAIT: 172 dout("ceph_state_change TCP_CLOSE_WAIT\n"); 173 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { 174 if (test_bit(CONNECTING, &con->state)) 175 con->error_msg = "connection failed"; 176 else 177 con->error_msg = "socket closed"; 178 queue_con(con); 179 } 180 break; 181 case TCP_ESTABLISHED: 182 dout("ceph_state_change TCP_ESTABLISHED\n"); 183 queue_con(con); 184 break; 185 } 186 } 187 188 /* 189 * set up socket callbacks 190 */ 191 static void set_sock_callbacks(struct socket *sock, 192 struct ceph_connection *con) 193 { 194 struct sock *sk = sock->sk; 195 sk->sk_user_data = (void *)con; 196 sk->sk_data_ready = ceph_data_ready; 197 sk->sk_write_space = ceph_write_space; 198 sk->sk_state_change = ceph_state_change; 199 } 200 201 202 /* 203 * socket helpers 204 */ 205 206 /* 207 * initiate connection to a remote socket. 208 */ 209 static struct socket *ceph_tcp_connect(struct ceph_connection *con) 210 { 211 struct sockaddr_storage *paddr = &con->peer_addr.in_addr; 212 struct socket *sock; 213 int ret; 214 215 BUG_ON(con->sock); 216 ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, 217 IPPROTO_TCP, &sock); 218 if (ret) 219 return ERR_PTR(ret); 220 con->sock = sock; 221 sock->sk->sk_allocation = GFP_NOFS; 222 223 #ifdef CONFIG_LOCKDEP 224 lockdep_set_class(&sock->sk->sk_lock, &socket_class); 225 #endif 226 227 set_sock_callbacks(sock, con); 228 229 dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr)); 230 231 ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr), 232 O_NONBLOCK); 233 if (ret == -EINPROGRESS) { 234 dout("connect %s EINPROGRESS sk_state = %u\n", 235 ceph_pr_addr(&con->peer_addr.in_addr), 236 sock->sk->sk_state); 237 ret = 0; 238 } 239 if (ret < 0) { 240 pr_err("connect %s error %d\n", 241 ceph_pr_addr(&con->peer_addr.in_addr), ret); 242 sock_release(sock); 243 con->sock = NULL; 244 con->error_msg = "connect error"; 245 } 246 247 if (ret < 0) 248 return ERR_PTR(ret); 249 return sock; 250 } 251 252 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 253 { 254 struct kvec iov = {buf, len}; 255 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 256 int r; 257 258 r = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags); 259 if (r == -EAGAIN) 260 r = 0; 261 return r; 262 } 263 264 /* 265 * write something. @more is true if caller will be sending more data 266 * shortly. 267 */ 268 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, 269 size_t kvlen, size_t len, int more) 270 { 271 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 272 int r; 273 274 if (more) 275 msg.msg_flags |= MSG_MORE; 276 else 277 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ 278 279 r = kernel_sendmsg(sock, &msg, iov, kvlen, len); 280 if (r == -EAGAIN) 281 r = 0; 282 return r; 283 } 284 285 286 /* 287 * Shutdown/close the socket for the given connection. 288 */ 289 static int con_close_socket(struct ceph_connection *con) 290 { 291 int rc; 292 293 dout("con_close_socket on %p sock %p\n", con, con->sock); 294 if (!con->sock) 295 return 0; 296 set_bit(SOCK_CLOSED, &con->state); 297 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); 298 sock_release(con->sock); 299 con->sock = NULL; 300 clear_bit(SOCK_CLOSED, &con->state); 301 return rc; 302 } 303 304 /* 305 * Reset a connection. Discard all incoming and outgoing messages 306 * and clear *_seq state. 307 */ 308 static void ceph_msg_remove(struct ceph_msg *msg) 309 { 310 list_del_init(&msg->list_head); 311 ceph_msg_put(msg); 312 } 313 static void ceph_msg_remove_list(struct list_head *head) 314 { 315 while (!list_empty(head)) { 316 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 317 list_head); 318 ceph_msg_remove(msg); 319 } 320 } 321 322 static void reset_connection(struct ceph_connection *con) 323 { 324 /* reset connection, out_queue, msg_ and connect_seq */ 325 /* discard existing out_queue and msg_seq */ 326 ceph_msg_remove_list(&con->out_queue); 327 ceph_msg_remove_list(&con->out_sent); 328 329 if (con->in_msg) { 330 ceph_msg_put(con->in_msg); 331 con->in_msg = NULL; 332 } 333 334 con->connect_seq = 0; 335 con->out_seq = 0; 336 if (con->out_msg) { 337 ceph_msg_put(con->out_msg); 338 con->out_msg = NULL; 339 } 340 con->in_seq = 0; 341 con->in_seq_acked = 0; 342 } 343 344 /* 345 * mark a peer down. drop any open connections. 346 */ 347 void ceph_con_close(struct ceph_connection *con) 348 { 349 dout("con_close %p peer %s\n", con, 350 ceph_pr_addr(&con->peer_addr.in_addr)); 351 set_bit(CLOSED, &con->state); /* in case there's queued work */ 352 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 353 clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ 354 clear_bit(KEEPALIVE_PENDING, &con->state); 355 clear_bit(WRITE_PENDING, &con->state); 356 mutex_lock(&con->mutex); 357 reset_connection(con); 358 con->peer_global_seq = 0; 359 cancel_delayed_work(&con->work); 360 mutex_unlock(&con->mutex); 361 queue_con(con); 362 } 363 EXPORT_SYMBOL(ceph_con_close); 364 365 /* 366 * Reopen a closed connection, with a new peer address. 367 */ 368 void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr) 369 { 370 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 371 set_bit(OPENING, &con->state); 372 clear_bit(CLOSED, &con->state); 373 memcpy(&con->peer_addr, addr, sizeof(*addr)); 374 con->delay = 0; /* reset backoff memory */ 375 queue_con(con); 376 } 377 EXPORT_SYMBOL(ceph_con_open); 378 379 /* 380 * return true if this connection ever successfully opened 381 */ 382 bool ceph_con_opened(struct ceph_connection *con) 383 { 384 return con->connect_seq > 0; 385 } 386 387 /* 388 * generic get/put 389 */ 390 struct ceph_connection *ceph_con_get(struct ceph_connection *con) 391 { 392 dout("con_get %p nref = %d -> %d\n", con, 393 atomic_read(&con->nref), atomic_read(&con->nref) + 1); 394 if (atomic_inc_not_zero(&con->nref)) 395 return con; 396 return NULL; 397 } 398 399 void ceph_con_put(struct ceph_connection *con) 400 { 401 dout("con_put %p nref = %d -> %d\n", con, 402 atomic_read(&con->nref), atomic_read(&con->nref) - 1); 403 BUG_ON(atomic_read(&con->nref) == 0); 404 if (atomic_dec_and_test(&con->nref)) { 405 BUG_ON(con->sock); 406 kfree(con); 407 } 408 } 409 410 /* 411 * initialize a new connection. 412 */ 413 void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) 414 { 415 dout("con_init %p\n", con); 416 memset(con, 0, sizeof(*con)); 417 atomic_set(&con->nref, 1); 418 con->msgr = msgr; 419 mutex_init(&con->mutex); 420 INIT_LIST_HEAD(&con->out_queue); 421 INIT_LIST_HEAD(&con->out_sent); 422 INIT_DELAYED_WORK(&con->work, con_work); 423 } 424 EXPORT_SYMBOL(ceph_con_init); 425 426 427 /* 428 * We maintain a global counter to order connection attempts. Get 429 * a unique seq greater than @gt. 430 */ 431 static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) 432 { 433 u32 ret; 434 435 spin_lock(&msgr->global_seq_lock); 436 if (msgr->global_seq < gt) 437 msgr->global_seq = gt; 438 ret = ++msgr->global_seq; 439 spin_unlock(&msgr->global_seq_lock); 440 return ret; 441 } 442 443 444 /* 445 * Prepare footer for currently outgoing message, and finish things 446 * off. Assumes out_kvec* are already valid.. we just add on to the end. 447 */ 448 static void prepare_write_message_footer(struct ceph_connection *con, int v) 449 { 450 struct ceph_msg *m = con->out_msg; 451 452 dout("prepare_write_message_footer %p\n", con); 453 con->out_kvec_is_msg = true; 454 con->out_kvec[v].iov_base = &m->footer; 455 con->out_kvec[v].iov_len = sizeof(m->footer); 456 con->out_kvec_bytes += sizeof(m->footer); 457 con->out_kvec_left++; 458 con->out_more = m->more_to_follow; 459 con->out_msg_done = true; 460 } 461 462 /* 463 * Prepare headers for the next outgoing message. 464 */ 465 static void prepare_write_message(struct ceph_connection *con) 466 { 467 struct ceph_msg *m; 468 int v = 0; 469 470 con->out_kvec_bytes = 0; 471 con->out_kvec_is_msg = true; 472 con->out_msg_done = false; 473 474 /* Sneak an ack in there first? If we can get it into the same 475 * TCP packet that's a good thing. */ 476 if (con->in_seq > con->in_seq_acked) { 477 con->in_seq_acked = con->in_seq; 478 con->out_kvec[v].iov_base = &tag_ack; 479 con->out_kvec[v++].iov_len = 1; 480 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 481 con->out_kvec[v].iov_base = &con->out_temp_ack; 482 con->out_kvec[v++].iov_len = sizeof(con->out_temp_ack); 483 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack); 484 } 485 486 m = list_first_entry(&con->out_queue, 487 struct ceph_msg, list_head); 488 con->out_msg = m; 489 if (test_bit(LOSSYTX, &con->state)) { 490 list_del_init(&m->list_head); 491 } else { 492 /* put message on sent list */ 493 ceph_msg_get(m); 494 list_move_tail(&m->list_head, &con->out_sent); 495 } 496 497 /* 498 * only assign outgoing seq # if we haven't sent this message 499 * yet. if it is requeued, resend with it's original seq. 500 */ 501 if (m->needs_out_seq) { 502 m->hdr.seq = cpu_to_le64(++con->out_seq); 503 m->needs_out_seq = false; 504 } 505 506 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n", 507 m, con->out_seq, le16_to_cpu(m->hdr.type), 508 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 509 le32_to_cpu(m->hdr.data_len), 510 m->nr_pages); 511 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 512 513 /* tag + hdr + front + middle */ 514 con->out_kvec[v].iov_base = &tag_msg; 515 con->out_kvec[v++].iov_len = 1; 516 con->out_kvec[v].iov_base = &m->hdr; 517 con->out_kvec[v++].iov_len = sizeof(m->hdr); 518 con->out_kvec[v++] = m->front; 519 if (m->middle) 520 con->out_kvec[v++] = m->middle->vec; 521 con->out_kvec_left = v; 522 con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len + 523 (m->middle ? m->middle->vec.iov_len : 0); 524 con->out_kvec_cur = con->out_kvec; 525 526 /* fill in crc (except data pages), footer */ 527 con->out_msg->hdr.crc = 528 cpu_to_le32(crc32c(0, (void *)&m->hdr, 529 sizeof(m->hdr) - sizeof(m->hdr.crc))); 530 con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE; 531 con->out_msg->footer.front_crc = 532 cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len)); 533 if (m->middle) 534 con->out_msg->footer.middle_crc = 535 cpu_to_le32(crc32c(0, m->middle->vec.iov_base, 536 m->middle->vec.iov_len)); 537 else 538 con->out_msg->footer.middle_crc = 0; 539 con->out_msg->footer.data_crc = 0; 540 dout("prepare_write_message front_crc %u data_crc %u\n", 541 le32_to_cpu(con->out_msg->footer.front_crc), 542 le32_to_cpu(con->out_msg->footer.middle_crc)); 543 544 /* is there a data payload? */ 545 if (le32_to_cpu(m->hdr.data_len) > 0) { 546 /* initialize page iterator */ 547 con->out_msg_pos.page = 0; 548 if (m->pages) 549 con->out_msg_pos.page_pos = m->page_alignment; 550 else 551 con->out_msg_pos.page_pos = 0; 552 con->out_msg_pos.data_pos = 0; 553 con->out_msg_pos.did_page_crc = 0; 554 con->out_more = 1; /* data + footer will follow */ 555 } else { 556 /* no, queue up footer too and be done */ 557 prepare_write_message_footer(con, v); 558 } 559 560 set_bit(WRITE_PENDING, &con->state); 561 } 562 563 /* 564 * Prepare an ack. 565 */ 566 static void prepare_write_ack(struct ceph_connection *con) 567 { 568 dout("prepare_write_ack %p %llu -> %llu\n", con, 569 con->in_seq_acked, con->in_seq); 570 con->in_seq_acked = con->in_seq; 571 572 con->out_kvec[0].iov_base = &tag_ack; 573 con->out_kvec[0].iov_len = 1; 574 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 575 con->out_kvec[1].iov_base = &con->out_temp_ack; 576 con->out_kvec[1].iov_len = sizeof(con->out_temp_ack); 577 con->out_kvec_left = 2; 578 con->out_kvec_bytes = 1 + sizeof(con->out_temp_ack); 579 con->out_kvec_cur = con->out_kvec; 580 con->out_more = 1; /* more will follow.. eventually.. */ 581 set_bit(WRITE_PENDING, &con->state); 582 } 583 584 /* 585 * Prepare to write keepalive byte. 586 */ 587 static void prepare_write_keepalive(struct ceph_connection *con) 588 { 589 dout("prepare_write_keepalive %p\n", con); 590 con->out_kvec[0].iov_base = &tag_keepalive; 591 con->out_kvec[0].iov_len = 1; 592 con->out_kvec_left = 1; 593 con->out_kvec_bytes = 1; 594 con->out_kvec_cur = con->out_kvec; 595 set_bit(WRITE_PENDING, &con->state); 596 } 597 598 /* 599 * Connection negotiation. 600 */ 601 602 static int prepare_connect_authorizer(struct ceph_connection *con) 603 { 604 void *auth_buf; 605 int auth_len = 0; 606 int auth_protocol = 0; 607 608 mutex_unlock(&con->mutex); 609 if (con->ops->get_authorizer) 610 con->ops->get_authorizer(con, &auth_buf, &auth_len, 611 &auth_protocol, &con->auth_reply_buf, 612 &con->auth_reply_buf_len, 613 con->auth_retry); 614 mutex_lock(&con->mutex); 615 616 if (test_bit(CLOSED, &con->state) || 617 test_bit(OPENING, &con->state)) 618 return -EAGAIN; 619 620 con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); 621 con->out_connect.authorizer_len = cpu_to_le32(auth_len); 622 623 if (auth_len) { 624 con->out_kvec[con->out_kvec_left].iov_base = auth_buf; 625 con->out_kvec[con->out_kvec_left].iov_len = auth_len; 626 con->out_kvec_left++; 627 con->out_kvec_bytes += auth_len; 628 } 629 return 0; 630 } 631 632 /* 633 * We connected to a peer and are saying hello. 634 */ 635 static void prepare_write_banner(struct ceph_messenger *msgr, 636 struct ceph_connection *con) 637 { 638 int len = strlen(CEPH_BANNER); 639 640 con->out_kvec[0].iov_base = CEPH_BANNER; 641 con->out_kvec[0].iov_len = len; 642 con->out_kvec[1].iov_base = &msgr->my_enc_addr; 643 con->out_kvec[1].iov_len = sizeof(msgr->my_enc_addr); 644 con->out_kvec_left = 2; 645 con->out_kvec_bytes = len + sizeof(msgr->my_enc_addr); 646 con->out_kvec_cur = con->out_kvec; 647 con->out_more = 0; 648 set_bit(WRITE_PENDING, &con->state); 649 } 650 651 static int prepare_write_connect(struct ceph_messenger *msgr, 652 struct ceph_connection *con, 653 int after_banner) 654 { 655 unsigned global_seq = get_global_seq(con->msgr, 0); 656 int proto; 657 658 switch (con->peer_name.type) { 659 case CEPH_ENTITY_TYPE_MON: 660 proto = CEPH_MONC_PROTOCOL; 661 break; 662 case CEPH_ENTITY_TYPE_OSD: 663 proto = CEPH_OSDC_PROTOCOL; 664 break; 665 case CEPH_ENTITY_TYPE_MDS: 666 proto = CEPH_MDSC_PROTOCOL; 667 break; 668 default: 669 BUG(); 670 } 671 672 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 673 con->connect_seq, global_seq, proto); 674 675 con->out_connect.features = cpu_to_le64(msgr->supported_features); 676 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 677 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 678 con->out_connect.global_seq = cpu_to_le32(global_seq); 679 con->out_connect.protocol_version = cpu_to_le32(proto); 680 con->out_connect.flags = 0; 681 682 if (!after_banner) { 683 con->out_kvec_left = 0; 684 con->out_kvec_bytes = 0; 685 } 686 con->out_kvec[con->out_kvec_left].iov_base = &con->out_connect; 687 con->out_kvec[con->out_kvec_left].iov_len = sizeof(con->out_connect); 688 con->out_kvec_left++; 689 con->out_kvec_bytes += sizeof(con->out_connect); 690 con->out_kvec_cur = con->out_kvec; 691 con->out_more = 0; 692 set_bit(WRITE_PENDING, &con->state); 693 694 return prepare_connect_authorizer(con); 695 } 696 697 698 /* 699 * write as much of pending kvecs to the socket as we can. 700 * 1 -> done 701 * 0 -> socket full, but more to do 702 * <0 -> error 703 */ 704 static int write_partial_kvec(struct ceph_connection *con) 705 { 706 int ret; 707 708 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); 709 while (con->out_kvec_bytes > 0) { 710 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, 711 con->out_kvec_left, con->out_kvec_bytes, 712 con->out_more); 713 if (ret <= 0) 714 goto out; 715 con->out_kvec_bytes -= ret; 716 if (con->out_kvec_bytes == 0) 717 break; /* done */ 718 while (ret > 0) { 719 if (ret >= con->out_kvec_cur->iov_len) { 720 ret -= con->out_kvec_cur->iov_len; 721 con->out_kvec_cur++; 722 con->out_kvec_left--; 723 } else { 724 con->out_kvec_cur->iov_len -= ret; 725 con->out_kvec_cur->iov_base += ret; 726 ret = 0; 727 break; 728 } 729 } 730 } 731 con->out_kvec_left = 0; 732 con->out_kvec_is_msg = false; 733 ret = 1; 734 out: 735 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 736 con->out_kvec_bytes, con->out_kvec_left, ret); 737 return ret; /* done! */ 738 } 739 740 #ifdef CONFIG_BLOCK 741 static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg) 742 { 743 if (!bio) { 744 *iter = NULL; 745 *seg = 0; 746 return; 747 } 748 *iter = bio; 749 *seg = bio->bi_idx; 750 } 751 752 static void iter_bio_next(struct bio **bio_iter, int *seg) 753 { 754 if (*bio_iter == NULL) 755 return; 756 757 BUG_ON(*seg >= (*bio_iter)->bi_vcnt); 758 759 (*seg)++; 760 if (*seg == (*bio_iter)->bi_vcnt) 761 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg); 762 } 763 #endif 764 765 /* 766 * Write as much message data payload as we can. If we finish, queue 767 * up the footer. 768 * 1 -> done, footer is now queued in out_kvec[]. 769 * 0 -> socket full, but more to do 770 * <0 -> error 771 */ 772 static int write_partial_msg_pages(struct ceph_connection *con) 773 { 774 struct ceph_msg *msg = con->out_msg; 775 unsigned data_len = le32_to_cpu(msg->hdr.data_len); 776 size_t len; 777 int crc = con->msgr->nocrc; 778 int ret; 779 int total_max_write; 780 int in_trail = 0; 781 size_t trail_len = (msg->trail ? msg->trail->length : 0); 782 783 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 784 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 785 con->out_msg_pos.page_pos); 786 787 #ifdef CONFIG_BLOCK 788 if (msg->bio && !msg->bio_iter) 789 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg); 790 #endif 791 792 while (data_len > con->out_msg_pos.data_pos) { 793 struct page *page = NULL; 794 void *kaddr = NULL; 795 int max_write = PAGE_SIZE; 796 int page_shift = 0; 797 798 total_max_write = data_len - trail_len - 799 con->out_msg_pos.data_pos; 800 801 /* 802 * if we are calculating the data crc (the default), we need 803 * to map the page. if our pages[] has been revoked, use the 804 * zero page. 805 */ 806 807 /* have we reached the trail part of the data? */ 808 if (con->out_msg_pos.data_pos >= data_len - trail_len) { 809 in_trail = 1; 810 811 total_max_write = data_len - con->out_msg_pos.data_pos; 812 813 page = list_first_entry(&msg->trail->head, 814 struct page, lru); 815 if (crc) 816 kaddr = kmap(page); 817 max_write = PAGE_SIZE; 818 } else if (msg->pages) { 819 page = msg->pages[con->out_msg_pos.page]; 820 if (crc) 821 kaddr = kmap(page); 822 } else if (msg->pagelist) { 823 page = list_first_entry(&msg->pagelist->head, 824 struct page, lru); 825 if (crc) 826 kaddr = kmap(page); 827 #ifdef CONFIG_BLOCK 828 } else if (msg->bio) { 829 struct bio_vec *bv; 830 831 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg); 832 page = bv->bv_page; 833 page_shift = bv->bv_offset; 834 if (crc) 835 kaddr = kmap(page) + page_shift; 836 max_write = bv->bv_len; 837 #endif 838 } else { 839 page = con->msgr->zero_page; 840 if (crc) 841 kaddr = page_address(con->msgr->zero_page); 842 } 843 len = min_t(int, max_write - con->out_msg_pos.page_pos, 844 total_max_write); 845 846 if (crc && !con->out_msg_pos.did_page_crc) { 847 void *base = kaddr + con->out_msg_pos.page_pos; 848 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 849 850 BUG_ON(kaddr == NULL); 851 con->out_msg->footer.data_crc = 852 cpu_to_le32(crc32c(tmpcrc, base, len)); 853 con->out_msg_pos.did_page_crc = 1; 854 } 855 ret = kernel_sendpage(con->sock, page, 856 con->out_msg_pos.page_pos + page_shift, 857 len, 858 MSG_DONTWAIT | MSG_NOSIGNAL | 859 MSG_MORE); 860 861 if (crc && 862 (msg->pages || msg->pagelist || msg->bio || in_trail)) 863 kunmap(page); 864 865 if (ret == -EAGAIN) 866 ret = 0; 867 if (ret <= 0) 868 goto out; 869 870 con->out_msg_pos.data_pos += ret; 871 con->out_msg_pos.page_pos += ret; 872 if (ret == len) { 873 con->out_msg_pos.page_pos = 0; 874 con->out_msg_pos.page++; 875 con->out_msg_pos.did_page_crc = 0; 876 if (in_trail) 877 list_move_tail(&page->lru, 878 &msg->trail->head); 879 else if (msg->pagelist) 880 list_move_tail(&page->lru, 881 &msg->pagelist->head); 882 #ifdef CONFIG_BLOCK 883 else if (msg->bio) 884 iter_bio_next(&msg->bio_iter, &msg->bio_seg); 885 #endif 886 } 887 } 888 889 dout("write_partial_msg_pages %p msg %p done\n", con, msg); 890 891 /* prepare and queue up footer, too */ 892 if (!crc) 893 con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 894 con->out_kvec_bytes = 0; 895 con->out_kvec_left = 0; 896 con->out_kvec_cur = con->out_kvec; 897 prepare_write_message_footer(con, 0); 898 ret = 1; 899 out: 900 return ret; 901 } 902 903 /* 904 * write some zeros 905 */ 906 static int write_partial_skip(struct ceph_connection *con) 907 { 908 int ret; 909 910 while (con->out_skip > 0) { 911 struct kvec iov = { 912 .iov_base = page_address(con->msgr->zero_page), 913 .iov_len = min(con->out_skip, (int)PAGE_CACHE_SIZE) 914 }; 915 916 ret = ceph_tcp_sendmsg(con->sock, &iov, 1, iov.iov_len, 1); 917 if (ret <= 0) 918 goto out; 919 con->out_skip -= ret; 920 } 921 ret = 1; 922 out: 923 return ret; 924 } 925 926 /* 927 * Prepare to read connection handshake, or an ack. 928 */ 929 static void prepare_read_banner(struct ceph_connection *con) 930 { 931 dout("prepare_read_banner %p\n", con); 932 con->in_base_pos = 0; 933 } 934 935 static void prepare_read_connect(struct ceph_connection *con) 936 { 937 dout("prepare_read_connect %p\n", con); 938 con->in_base_pos = 0; 939 } 940 941 static void prepare_read_ack(struct ceph_connection *con) 942 { 943 dout("prepare_read_ack %p\n", con); 944 con->in_base_pos = 0; 945 } 946 947 static void prepare_read_tag(struct ceph_connection *con) 948 { 949 dout("prepare_read_tag %p\n", con); 950 con->in_base_pos = 0; 951 con->in_tag = CEPH_MSGR_TAG_READY; 952 } 953 954 /* 955 * Prepare to read a message. 956 */ 957 static int prepare_read_message(struct ceph_connection *con) 958 { 959 dout("prepare_read_message %p\n", con); 960 BUG_ON(con->in_msg != NULL); 961 con->in_base_pos = 0; 962 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; 963 return 0; 964 } 965 966 967 static int read_partial(struct ceph_connection *con, 968 int *to, int size, void *object) 969 { 970 *to += size; 971 while (con->in_base_pos < *to) { 972 int left = *to - con->in_base_pos; 973 int have = size - left; 974 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 975 if (ret <= 0) 976 return ret; 977 con->in_base_pos += ret; 978 } 979 return 1; 980 } 981 982 983 /* 984 * Read all or part of the connect-side handshake on a new connection 985 */ 986 static int read_partial_banner(struct ceph_connection *con) 987 { 988 int ret, to = 0; 989 990 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 991 992 /* peer's banner */ 993 ret = read_partial(con, &to, strlen(CEPH_BANNER), con->in_banner); 994 if (ret <= 0) 995 goto out; 996 ret = read_partial(con, &to, sizeof(con->actual_peer_addr), 997 &con->actual_peer_addr); 998 if (ret <= 0) 999 goto out; 1000 ret = read_partial(con, &to, sizeof(con->peer_addr_for_me), 1001 &con->peer_addr_for_me); 1002 if (ret <= 0) 1003 goto out; 1004 out: 1005 return ret; 1006 } 1007 1008 static int read_partial_connect(struct ceph_connection *con) 1009 { 1010 int ret, to = 0; 1011 1012 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 1013 1014 ret = read_partial(con, &to, sizeof(con->in_reply), &con->in_reply); 1015 if (ret <= 0) 1016 goto out; 1017 ret = read_partial(con, &to, le32_to_cpu(con->in_reply.authorizer_len), 1018 con->auth_reply_buf); 1019 if (ret <= 0) 1020 goto out; 1021 1022 dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", 1023 con, (int)con->in_reply.tag, 1024 le32_to_cpu(con->in_reply.connect_seq), 1025 le32_to_cpu(con->in_reply.global_seq)); 1026 out: 1027 return ret; 1028 1029 } 1030 1031 /* 1032 * Verify the hello banner looks okay. 1033 */ 1034 static int verify_hello(struct ceph_connection *con) 1035 { 1036 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { 1037 pr_err("connect to %s got bad banner\n", 1038 ceph_pr_addr(&con->peer_addr.in_addr)); 1039 con->error_msg = "protocol error, bad banner"; 1040 return -1; 1041 } 1042 return 0; 1043 } 1044 1045 static bool addr_is_blank(struct sockaddr_storage *ss) 1046 { 1047 switch (ss->ss_family) { 1048 case AF_INET: 1049 return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0; 1050 case AF_INET6: 1051 return 1052 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 && 1053 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 && 1054 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 && 1055 ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0; 1056 } 1057 return false; 1058 } 1059 1060 static int addr_port(struct sockaddr_storage *ss) 1061 { 1062 switch (ss->ss_family) { 1063 case AF_INET: 1064 return ntohs(((struct sockaddr_in *)ss)->sin_port); 1065 case AF_INET6: 1066 return ntohs(((struct sockaddr_in6 *)ss)->sin6_port); 1067 } 1068 return 0; 1069 } 1070 1071 static void addr_set_port(struct sockaddr_storage *ss, int p) 1072 { 1073 switch (ss->ss_family) { 1074 case AF_INET: 1075 ((struct sockaddr_in *)ss)->sin_port = htons(p); 1076 break; 1077 case AF_INET6: 1078 ((struct sockaddr_in6 *)ss)->sin6_port = htons(p); 1079 break; 1080 } 1081 } 1082 1083 /* 1084 * Parse an ip[:port] list into an addr array. Use the default 1085 * monitor port if a port isn't specified. 1086 */ 1087 int ceph_parse_ips(const char *c, const char *end, 1088 struct ceph_entity_addr *addr, 1089 int max_count, int *count) 1090 { 1091 int i; 1092 const char *p = c; 1093 1094 dout("parse_ips on '%.*s'\n", (int)(end-c), c); 1095 for (i = 0; i < max_count; i++) { 1096 const char *ipend; 1097 struct sockaddr_storage *ss = &addr[i].in_addr; 1098 struct sockaddr_in *in4 = (void *)ss; 1099 struct sockaddr_in6 *in6 = (void *)ss; 1100 int port; 1101 char delim = ','; 1102 1103 if (*p == '[') { 1104 delim = ']'; 1105 p++; 1106 } 1107 1108 memset(ss, 0, sizeof(*ss)); 1109 if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr, 1110 delim, &ipend)) 1111 ss->ss_family = AF_INET; 1112 else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr, 1113 delim, &ipend)) 1114 ss->ss_family = AF_INET6; 1115 else 1116 goto bad; 1117 p = ipend; 1118 1119 if (delim == ']') { 1120 if (*p != ']') { 1121 dout("missing matching ']'\n"); 1122 goto bad; 1123 } 1124 p++; 1125 } 1126 1127 /* port? */ 1128 if (p < end && *p == ':') { 1129 port = 0; 1130 p++; 1131 while (p < end && *p >= '0' && *p <= '9') { 1132 port = (port * 10) + (*p - '0'); 1133 p++; 1134 } 1135 if (port > 65535 || port == 0) 1136 goto bad; 1137 } else { 1138 port = CEPH_MON_PORT; 1139 } 1140 1141 addr_set_port(ss, port); 1142 1143 dout("parse_ips got %s\n", ceph_pr_addr(ss)); 1144 1145 if (p == end) 1146 break; 1147 if (*p != ',') 1148 goto bad; 1149 p++; 1150 } 1151 1152 if (p != end) 1153 goto bad; 1154 1155 if (count) 1156 *count = i + 1; 1157 return 0; 1158 1159 bad: 1160 pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c); 1161 return -EINVAL; 1162 } 1163 EXPORT_SYMBOL(ceph_parse_ips); 1164 1165 static int process_banner(struct ceph_connection *con) 1166 { 1167 dout("process_banner on %p\n", con); 1168 1169 if (verify_hello(con) < 0) 1170 return -1; 1171 1172 ceph_decode_addr(&con->actual_peer_addr); 1173 ceph_decode_addr(&con->peer_addr_for_me); 1174 1175 /* 1176 * Make sure the other end is who we wanted. note that the other 1177 * end may not yet know their ip address, so if it's 0.0.0.0, give 1178 * them the benefit of the doubt. 1179 */ 1180 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 1181 sizeof(con->peer_addr)) != 0 && 1182 !(addr_is_blank(&con->actual_peer_addr.in_addr) && 1183 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 1184 pr_warning("wrong peer, want %s/%d, got %s/%d\n", 1185 ceph_pr_addr(&con->peer_addr.in_addr), 1186 (int)le32_to_cpu(con->peer_addr.nonce), 1187 ceph_pr_addr(&con->actual_peer_addr.in_addr), 1188 (int)le32_to_cpu(con->actual_peer_addr.nonce)); 1189 con->error_msg = "wrong peer at address"; 1190 return -1; 1191 } 1192 1193 /* 1194 * did we learn our address? 1195 */ 1196 if (addr_is_blank(&con->msgr->inst.addr.in_addr)) { 1197 int port = addr_port(&con->msgr->inst.addr.in_addr); 1198 1199 memcpy(&con->msgr->inst.addr.in_addr, 1200 &con->peer_addr_for_me.in_addr, 1201 sizeof(con->peer_addr_for_me.in_addr)); 1202 addr_set_port(&con->msgr->inst.addr.in_addr, port); 1203 encode_my_addr(con->msgr); 1204 dout("process_banner learned my addr is %s\n", 1205 ceph_pr_addr(&con->msgr->inst.addr.in_addr)); 1206 } 1207 1208 set_bit(NEGOTIATING, &con->state); 1209 prepare_read_connect(con); 1210 return 0; 1211 } 1212 1213 static void fail_protocol(struct ceph_connection *con) 1214 { 1215 reset_connection(con); 1216 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1217 1218 mutex_unlock(&con->mutex); 1219 if (con->ops->bad_proto) 1220 con->ops->bad_proto(con); 1221 mutex_lock(&con->mutex); 1222 } 1223 1224 static int process_connect(struct ceph_connection *con) 1225 { 1226 u64 sup_feat = con->msgr->supported_features; 1227 u64 req_feat = con->msgr->required_features; 1228 u64 server_feat = le64_to_cpu(con->in_reply.features); 1229 int ret; 1230 1231 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 1232 1233 switch (con->in_reply.tag) { 1234 case CEPH_MSGR_TAG_FEATURES: 1235 pr_err("%s%lld %s feature set mismatch," 1236 " my %llx < server's %llx, missing %llx\n", 1237 ENTITY_NAME(con->peer_name), 1238 ceph_pr_addr(&con->peer_addr.in_addr), 1239 sup_feat, server_feat, server_feat & ~sup_feat); 1240 con->error_msg = "missing required protocol features"; 1241 fail_protocol(con); 1242 return -1; 1243 1244 case CEPH_MSGR_TAG_BADPROTOVER: 1245 pr_err("%s%lld %s protocol version mismatch," 1246 " my %d != server's %d\n", 1247 ENTITY_NAME(con->peer_name), 1248 ceph_pr_addr(&con->peer_addr.in_addr), 1249 le32_to_cpu(con->out_connect.protocol_version), 1250 le32_to_cpu(con->in_reply.protocol_version)); 1251 con->error_msg = "protocol version mismatch"; 1252 fail_protocol(con); 1253 return -1; 1254 1255 case CEPH_MSGR_TAG_BADAUTHORIZER: 1256 con->auth_retry++; 1257 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, 1258 con->auth_retry); 1259 if (con->auth_retry == 2) { 1260 con->error_msg = "connect authorization failure"; 1261 return -1; 1262 } 1263 con->auth_retry = 1; 1264 ret = prepare_write_connect(con->msgr, con, 0); 1265 if (ret < 0) 1266 return ret; 1267 prepare_read_connect(con); 1268 break; 1269 1270 case CEPH_MSGR_TAG_RESETSESSION: 1271 /* 1272 * If we connected with a large connect_seq but the peer 1273 * has no record of a session with us (no connection, or 1274 * connect_seq == 0), they will send RESETSESION to indicate 1275 * that they must have reset their session, and may have 1276 * dropped messages. 1277 */ 1278 dout("process_connect got RESET peer seq %u\n", 1279 le32_to_cpu(con->in_connect.connect_seq)); 1280 pr_err("%s%lld %s connection reset\n", 1281 ENTITY_NAME(con->peer_name), 1282 ceph_pr_addr(&con->peer_addr.in_addr)); 1283 reset_connection(con); 1284 prepare_write_connect(con->msgr, con, 0); 1285 prepare_read_connect(con); 1286 1287 /* Tell ceph about it. */ 1288 mutex_unlock(&con->mutex); 1289 pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); 1290 if (con->ops->peer_reset) 1291 con->ops->peer_reset(con); 1292 mutex_lock(&con->mutex); 1293 if (test_bit(CLOSED, &con->state) || 1294 test_bit(OPENING, &con->state)) 1295 return -EAGAIN; 1296 break; 1297 1298 case CEPH_MSGR_TAG_RETRY_SESSION: 1299 /* 1300 * If we sent a smaller connect_seq than the peer has, try 1301 * again with a larger value. 1302 */ 1303 dout("process_connect got RETRY my seq = %u, peer_seq = %u\n", 1304 le32_to_cpu(con->out_connect.connect_seq), 1305 le32_to_cpu(con->in_connect.connect_seq)); 1306 con->connect_seq = le32_to_cpu(con->in_connect.connect_seq); 1307 prepare_write_connect(con->msgr, con, 0); 1308 prepare_read_connect(con); 1309 break; 1310 1311 case CEPH_MSGR_TAG_RETRY_GLOBAL: 1312 /* 1313 * If we sent a smaller global_seq than the peer has, try 1314 * again with a larger value. 1315 */ 1316 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", 1317 con->peer_global_seq, 1318 le32_to_cpu(con->in_connect.global_seq)); 1319 get_global_seq(con->msgr, 1320 le32_to_cpu(con->in_connect.global_seq)); 1321 prepare_write_connect(con->msgr, con, 0); 1322 prepare_read_connect(con); 1323 break; 1324 1325 case CEPH_MSGR_TAG_READY: 1326 if (req_feat & ~server_feat) { 1327 pr_err("%s%lld %s protocol feature mismatch," 1328 " my required %llx > server's %llx, need %llx\n", 1329 ENTITY_NAME(con->peer_name), 1330 ceph_pr_addr(&con->peer_addr.in_addr), 1331 req_feat, server_feat, req_feat & ~server_feat); 1332 con->error_msg = "missing required protocol features"; 1333 fail_protocol(con); 1334 return -1; 1335 } 1336 clear_bit(CONNECTING, &con->state); 1337 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1338 con->connect_seq++; 1339 con->peer_features = server_feat; 1340 dout("process_connect got READY gseq %d cseq %d (%d)\n", 1341 con->peer_global_seq, 1342 le32_to_cpu(con->in_reply.connect_seq), 1343 con->connect_seq); 1344 WARN_ON(con->connect_seq != 1345 le32_to_cpu(con->in_reply.connect_seq)); 1346 1347 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1348 set_bit(LOSSYTX, &con->state); 1349 1350 prepare_read_tag(con); 1351 break; 1352 1353 case CEPH_MSGR_TAG_WAIT: 1354 /* 1355 * If there is a connection race (we are opening 1356 * connections to each other), one of us may just have 1357 * to WAIT. This shouldn't happen if we are the 1358 * client. 1359 */ 1360 pr_err("process_connect got WAIT as client\n"); 1361 con->error_msg = "protocol error, got WAIT as client"; 1362 return -1; 1363 1364 default: 1365 pr_err("connect protocol error, will retry\n"); 1366 con->error_msg = "protocol error, garbage tag during connect"; 1367 return -1; 1368 } 1369 return 0; 1370 } 1371 1372 1373 /* 1374 * read (part of) an ack 1375 */ 1376 static int read_partial_ack(struct ceph_connection *con) 1377 { 1378 int to = 0; 1379 1380 return read_partial(con, &to, sizeof(con->in_temp_ack), 1381 &con->in_temp_ack); 1382 } 1383 1384 1385 /* 1386 * We can finally discard anything that's been acked. 1387 */ 1388 static void process_ack(struct ceph_connection *con) 1389 { 1390 struct ceph_msg *m; 1391 u64 ack = le64_to_cpu(con->in_temp_ack); 1392 u64 seq; 1393 1394 while (!list_empty(&con->out_sent)) { 1395 m = list_first_entry(&con->out_sent, struct ceph_msg, 1396 list_head); 1397 seq = le64_to_cpu(m->hdr.seq); 1398 if (seq > ack) 1399 break; 1400 dout("got ack for seq %llu type %d at %p\n", seq, 1401 le16_to_cpu(m->hdr.type), m); 1402 ceph_msg_remove(m); 1403 } 1404 prepare_read_tag(con); 1405 } 1406 1407 1408 1409 1410 static int read_partial_message_section(struct ceph_connection *con, 1411 struct kvec *section, 1412 unsigned int sec_len, u32 *crc) 1413 { 1414 int ret, left; 1415 1416 BUG_ON(!section); 1417 1418 while (section->iov_len < sec_len) { 1419 BUG_ON(section->iov_base == NULL); 1420 left = sec_len - section->iov_len; 1421 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + 1422 section->iov_len, left); 1423 if (ret <= 0) 1424 return ret; 1425 section->iov_len += ret; 1426 if (section->iov_len == sec_len) 1427 *crc = crc32c(0, section->iov_base, 1428 section->iov_len); 1429 } 1430 1431 return 1; 1432 } 1433 1434 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1435 struct ceph_msg_header *hdr, 1436 int *skip); 1437 1438 1439 static int read_partial_message_pages(struct ceph_connection *con, 1440 struct page **pages, 1441 unsigned data_len, int datacrc) 1442 { 1443 void *p; 1444 int ret; 1445 int left; 1446 1447 left = min((int)(data_len - con->in_msg_pos.data_pos), 1448 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1449 /* (page) data */ 1450 BUG_ON(pages == NULL); 1451 p = kmap(pages[con->in_msg_pos.page]); 1452 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1453 left); 1454 if (ret > 0 && datacrc) 1455 con->in_data_crc = 1456 crc32c(con->in_data_crc, 1457 p + con->in_msg_pos.page_pos, ret); 1458 kunmap(pages[con->in_msg_pos.page]); 1459 if (ret <= 0) 1460 return ret; 1461 con->in_msg_pos.data_pos += ret; 1462 con->in_msg_pos.page_pos += ret; 1463 if (con->in_msg_pos.page_pos == PAGE_SIZE) { 1464 con->in_msg_pos.page_pos = 0; 1465 con->in_msg_pos.page++; 1466 } 1467 1468 return ret; 1469 } 1470 1471 #ifdef CONFIG_BLOCK 1472 static int read_partial_message_bio(struct ceph_connection *con, 1473 struct bio **bio_iter, int *bio_seg, 1474 unsigned data_len, int datacrc) 1475 { 1476 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg); 1477 void *p; 1478 int ret, left; 1479 1480 if (IS_ERR(bv)) 1481 return PTR_ERR(bv); 1482 1483 left = min((int)(data_len - con->in_msg_pos.data_pos), 1484 (int)(bv->bv_len - con->in_msg_pos.page_pos)); 1485 1486 p = kmap(bv->bv_page) + bv->bv_offset; 1487 1488 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1489 left); 1490 if (ret > 0 && datacrc) 1491 con->in_data_crc = 1492 crc32c(con->in_data_crc, 1493 p + con->in_msg_pos.page_pos, ret); 1494 kunmap(bv->bv_page); 1495 if (ret <= 0) 1496 return ret; 1497 con->in_msg_pos.data_pos += ret; 1498 con->in_msg_pos.page_pos += ret; 1499 if (con->in_msg_pos.page_pos == bv->bv_len) { 1500 con->in_msg_pos.page_pos = 0; 1501 iter_bio_next(bio_iter, bio_seg); 1502 } 1503 1504 return ret; 1505 } 1506 #endif 1507 1508 /* 1509 * read (part of) a message. 1510 */ 1511 static int read_partial_message(struct ceph_connection *con) 1512 { 1513 struct ceph_msg *m = con->in_msg; 1514 int ret; 1515 int to, left; 1516 unsigned front_len, middle_len, data_len; 1517 int datacrc = con->msgr->nocrc; 1518 int skip; 1519 u64 seq; 1520 1521 dout("read_partial_message con %p msg %p\n", con, m); 1522 1523 /* header */ 1524 while (con->in_base_pos < sizeof(con->in_hdr)) { 1525 left = sizeof(con->in_hdr) - con->in_base_pos; 1526 ret = ceph_tcp_recvmsg(con->sock, 1527 (char *)&con->in_hdr + con->in_base_pos, 1528 left); 1529 if (ret <= 0) 1530 return ret; 1531 con->in_base_pos += ret; 1532 if (con->in_base_pos == sizeof(con->in_hdr)) { 1533 u32 crc = crc32c(0, (void *)&con->in_hdr, 1534 sizeof(con->in_hdr) - sizeof(con->in_hdr.crc)); 1535 if (crc != le32_to_cpu(con->in_hdr.crc)) { 1536 pr_err("read_partial_message bad hdr " 1537 " crc %u != expected %u\n", 1538 crc, con->in_hdr.crc); 1539 return -EBADMSG; 1540 } 1541 } 1542 } 1543 front_len = le32_to_cpu(con->in_hdr.front_len); 1544 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1545 return -EIO; 1546 middle_len = le32_to_cpu(con->in_hdr.middle_len); 1547 if (middle_len > CEPH_MSG_MAX_DATA_LEN) 1548 return -EIO; 1549 data_len = le32_to_cpu(con->in_hdr.data_len); 1550 if (data_len > CEPH_MSG_MAX_DATA_LEN) 1551 return -EIO; 1552 1553 /* verify seq# */ 1554 seq = le64_to_cpu(con->in_hdr.seq); 1555 if ((s64)seq - (s64)con->in_seq < 1) { 1556 pr_info("skipping %s%lld %s seq %lld expected %lld\n", 1557 ENTITY_NAME(con->peer_name), 1558 ceph_pr_addr(&con->peer_addr.in_addr), 1559 seq, con->in_seq + 1); 1560 con->in_base_pos = -front_len - middle_len - data_len - 1561 sizeof(m->footer); 1562 con->in_tag = CEPH_MSGR_TAG_READY; 1563 return 0; 1564 } else if ((s64)seq - (s64)con->in_seq > 1) { 1565 pr_err("read_partial_message bad seq %lld expected %lld\n", 1566 seq, con->in_seq + 1); 1567 con->error_msg = "bad message sequence # for incoming message"; 1568 return -EBADMSG; 1569 } 1570 1571 /* allocate message? */ 1572 if (!con->in_msg) { 1573 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1574 con->in_hdr.front_len, con->in_hdr.data_len); 1575 skip = 0; 1576 con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip); 1577 if (skip) { 1578 /* skip this message */ 1579 dout("alloc_msg said skip message\n"); 1580 BUG_ON(con->in_msg); 1581 con->in_base_pos = -front_len - middle_len - data_len - 1582 sizeof(m->footer); 1583 con->in_tag = CEPH_MSGR_TAG_READY; 1584 con->in_seq++; 1585 return 0; 1586 } 1587 if (!con->in_msg) { 1588 con->error_msg = 1589 "error allocating memory for incoming message"; 1590 return -ENOMEM; 1591 } 1592 m = con->in_msg; 1593 m->front.iov_len = 0; /* haven't read it yet */ 1594 if (m->middle) 1595 m->middle->vec.iov_len = 0; 1596 1597 con->in_msg_pos.page = 0; 1598 if (m->pages) 1599 con->in_msg_pos.page_pos = m->page_alignment; 1600 else 1601 con->in_msg_pos.page_pos = 0; 1602 con->in_msg_pos.data_pos = 0; 1603 } 1604 1605 /* front */ 1606 ret = read_partial_message_section(con, &m->front, front_len, 1607 &con->in_front_crc); 1608 if (ret <= 0) 1609 return ret; 1610 1611 /* middle */ 1612 if (m->middle) { 1613 ret = read_partial_message_section(con, &m->middle->vec, 1614 middle_len, 1615 &con->in_middle_crc); 1616 if (ret <= 0) 1617 return ret; 1618 } 1619 #ifdef CONFIG_BLOCK 1620 if (m->bio && !m->bio_iter) 1621 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg); 1622 #endif 1623 1624 /* (page) data */ 1625 while (con->in_msg_pos.data_pos < data_len) { 1626 if (m->pages) { 1627 ret = read_partial_message_pages(con, m->pages, 1628 data_len, datacrc); 1629 if (ret <= 0) 1630 return ret; 1631 #ifdef CONFIG_BLOCK 1632 } else if (m->bio) { 1633 1634 ret = read_partial_message_bio(con, 1635 &m->bio_iter, &m->bio_seg, 1636 data_len, datacrc); 1637 if (ret <= 0) 1638 return ret; 1639 #endif 1640 } else { 1641 BUG_ON(1); 1642 } 1643 } 1644 1645 /* footer */ 1646 to = sizeof(m->hdr) + sizeof(m->footer); 1647 while (con->in_base_pos < to) { 1648 left = to - con->in_base_pos; 1649 ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer + 1650 (con->in_base_pos - sizeof(m->hdr)), 1651 left); 1652 if (ret <= 0) 1653 return ret; 1654 con->in_base_pos += ret; 1655 } 1656 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1657 m, front_len, m->footer.front_crc, middle_len, 1658 m->footer.middle_crc, data_len, m->footer.data_crc); 1659 1660 /* crc ok? */ 1661 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 1662 pr_err("read_partial_message %p front crc %u != exp. %u\n", 1663 m, con->in_front_crc, m->footer.front_crc); 1664 return -EBADMSG; 1665 } 1666 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 1667 pr_err("read_partial_message %p middle crc %u != exp %u\n", 1668 m, con->in_middle_crc, m->footer.middle_crc); 1669 return -EBADMSG; 1670 } 1671 if (datacrc && 1672 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 1673 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 1674 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 1675 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 1676 return -EBADMSG; 1677 } 1678 1679 return 1; /* done! */ 1680 } 1681 1682 /* 1683 * Process message. This happens in the worker thread. The callback should 1684 * be careful not to do anything that waits on other incoming messages or it 1685 * may deadlock. 1686 */ 1687 static void process_message(struct ceph_connection *con) 1688 { 1689 struct ceph_msg *msg; 1690 1691 msg = con->in_msg; 1692 con->in_msg = NULL; 1693 1694 /* if first message, set peer_name */ 1695 if (con->peer_name.type == 0) 1696 con->peer_name = msg->hdr.src; 1697 1698 con->in_seq++; 1699 mutex_unlock(&con->mutex); 1700 1701 dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", 1702 msg, le64_to_cpu(msg->hdr.seq), 1703 ENTITY_NAME(msg->hdr.src), 1704 le16_to_cpu(msg->hdr.type), 1705 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 1706 le32_to_cpu(msg->hdr.front_len), 1707 le32_to_cpu(msg->hdr.data_len), 1708 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 1709 con->ops->dispatch(con, msg); 1710 1711 mutex_lock(&con->mutex); 1712 prepare_read_tag(con); 1713 } 1714 1715 1716 /* 1717 * Write something to the socket. Called in a worker thread when the 1718 * socket appears to be writeable and we have something ready to send. 1719 */ 1720 static int try_write(struct ceph_connection *con) 1721 { 1722 struct ceph_messenger *msgr = con->msgr; 1723 int ret = 1; 1724 1725 dout("try_write start %p state %lu nref %d\n", con, con->state, 1726 atomic_read(&con->nref)); 1727 1728 more: 1729 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1730 1731 /* open the socket first? */ 1732 if (con->sock == NULL) { 1733 prepare_write_banner(msgr, con); 1734 prepare_write_connect(msgr, con, 1); 1735 prepare_read_banner(con); 1736 set_bit(CONNECTING, &con->state); 1737 clear_bit(NEGOTIATING, &con->state); 1738 1739 BUG_ON(con->in_msg); 1740 con->in_tag = CEPH_MSGR_TAG_READY; 1741 dout("try_write initiating connect on %p new state %lu\n", 1742 con, con->state); 1743 con->sock = ceph_tcp_connect(con); 1744 if (IS_ERR(con->sock)) { 1745 con->sock = NULL; 1746 con->error_msg = "connect error"; 1747 ret = -1; 1748 goto out; 1749 } 1750 } 1751 1752 more_kvec: 1753 /* kvec data queued? */ 1754 if (con->out_skip) { 1755 ret = write_partial_skip(con); 1756 if (ret <= 0) 1757 goto out; 1758 } 1759 if (con->out_kvec_left) { 1760 ret = write_partial_kvec(con); 1761 if (ret <= 0) 1762 goto out; 1763 } 1764 1765 /* msg pages? */ 1766 if (con->out_msg) { 1767 if (con->out_msg_done) { 1768 ceph_msg_put(con->out_msg); 1769 con->out_msg = NULL; /* we're done with this one */ 1770 goto do_next; 1771 } 1772 1773 ret = write_partial_msg_pages(con); 1774 if (ret == 1) 1775 goto more_kvec; /* we need to send the footer, too! */ 1776 if (ret == 0) 1777 goto out; 1778 if (ret < 0) { 1779 dout("try_write write_partial_msg_pages err %d\n", 1780 ret); 1781 goto out; 1782 } 1783 } 1784 1785 do_next: 1786 if (!test_bit(CONNECTING, &con->state)) { 1787 /* is anything else pending? */ 1788 if (!list_empty(&con->out_queue)) { 1789 prepare_write_message(con); 1790 goto more; 1791 } 1792 if (con->in_seq > con->in_seq_acked) { 1793 prepare_write_ack(con); 1794 goto more; 1795 } 1796 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { 1797 prepare_write_keepalive(con); 1798 goto more; 1799 } 1800 } 1801 1802 /* Nothing to do! */ 1803 clear_bit(WRITE_PENDING, &con->state); 1804 dout("try_write nothing else to write.\n"); 1805 ret = 0; 1806 out: 1807 dout("try_write done on %p ret %d\n", con, ret); 1808 return ret; 1809 } 1810 1811 1812 1813 /* 1814 * Read what we can from the socket. 1815 */ 1816 static int try_read(struct ceph_connection *con) 1817 { 1818 int ret = -1; 1819 1820 if (!con->sock) 1821 return 0; 1822 1823 if (test_bit(STANDBY, &con->state)) 1824 return 0; 1825 1826 dout("try_read start on %p\n", con); 1827 1828 more: 1829 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1830 con->in_base_pos); 1831 1832 /* 1833 * process_connect and process_message drop and re-take 1834 * con->mutex. make sure we handle a racing close or reopen. 1835 */ 1836 if (test_bit(CLOSED, &con->state) || 1837 test_bit(OPENING, &con->state)) { 1838 ret = -EAGAIN; 1839 goto out; 1840 } 1841 1842 if (test_bit(CONNECTING, &con->state)) { 1843 if (!test_bit(NEGOTIATING, &con->state)) { 1844 dout("try_read connecting\n"); 1845 ret = read_partial_banner(con); 1846 if (ret <= 0) 1847 goto out; 1848 ret = process_banner(con); 1849 if (ret < 0) 1850 goto out; 1851 } 1852 ret = read_partial_connect(con); 1853 if (ret <= 0) 1854 goto out; 1855 ret = process_connect(con); 1856 if (ret < 0) 1857 goto out; 1858 goto more; 1859 } 1860 1861 if (con->in_base_pos < 0) { 1862 /* 1863 * skipping + discarding content. 1864 * 1865 * FIXME: there must be a better way to do this! 1866 */ 1867 static char buf[1024]; 1868 int skip = min(1024, -con->in_base_pos); 1869 dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); 1870 ret = ceph_tcp_recvmsg(con->sock, buf, skip); 1871 if (ret <= 0) 1872 goto out; 1873 con->in_base_pos += ret; 1874 if (con->in_base_pos) 1875 goto more; 1876 } 1877 if (con->in_tag == CEPH_MSGR_TAG_READY) { 1878 /* 1879 * what's next? 1880 */ 1881 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); 1882 if (ret <= 0) 1883 goto out; 1884 dout("try_read got tag %d\n", (int)con->in_tag); 1885 switch (con->in_tag) { 1886 case CEPH_MSGR_TAG_MSG: 1887 prepare_read_message(con); 1888 break; 1889 case CEPH_MSGR_TAG_ACK: 1890 prepare_read_ack(con); 1891 break; 1892 case CEPH_MSGR_TAG_CLOSE: 1893 set_bit(CLOSED, &con->state); /* fixme */ 1894 goto out; 1895 default: 1896 goto bad_tag; 1897 } 1898 } 1899 if (con->in_tag == CEPH_MSGR_TAG_MSG) { 1900 ret = read_partial_message(con); 1901 if (ret <= 0) { 1902 switch (ret) { 1903 case -EBADMSG: 1904 con->error_msg = "bad crc"; 1905 ret = -EIO; 1906 break; 1907 case -EIO: 1908 con->error_msg = "io error"; 1909 break; 1910 } 1911 goto out; 1912 } 1913 if (con->in_tag == CEPH_MSGR_TAG_READY) 1914 goto more; 1915 process_message(con); 1916 goto more; 1917 } 1918 if (con->in_tag == CEPH_MSGR_TAG_ACK) { 1919 ret = read_partial_ack(con); 1920 if (ret <= 0) 1921 goto out; 1922 process_ack(con); 1923 goto more; 1924 } 1925 1926 out: 1927 dout("try_read done on %p ret %d\n", con, ret); 1928 return ret; 1929 1930 bad_tag: 1931 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); 1932 con->error_msg = "protocol error, garbage tag"; 1933 ret = -1; 1934 goto out; 1935 } 1936 1937 1938 /* 1939 * Atomically queue work on a connection. Bump @con reference to 1940 * avoid races with connection teardown. 1941 */ 1942 static void queue_con(struct ceph_connection *con) 1943 { 1944 if (test_bit(DEAD, &con->state)) { 1945 dout("queue_con %p ignoring: DEAD\n", 1946 con); 1947 return; 1948 } 1949 1950 if (!con->ops->get(con)) { 1951 dout("queue_con %p ref count 0\n", con); 1952 return; 1953 } 1954 1955 if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { 1956 dout("queue_con %p - already queued\n", con); 1957 con->ops->put(con); 1958 } else { 1959 dout("queue_con %p\n", con); 1960 } 1961 } 1962 1963 /* 1964 * Do some work on a connection. Drop a connection ref when we're done. 1965 */ 1966 static void con_work(struct work_struct *work) 1967 { 1968 struct ceph_connection *con = container_of(work, struct ceph_connection, 1969 work.work); 1970 int ret; 1971 1972 mutex_lock(&con->mutex); 1973 restart: 1974 if (test_and_clear_bit(BACKOFF, &con->state)) { 1975 dout("con_work %p backing off\n", con); 1976 if (queue_delayed_work(ceph_msgr_wq, &con->work, 1977 round_jiffies_relative(con->delay))) { 1978 dout("con_work %p backoff %lu\n", con, con->delay); 1979 mutex_unlock(&con->mutex); 1980 return; 1981 } else { 1982 con->ops->put(con); 1983 dout("con_work %p FAILED to back off %lu\n", con, 1984 con->delay); 1985 } 1986 } 1987 1988 if (test_bit(STANDBY, &con->state)) { 1989 dout("con_work %p STANDBY\n", con); 1990 goto done; 1991 } 1992 if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */ 1993 dout("con_work CLOSED\n"); 1994 con_close_socket(con); 1995 goto done; 1996 } 1997 if (test_and_clear_bit(OPENING, &con->state)) { 1998 /* reopen w/ new peer */ 1999 dout("con_work OPENING\n"); 2000 con_close_socket(con); 2001 } 2002 2003 if (test_and_clear_bit(SOCK_CLOSED, &con->state)) 2004 goto fault; 2005 2006 ret = try_read(con); 2007 if (ret == -EAGAIN) 2008 goto restart; 2009 if (ret < 0) 2010 goto fault; 2011 2012 ret = try_write(con); 2013 if (ret == -EAGAIN) 2014 goto restart; 2015 if (ret < 0) 2016 goto fault; 2017 2018 done: 2019 mutex_unlock(&con->mutex); 2020 done_unlocked: 2021 con->ops->put(con); 2022 return; 2023 2024 fault: 2025 mutex_unlock(&con->mutex); 2026 ceph_fault(con); /* error/fault path */ 2027 goto done_unlocked; 2028 } 2029 2030 2031 /* 2032 * Generic error/fault handler. A retry mechanism is used with 2033 * exponential backoff 2034 */ 2035 static void ceph_fault(struct ceph_connection *con) 2036 { 2037 pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2038 ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); 2039 dout("fault %p state %lu to peer %s\n", 2040 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2041 2042 if (test_bit(LOSSYTX, &con->state)) { 2043 dout("fault on LOSSYTX channel\n"); 2044 goto out; 2045 } 2046 2047 mutex_lock(&con->mutex); 2048 if (test_bit(CLOSED, &con->state)) 2049 goto out_unlock; 2050 2051 con_close_socket(con); 2052 2053 if (con->in_msg) { 2054 ceph_msg_put(con->in_msg); 2055 con->in_msg = NULL; 2056 } 2057 2058 /* Requeue anything that hasn't been acked */ 2059 list_splice_init(&con->out_sent, &con->out_queue); 2060 2061 /* If there are no messages queued or keepalive pending, place 2062 * the connection in a STANDBY state */ 2063 if (list_empty(&con->out_queue) && 2064 !test_bit(KEEPALIVE_PENDING, &con->state)) { 2065 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2066 clear_bit(WRITE_PENDING, &con->state); 2067 set_bit(STANDBY, &con->state); 2068 } else { 2069 /* retry after a delay. */ 2070 if (con->delay == 0) 2071 con->delay = BASE_DELAY_INTERVAL; 2072 else if (con->delay < MAX_DELAY_INTERVAL) 2073 con->delay *= 2; 2074 con->ops->get(con); 2075 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2076 round_jiffies_relative(con->delay))) { 2077 dout("fault queued %p delay %lu\n", con, con->delay); 2078 } else { 2079 con->ops->put(con); 2080 dout("fault failed to queue %p delay %lu, backoff\n", 2081 con, con->delay); 2082 /* 2083 * In many cases we see a socket state change 2084 * while con_work is running and end up 2085 * queuing (non-delayed) work, such that we 2086 * can't backoff with a delay. Set a flag so 2087 * that when con_work restarts we schedule the 2088 * delay then. 2089 */ 2090 set_bit(BACKOFF, &con->state); 2091 } 2092 } 2093 2094 out_unlock: 2095 mutex_unlock(&con->mutex); 2096 out: 2097 /* 2098 * in case we faulted due to authentication, invalidate our 2099 * current tickets so that we can get new ones. 2100 */ 2101 if (con->auth_retry && con->ops->invalidate_authorizer) { 2102 dout("calling invalidate_authorizer()\n"); 2103 con->ops->invalidate_authorizer(con); 2104 } 2105 2106 if (con->ops->fault) 2107 con->ops->fault(con); 2108 } 2109 2110 2111 2112 /* 2113 * create a new messenger instance 2114 */ 2115 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr, 2116 u32 supported_features, 2117 u32 required_features) 2118 { 2119 struct ceph_messenger *msgr; 2120 2121 msgr = kzalloc(sizeof(*msgr), GFP_KERNEL); 2122 if (msgr == NULL) 2123 return ERR_PTR(-ENOMEM); 2124 2125 msgr->supported_features = supported_features; 2126 msgr->required_features = required_features; 2127 2128 spin_lock_init(&msgr->global_seq_lock); 2129 2130 /* the zero page is needed if a request is "canceled" while the message 2131 * is being written over the socket */ 2132 msgr->zero_page = __page_cache_alloc(GFP_KERNEL | __GFP_ZERO); 2133 if (!msgr->zero_page) { 2134 kfree(msgr); 2135 return ERR_PTR(-ENOMEM); 2136 } 2137 kmap(msgr->zero_page); 2138 2139 if (myaddr) 2140 msgr->inst.addr = *myaddr; 2141 2142 /* select a random nonce */ 2143 msgr->inst.addr.type = 0; 2144 get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); 2145 encode_my_addr(msgr); 2146 2147 dout("messenger_create %p\n", msgr); 2148 return msgr; 2149 } 2150 EXPORT_SYMBOL(ceph_messenger_create); 2151 2152 void ceph_messenger_destroy(struct ceph_messenger *msgr) 2153 { 2154 dout("destroy %p\n", msgr); 2155 kunmap(msgr->zero_page); 2156 __free_page(msgr->zero_page); 2157 kfree(msgr); 2158 dout("destroyed messenger %p\n", msgr); 2159 } 2160 EXPORT_SYMBOL(ceph_messenger_destroy); 2161 2162 static void clear_standby(struct ceph_connection *con) 2163 { 2164 /* come back from STANDBY? */ 2165 if (test_and_clear_bit(STANDBY, &con->state)) { 2166 mutex_lock(&con->mutex); 2167 dout("clear_standby %p and ++connect_seq\n", con); 2168 con->connect_seq++; 2169 WARN_ON(test_bit(WRITE_PENDING, &con->state)); 2170 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); 2171 mutex_unlock(&con->mutex); 2172 } 2173 } 2174 2175 /* 2176 * Queue up an outgoing message on the given connection. 2177 */ 2178 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 2179 { 2180 if (test_bit(CLOSED, &con->state)) { 2181 dout("con_send %p closed, dropping %p\n", con, msg); 2182 ceph_msg_put(msg); 2183 return; 2184 } 2185 2186 /* set src+dst */ 2187 msg->hdr.src = con->msgr->inst.name; 2188 2189 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 2190 2191 msg->needs_out_seq = true; 2192 2193 /* queue */ 2194 mutex_lock(&con->mutex); 2195 BUG_ON(!list_empty(&msg->list_head)); 2196 list_add_tail(&msg->list_head, &con->out_queue); 2197 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 2198 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), 2199 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 2200 le32_to_cpu(msg->hdr.front_len), 2201 le32_to_cpu(msg->hdr.middle_len), 2202 le32_to_cpu(msg->hdr.data_len)); 2203 mutex_unlock(&con->mutex); 2204 2205 /* if there wasn't anything waiting to send before, queue 2206 * new work */ 2207 clear_standby(con); 2208 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2209 queue_con(con); 2210 } 2211 EXPORT_SYMBOL(ceph_con_send); 2212 2213 /* 2214 * Revoke a message that was previously queued for send 2215 */ 2216 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) 2217 { 2218 mutex_lock(&con->mutex); 2219 if (!list_empty(&msg->list_head)) { 2220 dout("con_revoke %p msg %p - was on queue\n", con, msg); 2221 list_del_init(&msg->list_head); 2222 ceph_msg_put(msg); 2223 msg->hdr.seq = 0; 2224 } 2225 if (con->out_msg == msg) { 2226 dout("con_revoke %p msg %p - was sending\n", con, msg); 2227 con->out_msg = NULL; 2228 if (con->out_kvec_is_msg) { 2229 con->out_skip = con->out_kvec_bytes; 2230 con->out_kvec_is_msg = false; 2231 } 2232 ceph_msg_put(msg); 2233 msg->hdr.seq = 0; 2234 } 2235 mutex_unlock(&con->mutex); 2236 } 2237 2238 /* 2239 * Revoke a message that we may be reading data into 2240 */ 2241 void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg) 2242 { 2243 mutex_lock(&con->mutex); 2244 if (con->in_msg && con->in_msg == msg) { 2245 unsigned front_len = le32_to_cpu(con->in_hdr.front_len); 2246 unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len); 2247 unsigned data_len = le32_to_cpu(con->in_hdr.data_len); 2248 2249 /* skip rest of message */ 2250 dout("con_revoke_pages %p msg %p revoked\n", con, msg); 2251 con->in_base_pos = con->in_base_pos - 2252 sizeof(struct ceph_msg_header) - 2253 front_len - 2254 middle_len - 2255 data_len - 2256 sizeof(struct ceph_msg_footer); 2257 ceph_msg_put(con->in_msg); 2258 con->in_msg = NULL; 2259 con->in_tag = CEPH_MSGR_TAG_READY; 2260 con->in_seq++; 2261 } else { 2262 dout("con_revoke_pages %p msg %p pages %p no-op\n", 2263 con, con->in_msg, msg); 2264 } 2265 mutex_unlock(&con->mutex); 2266 } 2267 2268 /* 2269 * Queue a keepalive byte to ensure the tcp connection is alive. 2270 */ 2271 void ceph_con_keepalive(struct ceph_connection *con) 2272 { 2273 dout("con_keepalive %p\n", con); 2274 clear_standby(con); 2275 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2276 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2277 queue_con(con); 2278 } 2279 EXPORT_SYMBOL(ceph_con_keepalive); 2280 2281 2282 /* 2283 * construct a new message with given type, size 2284 * the new msg has a ref count of 1. 2285 */ 2286 struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags) 2287 { 2288 struct ceph_msg *m; 2289 2290 m = kmalloc(sizeof(*m), flags); 2291 if (m == NULL) 2292 goto out; 2293 kref_init(&m->kref); 2294 INIT_LIST_HEAD(&m->list_head); 2295 2296 m->hdr.tid = 0; 2297 m->hdr.type = cpu_to_le16(type); 2298 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 2299 m->hdr.version = 0; 2300 m->hdr.front_len = cpu_to_le32(front_len); 2301 m->hdr.middle_len = 0; 2302 m->hdr.data_len = 0; 2303 m->hdr.data_off = 0; 2304 m->hdr.reserved = 0; 2305 m->footer.front_crc = 0; 2306 m->footer.middle_crc = 0; 2307 m->footer.data_crc = 0; 2308 m->footer.flags = 0; 2309 m->front_max = front_len; 2310 m->front_is_vmalloc = false; 2311 m->more_to_follow = false; 2312 m->pool = NULL; 2313 2314 /* middle */ 2315 m->middle = NULL; 2316 2317 /* data */ 2318 m->nr_pages = 0; 2319 m->page_alignment = 0; 2320 m->pages = NULL; 2321 m->pagelist = NULL; 2322 m->bio = NULL; 2323 m->bio_iter = NULL; 2324 m->bio_seg = 0; 2325 m->trail = NULL; 2326 2327 /* front */ 2328 if (front_len) { 2329 if (front_len > PAGE_CACHE_SIZE) { 2330 m->front.iov_base = __vmalloc(front_len, flags, 2331 PAGE_KERNEL); 2332 m->front_is_vmalloc = true; 2333 } else { 2334 m->front.iov_base = kmalloc(front_len, flags); 2335 } 2336 if (m->front.iov_base == NULL) { 2337 pr_err("msg_new can't allocate %d bytes\n", 2338 front_len); 2339 goto out2; 2340 } 2341 } else { 2342 m->front.iov_base = NULL; 2343 } 2344 m->front.iov_len = front_len; 2345 2346 dout("ceph_msg_new %p front %d\n", m, front_len); 2347 return m; 2348 2349 out2: 2350 ceph_msg_put(m); 2351 out: 2352 pr_err("msg_new can't create type %d front %d\n", type, front_len); 2353 return NULL; 2354 } 2355 EXPORT_SYMBOL(ceph_msg_new); 2356 2357 /* 2358 * Allocate "middle" portion of a message, if it is needed and wasn't 2359 * allocated by alloc_msg. This allows us to read a small fixed-size 2360 * per-type header in the front and then gracefully fail (i.e., 2361 * propagate the error to the caller based on info in the front) when 2362 * the middle is too large. 2363 */ 2364 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) 2365 { 2366 int type = le16_to_cpu(msg->hdr.type); 2367 int middle_len = le32_to_cpu(msg->hdr.middle_len); 2368 2369 dout("alloc_middle %p type %d %s middle_len %d\n", msg, type, 2370 ceph_msg_type_name(type), middle_len); 2371 BUG_ON(!middle_len); 2372 BUG_ON(msg->middle); 2373 2374 msg->middle = ceph_buffer_new(middle_len, GFP_NOFS); 2375 if (!msg->middle) 2376 return -ENOMEM; 2377 return 0; 2378 } 2379 2380 /* 2381 * Generic message allocator, for incoming messages. 2382 */ 2383 static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 2384 struct ceph_msg_header *hdr, 2385 int *skip) 2386 { 2387 int type = le16_to_cpu(hdr->type); 2388 int front_len = le32_to_cpu(hdr->front_len); 2389 int middle_len = le32_to_cpu(hdr->middle_len); 2390 struct ceph_msg *msg = NULL; 2391 int ret; 2392 2393 if (con->ops->alloc_msg) { 2394 mutex_unlock(&con->mutex); 2395 msg = con->ops->alloc_msg(con, hdr, skip); 2396 mutex_lock(&con->mutex); 2397 if (!msg || *skip) 2398 return NULL; 2399 } 2400 if (!msg) { 2401 *skip = 0; 2402 msg = ceph_msg_new(type, front_len, GFP_NOFS); 2403 if (!msg) { 2404 pr_err("unable to allocate msg type %d len %d\n", 2405 type, front_len); 2406 return NULL; 2407 } 2408 msg->page_alignment = le16_to_cpu(hdr->data_off); 2409 } 2410 memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr)); 2411 2412 if (middle_len && !msg->middle) { 2413 ret = ceph_alloc_middle(con, msg); 2414 if (ret < 0) { 2415 ceph_msg_put(msg); 2416 return NULL; 2417 } 2418 } 2419 2420 return msg; 2421 } 2422 2423 2424 /* 2425 * Free a generically kmalloc'd message. 2426 */ 2427 void ceph_msg_kfree(struct ceph_msg *m) 2428 { 2429 dout("msg_kfree %p\n", m); 2430 if (m->front_is_vmalloc) 2431 vfree(m->front.iov_base); 2432 else 2433 kfree(m->front.iov_base); 2434 kfree(m); 2435 } 2436 2437 /* 2438 * Drop a msg ref. Destroy as needed. 2439 */ 2440 void ceph_msg_last_put(struct kref *kref) 2441 { 2442 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 2443 2444 dout("ceph_msg_put last one on %p\n", m); 2445 WARN_ON(!list_empty(&m->list_head)); 2446 2447 /* drop middle, data, if any */ 2448 if (m->middle) { 2449 ceph_buffer_put(m->middle); 2450 m->middle = NULL; 2451 } 2452 m->nr_pages = 0; 2453 m->pages = NULL; 2454 2455 if (m->pagelist) { 2456 ceph_pagelist_release(m->pagelist); 2457 kfree(m->pagelist); 2458 m->pagelist = NULL; 2459 } 2460 2461 m->trail = NULL; 2462 2463 if (m->pool) 2464 ceph_msgpool_put(m->pool, m); 2465 else 2466 ceph_msg_kfree(m); 2467 } 2468 EXPORT_SYMBOL(ceph_msg_last_put); 2469 2470 void ceph_msg_dump(struct ceph_msg *msg) 2471 { 2472 pr_debug("msg_dump %p (front_max %d nr_pages %d)\n", msg, 2473 msg->front_max, msg->nr_pages); 2474 print_hex_dump(KERN_DEBUG, "header: ", 2475 DUMP_PREFIX_OFFSET, 16, 1, 2476 &msg->hdr, sizeof(msg->hdr), true); 2477 print_hex_dump(KERN_DEBUG, " front: ", 2478 DUMP_PREFIX_OFFSET, 16, 1, 2479 msg->front.iov_base, msg->front.iov_len, true); 2480 if (msg->middle) 2481 print_hex_dump(KERN_DEBUG, "middle: ", 2482 DUMP_PREFIX_OFFSET, 16, 1, 2483 msg->middle->vec.iov_base, 2484 msg->middle->vec.iov_len, true); 2485 print_hex_dump(KERN_DEBUG, "footer: ", 2486 DUMP_PREFIX_OFFSET, 16, 1, 2487 &msg->footer, sizeof(msg->footer), true); 2488 } 2489 EXPORT_SYMBOL(ceph_msg_dump); 2490