// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/nsproxy.h>
#include <linux/sched/mm.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif	/* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */

/*
 * We track the state of the socket on a given connection using
 * values defined below.  The transition to a new socket state is
 * handled by a function which verifies we aren't coming from an
 * unexpected state.
 *
 *      --------
 *      | NEW* |  transient initial state
 *      --------
 *          | con_sock_state_init()
 *          v
 *      ----------
 *      | CLOSED |  initialized, but no socket (and no
 *      ----------  TCP connection)
 *       ^      \
 *       |       \ con_sock_state_connecting()
 *       |        ----------------------
 *       |                              \
 *       + con_sock_state_closed()       \
 *       |+---------------------------    \
 *       | \                          \    \
 *       |  -----------                \    \
 *       |  | CLOSING |  socket event;  \    \
 *       |  -----------  await close     \    \
 *       |       ^                        \    |
 *       |       |                         \   |
 *       |       + con_sock_state_closing() \  |
 *       |      / \                          | |
 *       |     /   ---------------           | |
 *       |    /                   \          v v
 *       |   /                    --------------
 *       |  /    -----------------| CONNECTING |  socket created, TCP
 *       | |    /                 --------------   connect initiated
 *       | |   | con_sock_state_connected()
 *       | |   v
 *      -------------
 *      | CONNECTED |  TCP connection established
 *      -------------
 *
 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
 */

#define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
#define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
#define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED or -> CLOSING */
#define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */

static bool con_flag_valid(unsigned long con_flag)
{
	switch (con_flag) {
	case CEPH_CON_F_LOSSYTX:
	case CEPH_CON_F_KEEPALIVE_PENDING:
	case CEPH_CON_F_WRITE_PENDING:
	case CEPH_CON_F_SOCK_CLOSED:
	case CEPH_CON_F_BACKOFF:
		return true;
	default:
		return false;
	}
}

void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	clear_bit(con_flag, &con->flags);
}

void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	set_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
				  unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_clear_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test_and_set(struct ceph_connection *con,
				unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_set_bit(con_flag, &con->flags);
}

/* Slab caches for frequently-allocated structures */

static struct kmem_cache *ceph_msg_cache;

/* static tag bytes (protocol control messages) */
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;

#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif

static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void ceph_con_workfn(struct work_struct *);
static void con_fault(struct ceph_connection *con);

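/*
 * Illustration only (not part of this file): the work function and
 * the socket callbacks communicate through the flags above, and the
 * consumer side typically uses the test-and-clear helpers so that an
 * event posted from a callback is acted on exactly once, e.g.:
 *
 *	if (ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
 *		handle the close event (fault the connection)
 */
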
/*
 * Nicely render a sockaddr as a string.  An array of formatted
 * strings is used, to approximate reentrancy.
 */
#define ADDR_STR_COUNT_LOG	5	/* log2(# address strings in array) */
#define ADDR_STR_COUNT		(1 << ADDR_STR_COUNT_LOG)
#define ADDR_STR_COUNT_MASK	(ADDR_STR_COUNT - 1)
#define MAX_ADDR_STR_LEN	64	/* 54 is enough */

static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
static atomic_t addr_str_seq = ATOMIC_INIT(0);

struct page *ceph_zero_page;		/* used in certain error cases */

const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
{
	int i;
	char *s;
	struct sockaddr_storage ss = addr->in_addr; /* align */
	struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;

	i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
	s = addr_str[i];

	switch (ss.ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
			 le32_to_cpu(addr->type), &in4->sin_addr,
			 ntohs(in4->sin_port));
		break;

	case AF_INET6:
		snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
			 le32_to_cpu(addr->type), &in6->sin6_addr,
			 ntohs(in6->sin6_port));
		break;

	default:
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
			 ss.ss_family);
	}

	return s;
}
EXPORT_SYMBOL(ceph_pr_addr);

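/*
 * Illustration only (not part of this file): ceph_pr_addr() hands out
 * entries from a small rotating array of static buffers, so several
 * results can appear in a single statement, e.g.:
 *
 *	pr_warn("wrong peer, want %s, got %s\n",
 *		ceph_pr_addr(&con->peer_addr),
 *		ceph_pr_addr(&con->actual_peer_addr));
 *
 * Only the most recent ADDR_STR_COUNT strings remain valid.
 */
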
void ceph_encode_my_addr(struct ceph_messenger *msgr)
{
	memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
	ceph_encode_banner_addr(&msgr->my_enc_addr);
}

/*
 * work queue for all reading and writing to/from the socket.
 */
static struct workqueue_struct *ceph_msgr_wq;

static int ceph_msgr_slab_init(void)
{
	BUG_ON(ceph_msg_cache);
	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
	if (!ceph_msg_cache)
		return -ENOMEM;

	return 0;
}

static void ceph_msgr_slab_exit(void)
{
	BUG_ON(!ceph_msg_cache);
	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;
}

static void _ceph_msgr_exit(void)
{
	if (ceph_msgr_wq) {
		destroy_workqueue(ceph_msgr_wq);
		ceph_msgr_wq = NULL;
	}

	BUG_ON(!ceph_zero_page);
	put_page(ceph_zero_page);
	ceph_zero_page = NULL;

	ceph_msgr_slab_exit();
}

int __init ceph_msgr_init(void)
{
	if (ceph_msgr_slab_init())
		return -ENOMEM;

	BUG_ON(ceph_zero_page);
	ceph_zero_page = ZERO_PAGE(0);
	get_page(ceph_zero_page);

	/*
	 * The number of active work items is limited by the number of
	 * connections, so leave @max_active at default.
	 */
	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
	if (ceph_msgr_wq)
		return 0;

	pr_err("msgr_init failed to create workqueue\n");
	_ceph_msgr_exit();

	return -ENOMEM;
}

void ceph_msgr_exit(void)
{
	BUG_ON(ceph_msgr_wq == NULL);

	_ceph_msgr_exit();
}

void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_flush);

/* Connection socket state transition functions */

static void con_sock_state_init(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

static void con_sock_state_connecting(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTING);
}

static void con_sock_state_connected(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTED);
}

static void con_sock_state_closing(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
		    old_state != CON_SOCK_STATE_CONNECTED &&
		    old_state != CON_SOCK_STATE_CLOSING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSING);
}

static void con_sock_state_closed(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
		    old_state != CON_SOCK_STATE_CLOSING &&
		    old_state != CON_SOCK_STATE_CONNECTING &&
		    old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

/*
 * socket callback functions
 */

/* data available on socket, or listen socket received a connect */
static void ceph_sock_data_ready(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	if (atomic_read(&con->msgr->stopping))
		return;

	if (sk->sk_state != TCP_CLOSE_WAIT) {
		dout("%s %p state = %d, queueing work\n", __func__,
		     con, con->state);
		queue_con(con);
	}
}

/* socket has buffer space for writing */
static void ceph_sock_write_space(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	/* only queue to workqueue if there is data we want to write,
	 * and there is sufficient space in the socket buffer to accept
	 * more data.
	 * clear SOCK_NOSPACE so that ceph_sock_write_space()
	 * doesn't get called again until try_write() fills the socket
	 * buffer.  See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
		if (sk_stream_is_writeable(sk)) {
			dout("%s %p queueing write work\n", __func__, con);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			queue_con(con);
		}
	} else {
		dout("%s %p nothing to write\n", __func__, con);
	}
}

/* socket's state has changed */
static void ceph_sock_state_change(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	dout("%s %p state = %d sk_state = %u\n", __func__,
	     con, con->state, sk->sk_state);

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("%s TCP_CLOSE\n", __func__);
		fallthrough;
	case TCP_CLOSE_WAIT:
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		con_sock_state_closing(con);
		ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
		queue_con(con);
		break;
	case TCP_ESTABLISHED:
		dout("%s TCP_ESTABLISHED\n", __func__);
		con_sock_state_connected(con);
		queue_con(con);
		break;
	default:	/* Everything else is uninteresting */
		break;
	}
}

/*
 * set up socket callbacks
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{
	struct sock *sk = sock->sk;

	sk->sk_user_data = con;
	sk->sk_data_ready = ceph_sock_data_ready;
	sk->sk_write_space = ceph_sock_write_space;
	sk->sk_state_change = ceph_sock_state_change;
}

/*
 * socket helpers
 */

/*
 * initiate connection to a remote socket.
 */
int ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
	struct socket *sock;
	unsigned int noio_flag;
	int ret;

	dout("%s con %p peer_addr %s\n", __func__, con,
	     ceph_pr_addr(&con->peer_addr));
	BUG_ON(con->sock);

	/* sock_create_kern() allocates with GFP_KERNEL */
	noio_flag = memalloc_noio_save();
	ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	memalloc_noio_restore(noio_flag);
	if (ret)
		return ret;
	sock->sk->sk_allocation = GFP_NOFS;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	con_sock_state_connecting(con);
	ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss),
				 O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     ceph_pr_addr(&con->peer_addr),
		     sock->sk->sk_state);
	} else if (ret < 0) {
		pr_err("connect %s error %d\n",
		       ceph_pr_addr(&con->peer_addr), ret);
		sock_release(sock);
		return ret;
	}

	if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
		tcp_sock_set_nodelay(sock->sk);

	con->sock = sock;
	return 0;
}

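/*
 * Illustration only (not part of this file): the connect above is
 * non-blocking, so ceph_tcp_connect() returning 0 only means the
 * attempt is in flight (-EINPROGRESS is swallowed).  A caller sees
 * the outcome via ceph_sock_state_change(), roughly:
 *
 *	ret = ceph_tcp_connect(con);
 *	if (ret)
 *		return ret;		(immediate failure)
 *	wait for TCP_ESTABLISHED -> con_sock_state_connected()
 */
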
/*
 * If @buf is NULL, discard up to @len bytes.
 */
static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
{
	struct kvec iov = {buf, len};
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	if (!buf)
		msg.msg_flags |= MSG_TRUNC;

	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len);
	r = sock_recvmsg(sock, &msg, msg.msg_flags);
	if (r == -EAGAIN)
		r = 0;
	return r;
}

static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
			     int page_offset, size_t length)
{
	struct bio_vec bvec = {
		.bv_page = page,
		.bv_offset = page_offset,
		.bv_len = length
	};
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	BUG_ON(page_offset + length > PAGE_SIZE);
	iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length);
	r = sock_recvmsg(sock, &msg, msg.msg_flags);
	if (r == -EAGAIN)
		r = 0;
	return r;
}

/*
 * write something.  @more is true if caller will be sending more data
 * shortly.
 */
static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
			    size_t kvlen, size_t len, bool more)
{
	struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
	int r;

	if (more)
		msg.msg_flags |= MSG_MORE;
	else
		msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */

	r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
	if (r == -EAGAIN)
		r = 0;
	return r;
}

/*
 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
 */
static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
			     int offset, size_t size, int more)
{
	ssize_t (*sendpage)(struct socket *sock, struct page *page,
			    int offset, size_t size, int flags);
	int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
	int ret;

	/*
	 * sendpage cannot properly handle pages with page_count == 0,
	 * we need to fall back to sendmsg if that's the case.
	 *
	 * Same goes for slab pages: skb_can_coalesce() allows
	 * coalescing neighboring slab objects into a single frag which
	 * triggers one of hardened usercopy checks.
	 */
	if (sendpage_ok(page))
		sendpage = sock->ops->sendpage;
	else
		sendpage = sock_no_sendpage;

	ret = sendpage(sock, page, offset, size, flags);
	if (ret == -EAGAIN)
		ret = 0;

	return ret;
}

/*
 * Shutdown/close the socket for the given connection.
 */
int ceph_con_close_socket(struct ceph_connection *con)
{
	int rc = 0;

	dout("%s con %p sock %p\n", __func__, con, con->sock);
	if (con->sock) {
		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
		sock_release(con->sock);
		con->sock = NULL;
	}

	/*
	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
	 * independent of the connection mutex, and we could have
	 * received a socket close event before we had the chance to
	 * shut the socket down.
	 */
	ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);

	con_sock_state_closed(con);
	return rc;
}

void ceph_con_v1_reset_protocol(struct ceph_connection *con)
{
	con->out_skip = 0;
}

static void ceph_con_reset_protocol(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	ceph_con_close_socket(con);
	if (con->in_msg) {
		WARN_ON(con->in_msg->con != con);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}
	if (con->out_msg) {
		WARN_ON(con->out_msg->con != con);
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}

	ceph_con_v1_reset_protocol(con);
}

/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);

	ceph_msg_put(msg);
}

static void ceph_msg_remove_list(struct list_head *head)
{
	while (!list_empty(head)) {
		struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
							list_head);
		ceph_msg_remove(msg);
	}
}

void ceph_con_v1_reset_session(struct ceph_connection *con)
{
	con->connect_seq = 0;
	con->peer_global_seq = 0;
}

void ceph_con_reset_session(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	WARN_ON(con->in_msg);
	WARN_ON(con->out_msg);
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);
	con->out_seq = 0;
	con->in_seq = 0;
	con->in_seq_acked = 0;

	ceph_con_v1_reset_session(con);
}

/*
 * mark a peer down.  drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{
	mutex_lock(&con->mutex);
	dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
	con->state = CEPH_CON_S_CLOSED;

	ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX);	/* so we retry next
							   connect */
	ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
	ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);

	ceph_con_reset_protocol(con);
	ceph_con_reset_session(con);
	cancel_con(con);
	mutex_unlock(&con->mutex);
}
EXPORT_SYMBOL(ceph_con_close);

/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con,
		   __u8 entity_type, __u64 entity_num,
		   struct ceph_entity_addr *addr)
{
	mutex_lock(&con->mutex);
	dout("con_open %p %s\n", con, ceph_pr_addr(addr));

	WARN_ON(con->state != CEPH_CON_S_CLOSED);
	con->state = CEPH_CON_S_PREOPEN;

	con->peer_name.type = (__u8) entity_type;
	con->peer_name.num = cpu_to_le64(entity_num);

	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;		/* reset backoff memory */
	mutex_unlock(&con->mutex);
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);

bool ceph_con_v1_opened(struct ceph_connection *con)
{
	return con->connect_seq;
}

/*
 * return true if this connection ever successfully opened
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	return ceph_con_v1_opened(con);
}

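/*
 * Illustration only (not part of this file): a minimal sketch of the
 * lifecycle built from these entry points, assuming an initialized
 * messenger (priv, my_ops, osd_id and osd_addr are placeholders):
 *
 *	ceph_con_init(con, priv, my_ops, msgr);
 *	ceph_con_open(con, CEPH_ENTITY_TYPE_OSD, osd_id, &osd_addr);
 *	...				(queue messages, do I/O)
 *	ceph_con_close(con);		(drop session and socket)
 *
 * ceph_con_open() may be called again after a close; the connection
 * returns to CEPH_CON_S_CLOSED in between.
 */
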
/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_connection *con, void *private,
		   const struct ceph_connection_operations *ops,
		   struct ceph_messenger *msgr)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));
	con->private = private;
	con->ops = ops;
	con->msgr = msgr;

	con_sock_state_init(con);

	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, ceph_con_workfn);

	con->state = CEPH_CON_S_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);

/*
 * We maintain a global counter to order connection attempts.  Get
 * a unique seq greater than @gt.
 */
u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);
	return ret;
}

/*
 * Discard messages that have been acked by the server.
 */
void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
	while (!list_empty(&con->out_sent)) {
		msg = list_first_entry(&con->out_sent, struct ceph_msg,
				       list_head);
		WARN_ON(msg->needs_out_seq);
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > ack_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

/*
 * Discard messages that have been requeued in con_fault(), up to
 * reconnect_seq.  This avoids gratuitously resending messages that
 * the server had received and handled prior to reconnect.
 */
void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
	while (!list_empty(&con->out_queue)) {
		msg = list_first_entry(&con->out_queue, struct ceph_msg,
				       list_head);
		if (msg->needs_out_seq)
			break;
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > reconnect_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

static void con_out_kvec_reset(struct ceph_connection *con)
{
	BUG_ON(con->out_skip);

	con->out_kvec_left = 0;
	con->out_kvec_bytes = 0;
	con->out_kvec_cur = &con->out_kvec[0];
}

static void con_out_kvec_add(struct ceph_connection *con,
			     size_t size, void *data)
{
	int index = con->out_kvec_left;

	BUG_ON(con->out_skip);
	BUG_ON(index >= ARRAY_SIZE(con->out_kvec));

	con->out_kvec[index].iov_len = size;
	con->out_kvec[index].iov_base = data;
	con->out_kvec_left++;
	con->out_kvec_bytes += size;
}

/*
 * Chop off a kvec from the end.  Return residual number of bytes for
 * that kvec, i.e. how many bytes would have been written if the kvec
 * hadn't been nuked.
 */
static int con_out_kvec_skip(struct ceph_connection *con)
{
	int off = con->out_kvec_cur - con->out_kvec;
	int skip = 0;

	if (con->out_kvec_bytes > 0) {
		skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
		BUG_ON(con->out_kvec_bytes < skip);
		BUG_ON(!con->out_kvec_left);
		con->out_kvec_bytes -= skip;
		con->out_kvec_left--;
	}

	return skip;
}

#ifdef CONFIG_BLOCK

/*
 * For a bio data item, a piece is whatever remains of the next
 * entry in the current bio iovec, or the first entry in the next
 * bio in the list.
 */
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
					  size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_bio_iter *it = &cursor->bio_iter;

	cursor->resid = min_t(size_t, length, data->bio_length);
	*it = data->bio_pos;
	if (cursor->resid < it->iter.bi_size)
		it->iter.bi_size = cursor->resid;

	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
}

static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
					   size_t *page_offset,
					   size_t *length)
{
	struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
					   cursor->bio_iter.iter);

	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
				      size_t bytes)
{
	struct ceph_bio_iter *it = &cursor->bio_iter;
	struct page *page = bio_iter_page(it->bio, it->iter);

	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
	cursor->resid -= bytes;
	bio_advance_iter(it->bio, &it->iter, bytes);

	if (!cursor->resid) {
		BUG_ON(!cursor->last_piece);
		return false;   /* no more data */
	}

	if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
		       page == bio_iter_page(it->bio, it->iter)))
		return false;	/* more bytes to process in this segment */

	if (!it->iter.bi_size) {
		it->bio = it->bio->bi_next;
		it->iter = it->bio->bi_iter;
		if (cursor->resid < it->iter.bi_size)
			it->iter.bi_size = cursor->resid;
	}

	BUG_ON(cursor->last_piece);
	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
	return true;
}
#endif /* CONFIG_BLOCK */

static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
					    size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct bio_vec *bvecs = data->bvec_pos.bvecs;

	cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
	cursor->bvec_iter = data->bvec_pos.iter;
	cursor->bvec_iter.bi_size = cursor->resid;

	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
	cursor->last_piece =
	    cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
}

static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
					     size_t *page_offset,
					     size_t *length)
{
	struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
					   cursor->bvec_iter);

	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	struct bio_vec *bvecs =
		cursor->data->bvec_pos.bvecs;
	struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);

	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
	cursor->resid -= bytes;
	bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);

	if (!cursor->resid) {
		BUG_ON(!cursor->last_piece);
		return false;   /* no more data */
	}

	if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
		       page == bvec_iter_page(bvecs, cursor->bvec_iter)))
		return false;	/* more bytes to process in this segment */

	BUG_ON(cursor->last_piece);
	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
	cursor->last_piece =
	    cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
	return true;
}

/*
 * For a page array, a piece comes from the first page in the array
 * that has not already been fully consumed.
 */
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
					    size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	int page_count;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(!data->pages);
	BUG_ON(!data->length);

	cursor->resid = min(length, data->length);
	page_count = calc_pages_for(data->alignment, (u64)data->length);
	cursor->page_offset = data->alignment & ~PAGE_MASK;
	cursor->page_index = 0;
	BUG_ON(page_count > (int)USHRT_MAX);
	cursor->page_count = (unsigned short)page_count;
	BUG_ON(length > SIZE_MAX - cursor->page_offset);
	cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
}

static struct page *
ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
			 size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_index >= cursor->page_count);
	BUG_ON(cursor->page_offset >= PAGE_SIZE);

	*page_offset = cursor->page_offset;
	if (cursor->last_piece)
		*length = cursor->resid;
	else
		*length = PAGE_SIZE - *page_offset;

	return data->pages[cursor->page_index];
}

static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);

	/* Advance the cursor page offset */

	cursor->resid -= bytes;
	cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
	if (!bytes || cursor->page_offset)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page; offset is already at 0 */

	BUG_ON(cursor->page_index >= cursor->page_count);
	cursor->page_index++;
	cursor->last_piece = cursor->resid <= PAGE_SIZE;

	return true;
}

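/*
 * Illustration only (not part of this file): a sketch of how a
 * consumer would walk a page-array data item with the helpers above
 * (off, len and bytes_done are placeholders):
 *
 *	ceph_msg_data_pages_cursor_init(cursor, length);
 *	while (cursor->resid) {
 *		page = ceph_msg_data_pages_next(cursor, &off, &len);
 *		...			(transfer up to len bytes)
 *		ceph_msg_data_pages_advance(cursor, bytes_done);
 *	}
 *
 * In practice the type-dispatching wrappers below are used instead
 * of these pages-specific helpers.
 */
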
/*
 * For a pagelist, a piece is whatever remains to be consumed in the
 * first page in the list, or the front of the next page.
 */
static void
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
				   size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;
	struct page *page;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	if (!length)
		return;		/* pagelist can be assigned but empty */

	BUG_ON(list_empty(&pagelist->head));
	page = list_first_entry(&pagelist->head, struct page, lru);

	cursor->resid = min(length, pagelist->length);
	cursor->page = page;
	cursor->offset = 0;
	cursor->last_piece = cursor->resid <= PAGE_SIZE;
}

static struct page *
ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
			    size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(!cursor->page);
	BUG_ON(cursor->offset + cursor->resid != pagelist->length);

	/* offset of first page in pagelist is always 0 */
	*page_offset = cursor->offset & ~PAGE_MASK;
	if (cursor->last_piece)
		*length = cursor->resid;
	else
		*length = PAGE_SIZE - *page_offset;

	return cursor->page;
}

static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
					   size_t bytes)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(cursor->offset + cursor->resid != pagelist->length);
	BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);

	/* Advance the cursor offset */

	cursor->resid -= bytes;
	cursor->offset += bytes;
	/* offset of first page in pagelist is always 0 */
	if (!bytes || cursor->offset & ~PAGE_MASK)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page */

	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
	cursor->page = list_next_entry(cursor->page, lru);
	cursor->last_piece = cursor->resid <= PAGE_SIZE;

	return true;
}

/*
 * Message data is handled (sent or received) in pieces, where each
 * piece resides on a single page.  The network layer might not
 * consume an entire piece at once.  A data item's cursor keeps
 * track of which piece is next to process and how much remains to
 * be processed in that piece.  It also tracks whether the current
 * piece is the last one in the data item.
 */
static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
	size_t length = cursor->total_resid;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		ceph_msg_data_pagelist_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		ceph_msg_data_pages_cursor_init(cursor, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		ceph_msg_data_bio_cursor_init(cursor, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		ceph_msg_data_bvecs_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		/* BUG(); */
		break;
	}
	cursor->need_crc = true;
}

void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
			       struct ceph_msg *msg, size_t length)
{
	BUG_ON(!length);
	BUG_ON(length > msg->data_length);
	BUG_ON(!msg->num_data_items);

	cursor->total_resid = length;
	cursor->data = msg->data;

	__ceph_msg_data_cursor_init(cursor);
}

/*
 * Return the page containing the next piece to process for a given
 * data item, and supply the page offset and length of that piece.
 * Indicate whether this is the last piece in this data item.
 */
struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length,
				bool *last_piece)
{
	struct page *page;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		page = ceph_msg_data_pages_next(cursor, page_offset, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		page = ceph_msg_data_bio_next(cursor, page_offset, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		page = NULL;
		break;
	}

	BUG_ON(!page);
	BUG_ON(*page_offset + *length > PAGE_SIZE);
	BUG_ON(!*length);
	BUG_ON(*length > cursor->resid);
	if (last_piece)
		*last_piece = cursor->last_piece;

	return page;
}

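/*
 * Illustration only (not part of this file): ceph_msg_data_next()
 * pairs with ceph_msg_data_advance() below in a consumption loop,
 * which is how write_partial_message_data() later drives a send
 * (send_up_to() is a placeholder):
 *
 *	while (cursor->total_resid) {
 *		page = ceph_msg_data_next(cursor, &off, &len, NULL);
 *		ret = send_up_to(page, off, len);
 *		ceph_msg_data_advance(cursor, ret);
 *	}
 */
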
/*
 * Returns true if the result moves the cursor on to the next piece
 * of the data item.
 */
void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
{
	bool new_piece;

	BUG_ON(bytes > cursor->resid);
	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_PAGES:
		new_piece = ceph_msg_data_pages_advance(cursor, bytes);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		BUG();
		break;
	}
	cursor->total_resid -= bytes;

	if (!cursor->resid && cursor->total_resid) {
		WARN_ON(!cursor->last_piece);
		cursor->data++;
		__ceph_msg_data_cursor_init(cursor);
		new_piece = true;
	}
	cursor->need_crc = new_piece;
}

static size_t sizeof_footer(struct ceph_connection *con)
{
	return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
	    sizeof(struct ceph_msg_footer) :
	    sizeof(struct ceph_msg_footer_old);
}

static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
	/* Initialize data cursor */

	ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
}

/*
 * Prepare footer for currently outgoing message, and finish things
 * off.  Assumes out_kvec* are already valid.. we just add on to the end.
 */
static void prepare_write_message_footer(struct ceph_connection *con)
{
	struct ceph_msg *m = con->out_msg;

	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;

	dout("prepare_write_message_footer %p\n", con);
	con_out_kvec_add(con, sizeof_footer(con), &m->footer);
	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
		if (con->ops->sign_message)
			con->ops->sign_message(m);
		else
			m->footer.sig = 0;
	} else {
		m->old_footer.flags = m->footer.flags;
	}
	con->out_more = m->more_to_follow;
	con->out_msg_done = true;
}

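/*
 * Illustration only (not part of this file): prepare_write_message()
 * below queues an outgoing message in its v1 wire order, roughly
 *
 *	[tag_msg][ceph_msg_header][front][middle?][data?][footer]
 *
 * (optionally preceded by a piggybacked ack), with CRCs filled into
 * the header and footer as the pieces are queued.
 */
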
/*
 * Prepare headers for the next outgoing message.
 */
static void prepare_write_message(struct ceph_connection *con)
{
	struct ceph_msg *m;
	u32 crc;

	con_out_kvec_reset(con);
	con->out_msg_done = false;

	/* Sneak an ack in there first?  If we can get it into the same
	 * TCP packet that's a good thing. */
	if (con->in_seq > con->in_seq_acked) {
		con->in_seq_acked = con->in_seq;
		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
		con_out_kvec_add(con, sizeof (con->out_temp_ack),
				 &con->out_temp_ack);
	}

	ceph_con_get_out_msg(con);
	m = con->out_msg;

	dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
	     m, con->out_seq, le16_to_cpu(m->hdr.type),
	     le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
	     m->data_length);
	WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
	WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));

	/* tag + hdr + front + middle */
	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
	con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);

	if (m->middle)
		con_out_kvec_add(con, m->middle->vec.iov_len,
				 m->middle->vec.iov_base);

	/* fill in hdr crc and finalize hdr */
	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
	con->out_msg->hdr.crc = cpu_to_le32(crc);
	memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));

	/* fill in front and middle crc, footer */
	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
	con->out_msg->footer.front_crc = cpu_to_le32(crc);
	if (m->middle) {
		crc = crc32c(0, m->middle->vec.iov_base,
			     m->middle->vec.iov_len);
		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
	} else
		con->out_msg->footer.middle_crc = 0;
	dout("%s front_crc %u middle_crc %u\n", __func__,
	     le32_to_cpu(con->out_msg->footer.front_crc),
	     le32_to_cpu(con->out_msg->footer.middle_crc));
	con->out_msg->footer.flags = 0;

	/* is there a data payload? */
	con->out_msg->footer.data_crc = 0;
	if (m->data_length) {
		prepare_message_data(con->out_msg, m->data_length);
		con->out_more = 1;	/* data + footer will follow */
	} else {
		/* no, queue up footer too and be done */
		prepare_write_message_footer(con);
	}

	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}

/*
 * Prepare an ack.
 */
static void prepare_write_ack(struct ceph_connection *con)
{
	dout("prepare_write_ack %p %llu -> %llu\n", con,
	     con->in_seq_acked, con->in_seq);
	con->in_seq_acked = con->in_seq;

	con_out_kvec_reset(con);

	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);

	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
	con_out_kvec_add(con, sizeof (con->out_temp_ack),
			 &con->out_temp_ack);

	con->out_more = 1;	/* more will follow.. eventually.. */
	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}

/*
 * Prepare to share the seq during handshake
 */
static void prepare_write_seq(struct ceph_connection *con)
{
	dout("prepare_write_seq %p %llu -> %llu\n", con,
	     con->in_seq_acked, con->in_seq);
	con->in_seq_acked = con->in_seq;

	con_out_kvec_reset(con);

	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
	con_out_kvec_add(con, sizeof (con->out_temp_ack),
			 &con->out_temp_ack);

	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}

/*
 * Prepare to write keepalive byte.
 */
static void prepare_write_keepalive(struct ceph_connection *con)
{
	dout("prepare_write_keepalive %p\n", con);
	con_out_kvec_reset(con);
	if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
		struct timespec64 now;

		ktime_get_real_ts64(&now);
		con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
		ceph_encode_timespec64(&con->out_temp_keepalive2, &now);
		con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
				 &con->out_temp_keepalive2);
	} else {
		con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
	}
	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}

/*
 * Connection negotiation.
 */

static int get_connect_authorizer(struct ceph_connection *con)
{
	struct ceph_auth_handshake *auth;
	int auth_proto;

	if (!con->ops->get_authorizer) {
		con->auth = NULL;
		con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
		con->out_connect.authorizer_len = 0;
		return 0;
	}

	auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry);
	if (IS_ERR(auth))
		return PTR_ERR(auth);

	con->auth = auth;
	con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
	con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len);
	return 0;
}

/*
 * We connected to a peer and are saying hello.
 */
static void prepare_write_banner(struct ceph_connection *con)
{
	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
			 &con->msgr->my_enc_addr);

	con->out_more = 0;
	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}

static void __prepare_write_connect(struct ceph_connection *con)
{
	con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect);
	if (con->auth)
		con_out_kvec_add(con, con->auth->authorizer_buf_len,
				 con->auth->authorizer_buf);

	con->out_more = 0;
	ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
}

static int prepare_write_connect(struct ceph_connection *con)
{
	unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
	int proto;
	int ret;

	switch (con->peer_name.type) {
	case CEPH_ENTITY_TYPE_MON:
		proto = CEPH_MONC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_OSD:
		proto = CEPH_OSDC_PROTOCOL;
		break;
	case CEPH_ENTITY_TYPE_MDS:
		proto = CEPH_MDSC_PROTOCOL;
		break;
	default:
		BUG();
	}

	dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
	     con->connect_seq, global_seq, proto);

	con->out_connect.features =
	    cpu_to_le64(from_msgr(con->msgr)->supported_features);
	con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
	con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
	con->out_connect.global_seq = cpu_to_le32(global_seq);
	con->out_connect.protocol_version = cpu_to_le32(proto);
	con->out_connect.flags = 0;

	ret = get_connect_authorizer(con);
	if (ret)
		return ret;

	__prepare_write_connect(con);
	return 0;
}

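/*
 * Illustration only (not part of this file): on a fresh connect the
 * v1 handshake output is queued with the helpers above, roughly:
 *
 *	con_out_kvec_reset(con);
 *	prepare_write_banner(con);	  (banner + my address)
 *	ret = prepare_write_connect(con); (ceph_msg_connect + authorizer)
 *
 * and the peer's reply is then handled by process_connect() below.
 */
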
/*
 * write as much of pending kvecs to the socket as we can.
 *  1 -> done
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_kvec(struct ceph_connection *con)
{
	int ret;

	dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
	while (con->out_kvec_bytes > 0) {
		ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
				       con->out_kvec_left, con->out_kvec_bytes,
				       con->out_more);
		if (ret <= 0)
			goto out;
		con->out_kvec_bytes -= ret;
		if (con->out_kvec_bytes == 0)
			break;		/* done */

		/* account for full iov entries consumed */
		while (ret >= con->out_kvec_cur->iov_len) {
			BUG_ON(!con->out_kvec_left);
			ret -= con->out_kvec_cur->iov_len;
			con->out_kvec_cur++;
			con->out_kvec_left--;
		}
		/* and for a partially-consumed entry */
		if (ret) {
			con->out_kvec_cur->iov_len -= ret;
			con->out_kvec_cur->iov_base += ret;
		}
	}
	con->out_kvec_left = 0;
	ret = 1;
out:
	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
	     con->out_kvec_bytes, con->out_kvec_left, ret);
	return ret;	/* done! */
}

u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
		     unsigned int length)
{
	char *kaddr;

	kaddr = kmap(page);
	BUG_ON(kaddr == NULL);
	crc = crc32c(crc, kaddr + page_offset, length);
	kunmap(page);

	return crc;
}

/*
 * Write as much message data payload as we can.  If we finish, queue
 * up the footer.
 *  1 -> done, footer is now queued in out_kvec[].
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_message_data(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->out_msg;
	struct ceph_msg_data_cursor *cursor = &msg->cursor;
	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
	u32 crc;

	dout("%s %p msg %p\n", __func__, con, msg);

	if (!msg->num_data_items)
		return -EINVAL;

	/*
	 * Iterate through each page that contains data to be
	 * written, and send as much as possible for each.
	 *
	 * If we are calculating the data crc (the default), we will
	 * need to map the page.  If we have no pages, they have
	 * been revoked, so use the zero page.
	 */
	crc = do_datacrc ?
		le32_to_cpu(msg->footer.data_crc) : 0;
	while (cursor->total_resid) {
		struct page *page;
		size_t page_offset;
		size_t length;
		int ret;

		if (!cursor->resid) {
			ceph_msg_data_advance(cursor, 0);
			continue;
		}

		page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
		if (length == cursor->total_resid)
			more = MSG_MORE;
		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
					more);
		if (ret <= 0) {
			if (do_datacrc)
				msg->footer.data_crc = cpu_to_le32(crc);

			return ret;
		}
		if (do_datacrc && cursor->need_crc)
			crc = ceph_crc32c_page(crc, page, page_offset, length);
		ceph_msg_data_advance(cursor, (size_t)ret);
	}

	dout("%s %p msg %p done\n", __func__, con, msg);

	/* prepare and queue up footer, too */
	if (do_datacrc)
		msg->footer.data_crc = cpu_to_le32(crc);
	else
		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
	con_out_kvec_reset(con);
	prepare_write_message_footer(con);

	return 1;	/* must return > 0 to indicate success */
}

/*
 * write some zeros
 */
static int write_partial_skip(struct ceph_connection *con)
{
	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
	int ret;

	dout("%s %p %d left\n", __func__, con, con->out_skip);
	while (con->out_skip > 0) {
		size_t size = min(con->out_skip, (int) PAGE_SIZE);

		if (size == con->out_skip)
			more = MSG_MORE;
		ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
					more);
		if (ret <= 0)
			goto out;
		con->out_skip -= ret;
	}
	ret = 1;
out:
	return ret;
}

/*
 * Prepare to read connection handshake, or an ack.
 */
static void prepare_read_banner(struct ceph_connection *con)
{
	dout("prepare_read_banner %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_connect(struct ceph_connection *con)
{
	dout("prepare_read_connect %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_ack(struct ceph_connection *con)
{
	dout("prepare_read_ack %p\n", con);
	con->in_base_pos = 0;
}

static void prepare_read_seq(struct ceph_connection *con)
{
	dout("prepare_read_seq %p\n", con);
	con->in_base_pos = 0;
	con->in_tag = CEPH_MSGR_TAG_SEQ;
}

static void prepare_read_tag(struct ceph_connection *con)
{
	dout("prepare_read_tag %p\n", con);
	con->in_base_pos = 0;
	con->in_tag = CEPH_MSGR_TAG_READY;
}

static void prepare_read_keepalive_ack(struct ceph_connection *con)
{
	dout("prepare_read_keepalive_ack %p\n", con);
	con->in_base_pos = 0;
}

/*
 * Prepare to read a message.
 */
static int prepare_read_message(struct ceph_connection *con)
{
	dout("prepare_read_message %p\n", con);
	BUG_ON(con->in_msg != NULL);
	con->in_base_pos = 0;
	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
	return 0;
}

static int read_partial(struct ceph_connection *con,
			int end, int size, void *object)
{
	while (con->in_base_pos < end) {
		int left = end - con->in_base_pos;
		int have = size - left;
		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);

		if (ret <= 0)
			return ret;
		con->in_base_pos += ret;
	}
	return 1;
}

/*
 * Read all or part of the connect-side handshake on a new connection
 */
static int read_partial_banner(struct ceph_connection *con)
{
	int size;
	int end;
	int ret;

	dout("read_partial_banner %p at %d\n", con, con->in_base_pos);

	/* peer's banner */
	size = strlen(CEPH_BANNER);
	end = size;
	ret = read_partial(con, end, size, con->in_banner);
	if (ret <= 0)
		goto out;

	size = sizeof (con->actual_peer_addr);
	end += size;
	ret = read_partial(con, end, size, &con->actual_peer_addr);
	if (ret <= 0)
		goto out;
	ceph_decode_banner_addr(&con->actual_peer_addr);

	size = sizeof (con->peer_addr_for_me);
	end += size;
	ret = read_partial(con, end, size, &con->peer_addr_for_me);
	if (ret <= 0)
		goto out;
	ceph_decode_banner_addr(&con->peer_addr_for_me);

out:
	return ret;
}

static int read_partial_connect(struct ceph_connection *con)
{
	int size;
	int end;
	int ret;

	dout("read_partial_connect %p at %d\n", con, con->in_base_pos);

	size = sizeof (con->in_reply);
	end = size;
	ret = read_partial(con, end, size, &con->in_reply);
	if (ret <= 0)
		goto out;

	if (con->auth) {
		size = le32_to_cpu(con->in_reply.authorizer_len);
		if (size > con->auth->authorizer_reply_buf_len) {
			pr_err("authorizer reply too big: %d > %zu\n", size,
			       con->auth->authorizer_reply_buf_len);
			ret = -EINVAL;
			goto out;
		}

		end += size;
		ret = read_partial(con, end, size,
				   con->auth->authorizer_reply_buf);
		if (ret <= 0)
			goto out;
	}

	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
	     con, (int)con->in_reply.tag,
	     le32_to_cpu(con->in_reply.connect_seq),
	     le32_to_cpu(con->in_reply.global_seq));
out:
	return ret;
}

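/*
 * Illustration only (not part of this file): read_partial() is
 * resumable; @end is the cumulative offset of the field within the
 * exchange and @size the field's length, so callers chain fields the
 * way read_partial_banner() above does:
 *
 *	size = sizeof(con->in_reply);
 *	end = size;
 *	ret = read_partial(con, end, size, &con->in_reply);
 *	...
 *	end += next_size;	(the next field resumes at the old end)
 */
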
/*
 * Verify the hello banner looks okay.
 */
static int verify_hello(struct ceph_connection *con)
{
	if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
		pr_err("connect to %s got bad banner\n",
		       ceph_pr_addr(&con->peer_addr));
		con->error_msg = "protocol error, bad banner";
		return -1;
	}
	return 0;
}

bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
{
	struct sockaddr_storage ss = addr->in_addr; /* align */
	struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
	struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;

	switch (ss.ss_family) {
	case AF_INET:
		return addr4->s_addr == htonl(INADDR_ANY);
	case AF_INET6:
		return ipv6_addr_any(addr6);
	default:
		return true;
	}
}

int ceph_addr_port(const struct ceph_entity_addr *addr)
{
	switch (get_unaligned(&addr->in_addr.ss_family)) {
	case AF_INET:
		return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
	case AF_INET6:
		return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
	}
	return 0;
}

void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
{
	switch (get_unaligned(&addr->in_addr.ss_family)) {
	case AF_INET:
		put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
		break;
	case AF_INET6:
		put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
		break;
	}
}

/*
 * Unlike other *_pton function semantics, zero indicates success.
 */
static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
		     char delim, const char **ipend)
{
	memset(&addr->in_addr, 0, sizeof(addr->in_addr));

	if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
		put_unaligned(AF_INET, &addr->in_addr.ss_family);
		return 0;
	}

	if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
		put_unaligned(AF_INET6, &addr->in_addr.ss_family);
		return 0;
	}

	return -EINVAL;
}

/*
 * Extract hostname string and resolve using kernel DNS facility.
 */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	const char *end, *delim_p;
	char *colon_p, *ip_addr = NULL;
	int ip_len, ret;

	/*
	 * The end of the hostname occurs immediately preceding the
	 * delimiter or the port marker (':'), where the delimiter
	 * takes precedence.
	 */
	delim_p = memchr(name, delim, namelen);
	colon_p = memchr(name, ':', namelen);

	if (delim_p && colon_p)
		end = delim_p < colon_p ?
			delim_p : colon_p;
	else if (!delim_p && colon_p)
		end = colon_p;
	else {
		end = delim_p;
		if (!end) /* case: hostname:/ */
			end = name + namelen;
	}

	if (end <= name)
		return -EINVAL;

	/* do dns_resolve upcall */
	ip_len = dns_query(current->nsproxy->net_ns,
			   NULL, name, end - name, NULL, &ip_addr, NULL, false);
	if (ip_len > 0)
		ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
	else
		ret = -ESRCH;

	kfree(ip_addr);

	*ipend = end;

	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
		ret, ret ? "failed" : ceph_pr_addr(addr));

	return ret;
}
#else
static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	return -EINVAL;
}
#endif

/*
 * Parse a server name (IP or hostname).  If a valid IP address is not
 * found then try to extract a hostname to resolve using userspace DNS
 * upcall.
 */
static int ceph_parse_server_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	int ret;

	ret = ceph_pton(name, namelen, addr, delim, ipend);
	if (ret)
		ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);

	return ret;
}

/*
 * Parse an ip[:port] list into an addr array.  Use the default
 * monitor port if a port isn't specified.
 */
int ceph_parse_ips(const char *c, const char *end,
		   struct ceph_entity_addr *addr,
		   int max_count, int *count)
{
	int i, ret = -EINVAL;
	const char *p = c;

	dout("parse_ips on '%.*s'\n", (int)(end-c), c);
	for (i = 0; i < max_count; i++) {
		const char *ipend;
		int port;
		char delim = ',';

		if (*p == '[') {
			delim = ']';
			p++;
		}

		ret = ceph_parse_server_name(p, end - p, &addr[i], delim, &ipend);
		if (ret)
			goto bad;
		ret = -EINVAL;

		p = ipend;

		if (delim == ']') {
			if (*p != ']') {
				dout("missing matching ']'\n");
				goto bad;
			}
			p++;
		}

		/* port? */
		if (p < end && *p == ':') {
			port = 0;
			p++;
			while (p < end && *p >= '0' && *p <= '9') {
				port = (port * 10) + (*p - '0');
				p++;
			}
			if (port == 0)
				port = CEPH_MON_PORT;
			else if (port > 65535)
				goto bad;
		} else {
			port = CEPH_MON_PORT;
		}

		ceph_addr_set_port(&addr[i], port);
		addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;

		dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));

		if (p == end)
			break;
		if (*p != ',')
			goto bad;
		p++;
	}

	if (p != end)
		goto bad;

	if (count)
		*count = i + 1;
	return 0;

bad:
	return ret;
}

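/*
 * Illustration only (not part of this file): a sketch of parsing a
 * monitor list with ceph_parse_ips(), assuming a caller-supplied
 * string:
 *
 *	struct ceph_entity_addr addrs[3];
 *	int count;
 *	const char *s = "1.2.3.4:6789,[::1],mon.example.com";
 *
 *	ret = ceph_parse_ips(s, s + strlen(s), addrs, 3, &count);
 *
 * IPv4, bracketed IPv6, hostnames (with the DNS resolver enabled)
 * and an optional :port are accepted; a missing port defaults to
 * CEPH_MON_PORT.
 */
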
2048 */ 2049 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 2050 sizeof(con->peer_addr)) != 0 && 2051 !(ceph_addr_is_blank(&con->actual_peer_addr) && 2052 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 2053 pr_warn("wrong peer, want %s/%u, got %s/%u\n", 2054 ceph_pr_addr(&con->peer_addr), 2055 le32_to_cpu(con->peer_addr.nonce), 2056 ceph_pr_addr(&con->actual_peer_addr), 2057 le32_to_cpu(con->actual_peer_addr.nonce)); 2058 con->error_msg = "wrong peer at address"; 2059 return -1; 2060 } 2061 2062 /* 2063 * did we learn our address? 2064 */ 2065 if (ceph_addr_is_blank(my_addr)) { 2066 memcpy(&my_addr->in_addr, 2067 &con->peer_addr_for_me.in_addr, 2068 sizeof(con->peer_addr_for_me.in_addr)); 2069 ceph_addr_set_port(my_addr, 0); 2070 ceph_encode_my_addr(con->msgr); 2071 dout("process_banner learned my addr is %s\n", 2072 ceph_pr_addr(my_addr)); 2073 } 2074 2075 return 0; 2076 } 2077 2078 static int process_connect(struct ceph_connection *con) 2079 { 2080 u64 sup_feat = from_msgr(con->msgr)->supported_features; 2081 u64 req_feat = from_msgr(con->msgr)->required_features; 2082 u64 server_feat = le64_to_cpu(con->in_reply.features); 2083 int ret; 2084 2085 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 2086 2087 if (con->auth) { 2088 int len = le32_to_cpu(con->in_reply.authorizer_len); 2089 2090 /* 2091 * Any connection that defines ->get_authorizer() 2092 * should also define ->add_authorizer_challenge() and 2093 * ->verify_authorizer_reply(). 2094 * 2095 * See get_connect_authorizer(). 2096 */ 2097 if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { 2098 ret = con->ops->add_authorizer_challenge( 2099 con, con->auth->authorizer_reply_buf, len); 2100 if (ret < 0) 2101 return ret; 2102 2103 con_out_kvec_reset(con); 2104 __prepare_write_connect(con); 2105 prepare_read_connect(con); 2106 return 0; 2107 } 2108 2109 if (len) { 2110 ret = con->ops->verify_authorizer_reply(con); 2111 if (ret < 0) { 2112 con->error_msg = "bad authorize reply"; 2113 return ret; 2114 } 2115 } 2116 } 2117 2118 switch (con->in_reply.tag) { 2119 case CEPH_MSGR_TAG_FEATURES: 2120 pr_err("%s%lld %s feature set mismatch," 2121 " my %llx < server's %llx, missing %llx\n", 2122 ENTITY_NAME(con->peer_name), 2123 ceph_pr_addr(&con->peer_addr), 2124 sup_feat, server_feat, server_feat & ~sup_feat); 2125 con->error_msg = "missing required protocol features"; 2126 return -1; 2127 2128 case CEPH_MSGR_TAG_BADPROTOVER: 2129 pr_err("%s%lld %s protocol version mismatch," 2130 " my %d != server's %d\n", 2131 ENTITY_NAME(con->peer_name), 2132 ceph_pr_addr(&con->peer_addr), 2133 le32_to_cpu(con->out_connect.protocol_version), 2134 le32_to_cpu(con->in_reply.protocol_version)); 2135 con->error_msg = "protocol version mismatch"; 2136 return -1; 2137 2138 case CEPH_MSGR_TAG_BADAUTHORIZER: 2139 con->auth_retry++; 2140 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, 2141 con->auth_retry); 2142 if (con->auth_retry == 2) { 2143 con->error_msg = "connect authorization failure"; 2144 return -1; 2145 } 2146 con_out_kvec_reset(con); 2147 ret = prepare_write_connect(con); 2148 if (ret < 0) 2149 return ret; 2150 prepare_read_connect(con); 2151 break; 2152 2153 case CEPH_MSGR_TAG_RESETSESSION: 2154 /* 2155 * If we connected with a large connect_seq but the peer 2156 * has no record of a session with us (no connection, or 2157 * connect_seq == 0), they will send RESETSESSION to indicate 2158 * that they must have reset their session, and may have 2159 * dropped messages.
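 *
 * In response we reset our side of the session, queue a fresh
 * connect message, and notify the higher layer through
 * ->peer_reset() with con->mutex dropped.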
2160 */ 2161 dout("process_connect got RESET peer seq %u\n", 2162 le32_to_cpu(con->in_reply.connect_seq)); 2163 pr_info("%s%lld %s session reset\n", 2164 ENTITY_NAME(con->peer_name), 2165 ceph_pr_addr(&con->peer_addr)); 2166 ceph_con_reset_session(con); 2167 con_out_kvec_reset(con); 2168 ret = prepare_write_connect(con); 2169 if (ret < 0) 2170 return ret; 2171 prepare_read_connect(con); 2172 2173 /* Tell ceph about it. */ 2174 mutex_unlock(&con->mutex); 2175 if (con->ops->peer_reset) 2176 con->ops->peer_reset(con); 2177 mutex_lock(&con->mutex); 2178 if (con->state != CEPH_CON_S_V1_CONNECT_MSG) 2179 return -EAGAIN; 2180 break; 2181 2182 case CEPH_MSGR_TAG_RETRY_SESSION: 2183 /* 2184 * If we sent a smaller connect_seq than the peer has, try 2185 * again with a larger value. 2186 */ 2187 dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", 2188 le32_to_cpu(con->out_connect.connect_seq), 2189 le32_to_cpu(con->in_reply.connect_seq)); 2190 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); 2191 con_out_kvec_reset(con); 2192 ret = prepare_write_connect(con); 2193 if (ret < 0) 2194 return ret; 2195 prepare_read_connect(con); 2196 break; 2197 2198 case CEPH_MSGR_TAG_RETRY_GLOBAL: 2199 /* 2200 * If we sent a smaller global_seq than the peer has, try 2201 * again with a larger value. 2202 */ 2203 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", 2204 con->peer_global_seq, 2205 le32_to_cpu(con->in_reply.global_seq)); 2206 ceph_get_global_seq(con->msgr, 2207 le32_to_cpu(con->in_reply.global_seq)); 2208 con_out_kvec_reset(con); 2209 ret = prepare_write_connect(con); 2210 if (ret < 0) 2211 return ret; 2212 prepare_read_connect(con); 2213 break; 2214 2215 case CEPH_MSGR_TAG_SEQ: 2216 case CEPH_MSGR_TAG_READY: 2217 if (req_feat & ~server_feat) { 2218 pr_err("%s%lld %s protocol feature mismatch," 2219 " my required %llx > server's %llx, need %llx\n", 2220 ENTITY_NAME(con->peer_name), 2221 ceph_pr_addr(&con->peer_addr), 2222 req_feat, server_feat, req_feat & ~server_feat); 2223 con->error_msg = "missing required protocol features"; 2224 return -1; 2225 } 2226 2227 WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); 2228 con->state = CEPH_CON_S_OPEN; 2229 con->auth_retry = 0; /* we authenticated; clear flag */ 2230 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 2231 con->connect_seq++; 2232 con->peer_features = server_feat; 2233 dout("process_connect got READY gseq %d cseq %d (%d)\n", 2234 con->peer_global_seq, 2235 le32_to_cpu(con->in_reply.connect_seq), 2236 con->connect_seq); 2237 WARN_ON(con->connect_seq != 2238 le32_to_cpu(con->in_reply.connect_seq)); 2239 2240 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 2241 ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); 2242 2243 con->delay = 0; /* reset backoff memory */ 2244 2245 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { 2246 prepare_write_seq(con); 2247 prepare_read_seq(con); 2248 } else { 2249 prepare_read_tag(con); 2250 } 2251 break; 2252 2253 case CEPH_MSGR_TAG_WAIT: 2254 /* 2255 * If there is a connection race (we are opening 2256 * connections to each other), one of us may just have 2257 * to WAIT. This shouldn't happen if we are the 2258 * client. 
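 *
 * The kernel messenger only initiates connections, so it is always
 * the "client" in a connection race; WAIT is therefore treated as
 * a protocol error rather than handled.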
2259 */ 2260 con->error_msg = "protocol error, got WAIT as client"; 2261 return -1; 2262 2263 default: 2264 con->error_msg = "protocol error, garbage tag during connect"; 2265 return -1; 2266 } 2267 return 0; 2268 } 2269 2270 2271 /* 2272 * read (part of) an ack 2273 */ 2274 static int read_partial_ack(struct ceph_connection *con) 2275 { 2276 int size = sizeof (con->in_temp_ack); 2277 int end = size; 2278 2279 return read_partial(con, end, size, &con->in_temp_ack); 2280 } 2281 2282 /* 2283 * We can finally discard anything that's been acked. 2284 */ 2285 static void process_ack(struct ceph_connection *con) 2286 { 2287 u64 ack = le64_to_cpu(con->in_temp_ack); 2288 2289 if (con->in_tag == CEPH_MSGR_TAG_ACK) 2290 ceph_con_discard_sent(con, ack); 2291 else 2292 ceph_con_discard_requeued(con, ack); 2293 2294 prepare_read_tag(con); 2295 } 2296 2297 2298 static int read_partial_message_section(struct ceph_connection *con, 2299 struct kvec *section, 2300 unsigned int sec_len, u32 *crc) 2301 { 2302 int ret, left; 2303 2304 BUG_ON(!section); 2305 2306 while (section->iov_len < sec_len) { 2307 BUG_ON(section->iov_base == NULL); 2308 left = sec_len - section->iov_len; 2309 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + 2310 section->iov_len, left); 2311 if (ret <= 0) 2312 return ret; 2313 section->iov_len += ret; 2314 } 2315 if (section->iov_len == sec_len) 2316 *crc = crc32c(0, section->iov_base, section->iov_len); 2317 2318 return 1; 2319 } 2320 2321 static int read_partial_msg_data(struct ceph_connection *con) 2322 { 2323 struct ceph_msg *msg = con->in_msg; 2324 struct ceph_msg_data_cursor *cursor = &msg->cursor; 2325 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 2326 struct page *page; 2327 size_t page_offset; 2328 size_t length; 2329 u32 crc = 0; 2330 int ret; 2331 2332 if (!msg->num_data_items) 2333 return -EIO; 2334 2335 if (do_datacrc) 2336 crc = con->in_data_crc; 2337 while (cursor->total_resid) { 2338 if (!cursor->resid) { 2339 ceph_msg_data_advance(cursor, 0); 2340 continue; 2341 } 2342 2343 page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); 2344 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); 2345 if (ret <= 0) { 2346 if (do_datacrc) 2347 con->in_data_crc = crc; 2348 2349 return ret; 2350 } 2351 2352 if (do_datacrc) 2353 crc = ceph_crc32c_page(crc, page, page_offset, ret); 2354 ceph_msg_data_advance(cursor, (size_t)ret); 2355 } 2356 if (do_datacrc) 2357 con->in_data_crc = crc; 2358 2359 return 1; /* must return > 0 to indicate success */ 2360 } 2361 2362 /* 2363 * read (part of) a message. 
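 *
 * A v1 message is read in its on-wire order:
 *
 *   header (ceph_msg_header, carries its own crc)
 *   front  (front_len bytes)
 *   middle (middle_len bytes, optional)
 *   data   (data_len bytes, optional, received page by page)
 *   footer (ceph_msg_footer, carries front/middle/data crcs)
 *
 * Each piece is crc32c-verified as it arrives; a mismatch fails the
 * read with -EBADMSG.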
2364 */ 2365 static int read_partial_message(struct ceph_connection *con) 2366 { 2367 struct ceph_msg *m = con->in_msg; 2368 int size; 2369 int end; 2370 int ret; 2371 unsigned int front_len, middle_len, data_len; 2372 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 2373 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); 2374 u64 seq; 2375 u32 crc; 2376 2377 dout("read_partial_message con %p msg %p\n", con, m); 2378 2379 /* header */ 2380 size = sizeof (con->in_hdr); 2381 end = size; 2382 ret = read_partial(con, end, size, &con->in_hdr); 2383 if (ret <= 0) 2384 return ret; 2385 2386 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); 2387 if (cpu_to_le32(crc) != con->in_hdr.crc) { 2388 pr_err("read_partial_message bad hdr crc %u != expected %u\n", 2389 crc, con->in_hdr.crc); 2390 return -EBADMSG; 2391 } 2392 2393 front_len = le32_to_cpu(con->in_hdr.front_len); 2394 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 2395 return -EIO; 2396 middle_len = le32_to_cpu(con->in_hdr.middle_len); 2397 if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN) 2398 return -EIO; 2399 data_len = le32_to_cpu(con->in_hdr.data_len); 2400 if (data_len > CEPH_MSG_MAX_DATA_LEN) 2401 return -EIO; 2402 2403 /* verify seq# */ 2404 seq = le64_to_cpu(con->in_hdr.seq); 2405 if ((s64)seq - (s64)con->in_seq < 1) { 2406 pr_info("skipping %s%lld %s seq %lld expected %lld\n", 2407 ENTITY_NAME(con->peer_name), 2408 ceph_pr_addr(&con->peer_addr), 2409 seq, con->in_seq + 1); 2410 con->in_base_pos = -front_len - middle_len - data_len - 2411 sizeof_footer(con); 2412 con->in_tag = CEPH_MSGR_TAG_READY; 2413 return 1; 2414 } else if ((s64)seq - (s64)con->in_seq > 1) { 2415 pr_err("read_partial_message bad seq %lld expected %lld\n", 2416 seq, con->in_seq + 1); 2417 con->error_msg = "bad message sequence # for incoming message"; 2418 return -EBADE; 2419 } 2420 2421 /* allocate message? 
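 * Allocation is delegated to ceph_con_in_msg_alloc(), which may tell
 * us to skip this message entirely (e.g. a reply whose request was
 * revoked); in that case the payload bytes are simply discarded.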
*/ 2422 if (!con->in_msg) { 2423 int skip = 0; 2424 2425 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 2426 front_len, data_len); 2427 ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip); 2428 if (ret < 0) 2429 return ret; 2430 2431 BUG_ON(!con->in_msg ^ skip); 2432 if (skip) { 2433 /* skip this message */ 2434 dout("alloc_msg said skip message\n"); 2435 con->in_base_pos = -front_len - middle_len - data_len - 2436 sizeof_footer(con); 2437 con->in_tag = CEPH_MSGR_TAG_READY; 2438 con->in_seq++; 2439 return 1; 2440 } 2441 2442 BUG_ON(!con->in_msg); 2443 BUG_ON(con->in_msg->con != con); 2444 m = con->in_msg; 2445 m->front.iov_len = 0; /* haven't read it yet */ 2446 if (m->middle) 2447 m->middle->vec.iov_len = 0; 2448 2449 /* prepare for data payload, if any */ 2450 2451 if (data_len) 2452 prepare_message_data(con->in_msg, data_len); 2453 } 2454 2455 /* front */ 2456 ret = read_partial_message_section(con, &m->front, front_len, 2457 &con->in_front_crc); 2458 if (ret <= 0) 2459 return ret; 2460 2461 /* middle */ 2462 if (m->middle) { 2463 ret = read_partial_message_section(con, &m->middle->vec, 2464 middle_len, 2465 &con->in_middle_crc); 2466 if (ret <= 0) 2467 return ret; 2468 } 2469 2470 /* (page) data */ 2471 if (data_len) { 2472 ret = read_partial_msg_data(con); 2473 if (ret <= 0) 2474 return ret; 2475 } 2476 2477 /* footer */ 2478 size = sizeof_footer(con); 2479 end += size; 2480 ret = read_partial(con, end, size, &m->footer); 2481 if (ret <= 0) 2482 return ret; 2483 2484 if (!need_sign) { 2485 m->footer.flags = m->old_footer.flags; 2486 m->footer.sig = 0; 2487 } 2488 2489 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 2490 m, front_len, m->footer.front_crc, middle_len, 2491 m->footer.middle_crc, data_len, m->footer.data_crc); 2492 2493 /* crc ok? */ 2494 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 2495 pr_err("read_partial_message %p front crc %u != exp. %u\n", 2496 m, con->in_front_crc, m->footer.front_crc); 2497 return -EBADMSG; 2498 } 2499 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 2500 pr_err("read_partial_message %p middle crc %u != exp %u\n", 2501 m, con->in_middle_crc, m->footer.middle_crc); 2502 return -EBADMSG; 2503 } 2504 if (do_datacrc && 2505 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 2506 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 2507 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 2508 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 2509 return -EBADMSG; 2510 } 2511 2512 if (need_sign && con->ops->check_message_signature && 2513 con->ops->check_message_signature(m)) { 2514 pr_err("read_partial_message %p signature check failed\n", m); 2515 return -EBADMSG; 2516 } 2517 2518 return 1; /* done! */ 2519 } 2520 2521 /* 2522 * Process message. This happens in the worker thread. The callback should 2523 * be careful not to do anything that waits on other incoming messages or it 2524 * may deadlock. 
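 *
 * con->mutex is dropped for the duration of the dispatch callback
 * and re-taken before returning.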
2525 */ 2526 void ceph_con_process_message(struct ceph_connection *con) 2527 { 2528 struct ceph_msg *msg = con->in_msg; 2529 2530 BUG_ON(con->in_msg->con != con); 2531 con->in_msg = NULL; 2532 2533 /* if first message, set peer_name */ 2534 if (con->peer_name.type == 0) 2535 con->peer_name = msg->hdr.src; 2536 2537 con->in_seq++; 2538 mutex_unlock(&con->mutex); 2539 2540 dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n", 2541 msg, le64_to_cpu(msg->hdr.seq), 2542 ENTITY_NAME(msg->hdr.src), 2543 le16_to_cpu(msg->hdr.type), 2544 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 2545 le32_to_cpu(msg->hdr.front_len), 2546 le32_to_cpu(msg->hdr.middle_len), 2547 le32_to_cpu(msg->hdr.data_len), 2548 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 2549 con->ops->dispatch(con, msg); 2550 2551 mutex_lock(&con->mutex); 2552 } 2553 2554 static int read_keepalive_ack(struct ceph_connection *con) 2555 { 2556 struct ceph_timespec ceph_ts; 2557 size_t size = sizeof(ceph_ts); 2558 int ret = read_partial(con, size, size, &ceph_ts); 2559 if (ret <= 0) 2560 return ret; 2561 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); 2562 prepare_read_tag(con); 2563 return 1; 2564 } 2565 2566 /* 2567 * Write something to the socket. Called in a worker thread when the 2568 * socket appears to be writeable and we have something ready to send. 2569 */ 2570 int ceph_con_v1_try_write(struct ceph_connection *con) 2571 { 2572 int ret = 1; 2573 2574 dout("try_write start %p state %d\n", con, con->state); 2575 if (con->state != CEPH_CON_S_PREOPEN && 2576 con->state != CEPH_CON_S_V1_BANNER && 2577 con->state != CEPH_CON_S_V1_CONNECT_MSG && 2578 con->state != CEPH_CON_S_OPEN) 2579 return 0; 2580 2581 /* open the socket first? */ 2582 if (con->state == CEPH_CON_S_PREOPEN) { 2583 BUG_ON(con->sock); 2584 con->state = CEPH_CON_S_V1_BANNER; 2585 2586 con_out_kvec_reset(con); 2587 prepare_write_banner(con); 2588 prepare_read_banner(con); 2589 2590 BUG_ON(con->in_msg); 2591 con->in_tag = CEPH_MSGR_TAG_READY; 2592 dout("try_write initiating connect on %p new state %d\n", 2593 con, con->state); 2594 ret = ceph_tcp_connect(con); 2595 if (ret < 0) { 2596 con->error_msg = "connect error"; 2597 goto out; 2598 } 2599 } 2600 2601 more: 2602 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 2603 BUG_ON(!con->sock); 2604 2605 /* kvec data queued? */ 2606 if (con->out_kvec_left) { 2607 ret = write_partial_kvec(con); 2608 if (ret <= 0) 2609 goto out; 2610 } 2611 if (con->out_skip) { 2612 ret = write_partial_skip(con); 2613 if (ret <= 0) 2614 goto out; 2615 } 2616 2617 /* msg pages? */ 2618 if (con->out_msg) { 2619 if (con->out_msg_done) { 2620 ceph_msg_put(con->out_msg); 2621 con->out_msg = NULL; /* we're done with this one */ 2622 goto do_next; 2623 } 2624 2625 ret = write_partial_message_data(con); 2626 if (ret == 1) 2627 goto more; /* we need to send the footer, too! */ 2628 if (ret == 0) 2629 goto out; 2630 if (ret < 0) { 2631 dout("try_write write_partial_message_data err %d\n", 2632 ret); 2633 goto out; 2634 } 2635 } 2636 2637 do_next: 2638 if (con->state == CEPH_CON_S_OPEN) { 2639 if (ceph_con_flag_test_and_clear(con, 2640 CEPH_CON_F_KEEPALIVE_PENDING)) { 2641 prepare_write_keepalive(con); 2642 goto more; 2643 } 2644 /* is anything else pending? */ 2645 if (!list_empty(&con->out_queue)) { 2646 prepare_write_message(con); 2647 goto more; 2648 } 2649 if (con->in_seq > con->in_seq_acked) { 2650 prepare_write_ack(con); 2651 goto more; 2652 } 2653 } 2654 2655 /* Nothing to do! 
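 * Clear WRITE_PENDING so that a later ceph_con_send() or
 * ceph_con_keepalive() knows it must requeue the worker.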
*/ 2656 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); 2657 dout("try_write nothing else to write.\n"); 2658 ret = 0; 2659 out: 2660 dout("try_write done on %p ret %d\n", con, ret); 2661 return ret; 2662 } 2663 2664 /* 2665 * Read what we can from the socket. 2666 */ 2667 int ceph_con_v1_try_read(struct ceph_connection *con) 2668 { 2669 int ret = -1; 2670 2671 more: 2672 dout("try_read start %p state %d\n", con, con->state); 2673 if (con->state != CEPH_CON_S_V1_BANNER && 2674 con->state != CEPH_CON_S_V1_CONNECT_MSG && 2675 con->state != CEPH_CON_S_OPEN) 2676 return 0; 2677 2678 BUG_ON(!con->sock); 2679 2680 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 2681 con->in_base_pos); 2682 2683 if (con->state == CEPH_CON_S_V1_BANNER) { 2684 ret = read_partial_banner(con); 2685 if (ret <= 0) 2686 goto out; 2687 ret = process_banner(con); 2688 if (ret < 0) 2689 goto out; 2690 2691 con->state = CEPH_CON_S_V1_CONNECT_MSG; 2692 2693 /* 2694 * Received banner is good, exchange connection info. 2695 * Do not reset out_kvec, as sending our banner raced 2696 * with receiving peer banner after connect completed. 2697 */ 2698 ret = prepare_write_connect(con); 2699 if (ret < 0) 2700 goto out; 2701 prepare_read_connect(con); 2702 2703 /* Send connection info before awaiting response */ 2704 goto out; 2705 } 2706 2707 if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { 2708 ret = read_partial_connect(con); 2709 if (ret <= 0) 2710 goto out; 2711 ret = process_connect(con); 2712 if (ret < 0) 2713 goto out; 2714 goto more; 2715 } 2716 2717 WARN_ON(con->state != CEPH_CON_S_OPEN); 2718 2719 if (con->in_base_pos < 0) { 2720 /* 2721 * skipping + discarding content. 2722 */ 2723 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); 2724 if (ret <= 0) 2725 goto out; 2726 dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); 2727 con->in_base_pos += ret; 2728 if (con->in_base_pos) 2729 goto more; 2730 } 2731 if (con->in_tag == CEPH_MSGR_TAG_READY) { 2732 /* 2733 * what's next? 
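 * Read the next tag byte: MSG, ACK, KEEPALIVE2_ACK or CLOSE.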
2734 */ 2735 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); 2736 if (ret <= 0) 2737 goto out; 2738 dout("try_read got tag %d\n", (int)con->in_tag); 2739 switch (con->in_tag) { 2740 case CEPH_MSGR_TAG_MSG: 2741 prepare_read_message(con); 2742 break; 2743 case CEPH_MSGR_TAG_ACK: 2744 prepare_read_ack(con); 2745 break; 2746 case CEPH_MSGR_TAG_KEEPALIVE2_ACK: 2747 prepare_read_keepalive_ack(con); 2748 break; 2749 case CEPH_MSGR_TAG_CLOSE: 2750 ceph_con_close_socket(con); 2751 con->state = CEPH_CON_S_CLOSED; 2752 goto out; 2753 default: 2754 goto bad_tag; 2755 } 2756 } 2757 if (con->in_tag == CEPH_MSGR_TAG_MSG) { 2758 ret = read_partial_message(con); 2759 if (ret <= 0) { 2760 switch (ret) { 2761 case -EBADMSG: 2762 con->error_msg = "bad crc/signature"; 2763 fallthrough; 2764 case -EBADE: 2765 ret = -EIO; 2766 break; 2767 case -EIO: 2768 con->error_msg = "io error"; 2769 break; 2770 } 2771 goto out; 2772 } 2773 if (con->in_tag == CEPH_MSGR_TAG_READY) 2774 goto more; 2775 ceph_con_process_message(con); 2776 if (con->state == CEPH_CON_S_OPEN) 2777 prepare_read_tag(con); 2778 goto more; 2779 } 2780 if (con->in_tag == CEPH_MSGR_TAG_ACK || 2781 con->in_tag == CEPH_MSGR_TAG_SEQ) { 2782 /* 2783 * the final handshake seq exchange is semantically 2784 * equivalent to an ACK 2785 */ 2786 ret = read_partial_ack(con); 2787 if (ret <= 0) 2788 goto out; 2789 process_ack(con); 2790 goto more; 2791 } 2792 if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { 2793 ret = read_keepalive_ack(con); 2794 if (ret <= 0) 2795 goto out; 2796 goto more; 2797 } 2798 2799 out: 2800 dout("try_read done on %p ret %d\n", con, ret); 2801 return ret; 2802 2803 bad_tag: 2804 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); 2805 con->error_msg = "protocol error, garbage tag"; 2806 ret = -1; 2807 goto out; 2808 } 2809 2810 2811 /* 2812 * Atomically queue work on a connection after the specified delay. 2813 * Bump @con reference to avoid races with connection teardown. 2814 * Returns 0 if work was queued, or an error code otherwise. 
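 * -ENOENT means the connection is already being torn down (its
 * refcount has hit zero); -EBUSY means work was already queued.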
2815 */ 2816 static int queue_con_delay(struct ceph_connection *con, unsigned long delay) 2817 { 2818 if (!con->ops->get(con)) { 2819 dout("%s %p ref count 0\n", __func__, con); 2820 return -ENOENT; 2821 } 2822 2823 if (delay >= HZ) 2824 delay = round_jiffies_relative(delay); 2825 2826 dout("%s %p %lu\n", __func__, con, delay); 2827 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { 2828 dout("%s %p - already queued\n", __func__, con); 2829 con->ops->put(con); 2830 return -EBUSY; 2831 } 2832 2833 return 0; 2834 } 2835 2836 static void queue_con(struct ceph_connection *con) 2837 { 2838 (void) queue_con_delay(con, 0); 2839 } 2840 2841 static void cancel_con(struct ceph_connection *con) 2842 { 2843 if (cancel_delayed_work(&con->work)) { 2844 dout("%s %p\n", __func__, con); 2845 con->ops->put(con); 2846 } 2847 } 2848 2849 static bool con_sock_closed(struct ceph_connection *con) 2850 { 2851 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) 2852 return false; 2853 2854 #define CASE(x) \ 2855 case CEPH_CON_S_ ## x: \ 2856 con->error_msg = "socket closed (con state " #x ")"; \ 2857 break; 2858 2859 switch (con->state) { 2860 CASE(CLOSED); 2861 CASE(PREOPEN); 2862 CASE(V1_BANNER); 2863 CASE(V1_CONNECT_MSG); 2864 CASE(OPEN); 2865 CASE(STANDBY); 2866 default: 2867 BUG(); 2868 } 2869 #undef CASE 2870 2871 return true; 2872 } 2873 2874 static bool con_backoff(struct ceph_connection *con) 2875 { 2876 int ret; 2877 2878 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) 2879 return false; 2880 2881 ret = queue_con_delay(con, con->delay); 2882 if (ret) { 2883 dout("%s: con %p FAILED to back off %lu\n", __func__, 2884 con, con->delay); 2885 BUG_ON(ret == -ENOENT); 2886 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); 2887 } 2888 2889 return true; 2890 } 2891 2892 /* Finish fault handling; con->mutex must *not* be held here */ 2893 2894 static void con_fault_finish(struct ceph_connection *con) 2895 { 2896 dout("%s %p\n", __func__, con); 2897 2898 /* 2899 * in case we faulted due to authentication, invalidate our 2900 * current tickets so that we can get new ones. 2901 */ 2902 if (con->auth_retry) { 2903 dout("auth_retry %d, invalidating\n", con->auth_retry); 2904 if (con->ops->invalidate_authorizer) 2905 con->ops->invalidate_authorizer(con); 2906 con->auth_retry = 0; 2907 } 2908 2909 if (con->ops->fault) 2910 con->ops->fault(con); 2911 } 2912 2913 /* 2914 * Do some work on a connection. Drop a connection ref when we're done. 
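 *
 * Runs from the messenger workqueue: handle a deferred socket close
 * or backoff first, then pump reads and writes until the socket
 * would block. The ->fault() callback is invoked only after
 * con->mutex is dropped.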
2915 */ 2916 static void ceph_con_workfn(struct work_struct *work) 2917 { 2918 struct ceph_connection *con = container_of(work, struct ceph_connection, 2919 work.work); 2920 bool fault; 2921 2922 mutex_lock(&con->mutex); 2923 while (true) { 2924 int ret; 2925 2926 if ((fault = con_sock_closed(con))) { 2927 dout("%s: con %p SOCK_CLOSED\n", __func__, con); 2928 break; 2929 } 2930 if (con_backoff(con)) { 2931 dout("%s: con %p BACKOFF\n", __func__, con); 2932 break; 2933 } 2934 if (con->state == CEPH_CON_S_STANDBY) { 2935 dout("%s: con %p STANDBY\n", __func__, con); 2936 break; 2937 } 2938 if (con->state == CEPH_CON_S_CLOSED) { 2939 dout("%s: con %p CLOSED\n", __func__, con); 2940 BUG_ON(con->sock); 2941 break; 2942 } 2943 if (con->state == CEPH_CON_S_PREOPEN) { 2944 dout("%s: con %p PREOPEN\n", __func__, con); 2945 BUG_ON(con->sock); 2946 } 2947 2948 ret = ceph_con_v1_try_read(con); 2949 if (ret < 0) { 2950 if (ret == -EAGAIN) 2951 continue; 2952 if (!con->error_msg) 2953 con->error_msg = "socket error on read"; 2954 fault = true; 2955 break; 2956 } 2957 2958 ret = ceph_con_v1_try_write(con); 2959 if (ret < 0) { 2960 if (ret == -EAGAIN) 2961 continue; 2962 if (!con->error_msg) 2963 con->error_msg = "socket error on write"; 2964 fault = true; 2965 } 2966 2967 break; /* If we make it to here, we're done */ 2968 } 2969 if (fault) 2970 con_fault(con); 2971 mutex_unlock(&con->mutex); 2972 2973 if (fault) 2974 con_fault_finish(con); 2975 2976 con->ops->put(con); 2977 } 2978 2979 /* 2980 * Generic error/fault handler. A retry mechanism is used with 2981 * exponential backoff 2982 */ 2983 static void con_fault(struct ceph_connection *con) 2984 { 2985 dout("fault %p state %d to peer %s\n", 2986 con, con->state, ceph_pr_addr(&con->peer_addr)); 2987 2988 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 2989 ceph_pr_addr(&con->peer_addr), con->error_msg); 2990 con->error_msg = NULL; 2991 2992 WARN_ON(con->state != CEPH_CON_S_V1_BANNER && 2993 con->state != CEPH_CON_S_V1_CONNECT_MSG && 2994 con->state != CEPH_CON_S_OPEN); 2995 2996 ceph_con_reset_protocol(con); 2997 2998 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) { 2999 dout("fault on LOSSYTX channel, marking CLOSED\n"); 3000 con->state = CEPH_CON_S_CLOSED; 3001 return; 3002 } 3003 3004 /* Requeue anything that hasn't been acked */ 3005 list_splice_init(&con->out_sent, &con->out_queue); 3006 3007 /* If there are no messages queued or keepalive pending, place 3008 * the connection in a STANDBY state */ 3009 if (list_empty(&con->out_queue) && 3010 !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { 3011 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 3012 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); 3013 con->state = CEPH_CON_S_STANDBY; 3014 } else { 3015 /* retry after a delay. 
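 * The delay doubles on each successive fault, from
 * BASE_DELAY_INTERVAL up to MAX_DELAY_INTERVAL.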
*/ 3016 con->state = CEPH_CON_S_PREOPEN; 3017 if (!con->delay) { 3018 con->delay = BASE_DELAY_INTERVAL; 3019 } else if (con->delay < MAX_DELAY_INTERVAL) { 3020 con->delay *= 2; 3021 if (con->delay > MAX_DELAY_INTERVAL) 3022 con->delay = MAX_DELAY_INTERVAL; 3023 } 3024 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); 3025 queue_con(con); 3026 } 3027 } 3028 3029 3030 void ceph_messenger_reset_nonce(struct ceph_messenger *msgr) 3031 { 3032 u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000; 3033 msgr->inst.addr.nonce = cpu_to_le32(nonce); 3034 ceph_encode_my_addr(msgr); 3035 } 3036 3037 /* 3038 * initialize a new messenger instance 3039 */ 3040 void ceph_messenger_init(struct ceph_messenger *msgr, 3041 struct ceph_entity_addr *myaddr) 3042 { 3043 spin_lock_init(&msgr->global_seq_lock); 3044 3045 if (myaddr) { 3046 memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr, 3047 sizeof(msgr->inst.addr.in_addr)); 3048 ceph_addr_set_port(&msgr->inst.addr, 0); 3049 } 3050 3051 msgr->inst.addr.type = 0; 3052 3053 /* generate a random non-zero nonce */ 3054 do { 3055 get_random_bytes(&msgr->inst.addr.nonce, 3056 sizeof(msgr->inst.addr.nonce)); 3057 } while (!msgr->inst.addr.nonce); 3058 ceph_encode_my_addr(msgr); 3059 3060 atomic_set(&msgr->stopping, 0); 3061 write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); 3062 3063 dout("%s %p\n", __func__, msgr); 3064 } 3065 3066 void ceph_messenger_fini(struct ceph_messenger *msgr) 3067 { 3068 put_net(read_pnet(&msgr->net)); 3069 } 3070 3071 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) 3072 { 3073 if (msg->con) 3074 msg->con->ops->put(msg->con); 3075 3076 msg->con = con ? con->ops->get(con) : NULL; 3077 BUG_ON(msg->con != con); 3078 } 3079 3080 static void clear_standby(struct ceph_connection *con) 3081 { 3082 /* come back from STANDBY? */ 3083 if (con->state == CEPH_CON_S_STANDBY) { 3084 dout("clear_standby %p and ++connect_seq\n", con); 3085 con->state = CEPH_CON_S_PREOPEN; 3086 con->connect_seq++; 3087 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); 3088 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); 3089 } 3090 } 3091 3092 /* 3093 * Queue up an outgoing message on the given connection. 3094 * 3095 * Consumes a ref on @msg. 
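 * If the connection is CLOSED the message is dropped and its ref is
 * put immediately; otherwise the ref is held until the message is
 * acked or revoked.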
3096 */ 3097 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) 3098 { 3099 /* set src+dst */ 3100 msg->hdr.src = con->msgr->inst.name; 3101 BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len)); 3102 msg->needs_out_seq = true; 3103 3104 mutex_lock(&con->mutex); 3105 3106 if (con->state == CEPH_CON_S_CLOSED) { 3107 dout("con_send %p closed, dropping %p\n", con, msg); 3108 ceph_msg_put(msg); 3109 mutex_unlock(&con->mutex); 3110 return; 3111 } 3112 3113 msg_con_set(msg, con); 3114 3115 BUG_ON(!list_empty(&msg->list_head)); 3116 list_add_tail(&msg->list_head, &con->out_queue); 3117 dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, 3118 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), 3119 ceph_msg_type_name(le16_to_cpu(msg->hdr.type)), 3120 le32_to_cpu(msg->hdr.front_len), 3121 le32_to_cpu(msg->hdr.middle_len), 3122 le32_to_cpu(msg->hdr.data_len)); 3123 3124 clear_standby(con); 3125 mutex_unlock(&con->mutex); 3126 3127 /* if there wasn't anything waiting to send before, queue 3128 * new work */ 3129 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) 3130 queue_con(con); 3131 } 3132 EXPORT_SYMBOL(ceph_con_send); 3133 3134 void ceph_con_v1_revoke(struct ceph_connection *con) 3135 { 3136 struct ceph_msg *msg = con->out_msg; 3137 3138 WARN_ON(con->out_skip); 3139 /* footer */ 3140 if (con->out_msg_done) { 3141 con->out_skip += con_out_kvec_skip(con); 3142 } else { 3143 WARN_ON(!msg->data_length); 3144 con->out_skip += sizeof_footer(con); 3145 } 3146 /* data, middle, front */ 3147 if (msg->data_length) 3148 con->out_skip += msg->cursor.total_resid; 3149 if (msg->middle) 3150 con->out_skip += con_out_kvec_skip(con); 3151 con->out_skip += con_out_kvec_skip(con); 3152 3153 dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, 3154 con->out_kvec_bytes, con->out_skip); 3155 } 3156 3157 /* 3158 * Revoke a message that was previously queued for send 3159 */ 3160 void ceph_msg_revoke(struct ceph_msg *msg) 3161 { 3162 struct ceph_connection *con = msg->con; 3163 3164 if (!con) { 3165 dout("%s msg %p null con\n", __func__, msg); 3166 return; /* Message not in our possession */ 3167 } 3168 3169 mutex_lock(&con->mutex); 3170 if (list_empty(&msg->list_head)) { 3171 WARN_ON(con->out_msg == msg); 3172 dout("%s con %p msg %p not linked\n", __func__, con, msg); 3173 mutex_unlock(&con->mutex); 3174 return; 3175 } 3176 3177 dout("%s con %p msg %p was linked\n", __func__, con, msg); 3178 msg->hdr.seq = 0; 3179 ceph_msg_remove(msg); 3180 3181 if (con->out_msg == msg) { 3182 WARN_ON(con->state != CEPH_CON_S_OPEN); 3183 dout("%s con %p msg %p was sending\n", __func__, con, msg); 3184 ceph_con_v1_revoke(con); 3185 ceph_msg_put(con->out_msg); 3186 con->out_msg = NULL; 3187 } else { 3188 dout("%s con %p msg %p not current, out_msg %p\n", __func__, 3189 con, msg, con->out_msg); 3190 } 3191 mutex_unlock(&con->mutex); 3192 } 3193 3194 void ceph_con_v1_revoke_incoming(struct ceph_connection *con) 3195 { 3196 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); 3197 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); 3198 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); 3199 3200 /* skip rest of message */ 3201 con->in_base_pos = con->in_base_pos - 3202 sizeof(struct ceph_msg_header) - 3203 front_len - 3204 middle_len - 3205 data_len - 3206 sizeof(struct ceph_msg_footer); 3207 3208 con->in_tag = CEPH_MSGR_TAG_READY; 3209 con->in_seq++; 3210 3211 dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos); 3212 } 3213 3214 /* 
3215 * Revoke a message that we may be reading data into 3216 */ 3217 void ceph_msg_revoke_incoming(struct ceph_msg *msg) 3218 { 3219 struct ceph_connection *con = msg->con; 3220 3221 if (!con) { 3222 dout("%s msg %p null con\n", __func__, msg); 3223 return; /* Message not in our possession */ 3224 } 3225 3226 mutex_lock(&con->mutex); 3227 if (con->in_msg == msg) { 3228 WARN_ON(con->state != CEPH_CON_S_OPEN); 3229 dout("%s con %p msg %p was recving\n", __func__, con, msg); 3230 ceph_con_v1_revoke_incoming(con); 3231 ceph_msg_put(con->in_msg); 3232 con->in_msg = NULL; 3233 } else { 3234 dout("%s con %p msg %p not current, in_msg %p\n", __func__, 3235 con, msg, con->in_msg); 3236 } 3237 mutex_unlock(&con->mutex); 3238 } 3239 3240 /* 3241 * Queue a keepalive byte to ensure the tcp connection is alive. 3242 */ 3243 void ceph_con_keepalive(struct ceph_connection *con) 3244 { 3245 dout("con_keepalive %p\n", con); 3246 mutex_lock(&con->mutex); 3247 clear_standby(con); 3248 ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); 3249 mutex_unlock(&con->mutex); 3250 3251 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) 3252 queue_con(con); 3253 } 3254 EXPORT_SYMBOL(ceph_con_keepalive); 3255 3256 bool ceph_con_keepalive_expired(struct ceph_connection *con, 3257 unsigned long interval) 3258 { 3259 if (interval > 0 && 3260 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { 3261 struct timespec64 now; 3262 struct timespec64 ts; 3263 ktime_get_real_ts64(&now); 3264 jiffies_to_timespec64(interval, &ts); 3265 ts = timespec64_add(con->last_keepalive_ack, ts); 3266 return timespec64_compare(&now, &ts) >= 0; 3267 } 3268 return false; 3269 } 3270 3271 static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg) 3272 { 3273 BUG_ON(msg->num_data_items >= msg->max_data_items); 3274 return &msg->data[msg->num_data_items++]; 3275 } 3276 3277 static void ceph_msg_data_destroy(struct ceph_msg_data *data) 3278 { 3279 if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) { 3280 int num_pages = calc_pages_for(data->alignment, data->length); 3281 ceph_release_page_vector(data->pages, num_pages); 3282 } else if (data->type == CEPH_MSG_DATA_PAGELIST) { 3283 ceph_pagelist_release(data->pagelist); 3284 } 3285 } 3286 3287 void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, 3288 size_t length, size_t alignment, bool own_pages) 3289 { 3290 struct ceph_msg_data *data; 3291 3292 BUG_ON(!pages); 3293 BUG_ON(!length); 3294 3295 data = ceph_msg_data_add(msg); 3296 data->type = CEPH_MSG_DATA_PAGES; 3297 data->pages = pages; 3298 data->length = length; 3299 data->alignment = alignment & ~PAGE_MASK; 3300 data->own_pages = own_pages; 3301 3302 msg->data_length += length; 3303 } 3304 EXPORT_SYMBOL(ceph_msg_data_add_pages); 3305 3306 void ceph_msg_data_add_pagelist(struct ceph_msg *msg, 3307 struct ceph_pagelist *pagelist) 3308 { 3309 struct ceph_msg_data *data; 3310 3311 BUG_ON(!pagelist); 3312 BUG_ON(!pagelist->length); 3313 3314 data = ceph_msg_data_add(msg); 3315 data->type = CEPH_MSG_DATA_PAGELIST; 3316 refcount_inc(&pagelist->refcnt); 3317 data->pagelist = pagelist; 3318 3319 msg->data_length += pagelist->length; 3320 } 3321 EXPORT_SYMBOL(ceph_msg_data_add_pagelist); 3322 3323 #ifdef CONFIG_BLOCK 3324 void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, 3325 u32 length) 3326 { 3327 struct ceph_msg_data *data; 3328 3329 data = ceph_msg_data_add(msg); 3330 data->type = CEPH_MSG_DATA_BIO; 3331 data->bio_pos = *bio_pos; 3332 data->bio_length = length; 3333 3334 
msg->data_length += length; 3335 } 3336 EXPORT_SYMBOL(ceph_msg_data_add_bio); 3337 #endif /* CONFIG_BLOCK */ 3338 3339 void ceph_msg_data_add_bvecs(struct ceph_msg *msg, 3340 struct ceph_bvec_iter *bvec_pos) 3341 { 3342 struct ceph_msg_data *data; 3343 3344 data = ceph_msg_data_add(msg); 3345 data->type = CEPH_MSG_DATA_BVECS; 3346 data->bvec_pos = *bvec_pos; 3347 3348 msg->data_length += bvec_pos->iter.bi_size; 3349 } 3350 EXPORT_SYMBOL(ceph_msg_data_add_bvecs); 3351 3352 /* 3353 * construct a new message with given type, size 3354 * the new msg has a ref count of 1. 3355 */ 3356 struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items, 3357 gfp_t flags, bool can_fail) 3358 { 3359 struct ceph_msg *m; 3360 3361 m = kmem_cache_zalloc(ceph_msg_cache, flags); 3362 if (m == NULL) 3363 goto out; 3364 3365 m->hdr.type = cpu_to_le16(type); 3366 m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); 3367 m->hdr.front_len = cpu_to_le32(front_len); 3368 3369 INIT_LIST_HEAD(&m->list_head); 3370 kref_init(&m->kref); 3371 3372 /* front */ 3373 if (front_len) { 3374 m->front.iov_base = ceph_kvmalloc(front_len, flags); 3375 if (m->front.iov_base == NULL) { 3376 dout("ceph_msg_new can't allocate %d bytes\n", 3377 front_len); 3378 goto out2; 3379 } 3380 } else { 3381 m->front.iov_base = NULL; 3382 } 3383 m->front_alloc_len = m->front.iov_len = front_len; 3384 3385 if (max_data_items) { 3386 m->data = kmalloc_array(max_data_items, sizeof(*m->data), 3387 flags); 3388 if (!m->data) 3389 goto out2; 3390 3391 m->max_data_items = max_data_items; 3392 } 3393 3394 dout("ceph_msg_new %p front %d\n", m, front_len); 3395 return m; 3396 3397 out2: 3398 ceph_msg_put(m); 3399 out: 3400 if (!can_fail) { 3401 pr_err("msg_new can't create type %d front %d\n", type, 3402 front_len); 3403 WARN_ON(1); 3404 } else { 3405 dout("msg_new can't create type %d front %d\n", type, 3406 front_len); 3407 } 3408 return NULL; 3409 } 3410 EXPORT_SYMBOL(ceph_msg_new2); 3411 3412 struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, 3413 bool can_fail) 3414 { 3415 return ceph_msg_new2(type, front_len, 0, flags, can_fail); 3416 } 3417 EXPORT_SYMBOL(ceph_msg_new); 3418 3419 /* 3420 * Allocate "middle" portion of a message, if it is needed and wasn't 3421 * allocated by alloc_msg. This allows us to read a small fixed-size 3422 * per-type header in the front and then gracefully fail (i.e., 3423 * propagate the error to the caller based on info in the front) when 3424 * the middle is too large. 3425 */ 3426 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) 3427 { 3428 int type = le16_to_cpu(msg->hdr.type); 3429 int middle_len = le32_to_cpu(msg->hdr.middle_len); 3430 3431 dout("alloc_middle %p type %d %s middle_len %d\n", msg, type, 3432 ceph_msg_type_name(type), middle_len); 3433 BUG_ON(!middle_len); 3434 BUG_ON(msg->middle); 3435 3436 msg->middle = ceph_buffer_new(middle_len, GFP_NOFS); 3437 if (!msg->middle) 3438 return -ENOMEM; 3439 return 0; 3440 } 3441 3442 /* 3443 * Allocate a message for receiving an incoming message on a 3444 * connection, and save the result in con->in_msg. Uses the 3445 * connection's private alloc_msg op if available. 3446 * 3447 * Returns 0 on success, or a negative error code. 3448 * 3449 * On success, if we set *skip = 1: 3450 * - the next message should be skipped and ignored. 3451 * - con->in_msg == NULL 3452 * or if we set *skip = 0: 3453 * - con->in_msg is non-null. 
3454 * On error (ENOMEM, EAGAIN, ...), 3455 * - con->in_msg == NULL 3456 */ 3457 int ceph_con_in_msg_alloc(struct ceph_connection *con, 3458 struct ceph_msg_header *hdr, int *skip) 3459 { 3460 int middle_len = le32_to_cpu(hdr->middle_len); 3461 struct ceph_msg *msg; 3462 int ret = 0; 3463 3464 BUG_ON(con->in_msg != NULL); 3465 BUG_ON(!con->ops->alloc_msg); 3466 3467 mutex_unlock(&con->mutex); 3468 msg = con->ops->alloc_msg(con, hdr, skip); 3469 mutex_lock(&con->mutex); 3470 if (con->state != CEPH_CON_S_OPEN) { 3471 if (msg) 3472 ceph_msg_put(msg); 3473 return -EAGAIN; 3474 } 3475 if (msg) { 3476 BUG_ON(*skip); 3477 msg_con_set(msg, con); 3478 con->in_msg = msg; 3479 } else { 3480 /* 3481 * Null message pointer means either we should skip 3482 * this message or we couldn't allocate memory. The 3483 * former is not an error. 3484 */ 3485 if (*skip) 3486 return 0; 3487 3488 con->error_msg = "error allocating memory for incoming message"; 3489 return -ENOMEM; 3490 } 3491 memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr)); 3492 3493 if (middle_len && !con->in_msg->middle) { 3494 ret = ceph_alloc_middle(con, con->in_msg); 3495 if (ret < 0) { 3496 ceph_msg_put(con->in_msg); 3497 con->in_msg = NULL; 3498 } 3499 } 3500 3501 return ret; 3502 } 3503 3504 void ceph_con_get_out_msg(struct ceph_connection *con) 3505 { 3506 struct ceph_msg *msg; 3507 3508 BUG_ON(list_empty(&con->out_queue)); 3509 msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head); 3510 WARN_ON(msg->con != con); 3511 3512 /* 3513 * Put the message on "sent" list using a ref from ceph_con_send(). 3514 * It is put when the message is acked or revoked. 3515 */ 3516 list_move_tail(&msg->list_head, &con->out_sent); 3517 3518 /* 3519 * Only assign outgoing seq # if we haven't sent this message 3520 * yet. If it is requeued, resend with its original seq. 3521 */ 3522 if (msg->needs_out_seq) { 3523 msg->hdr.seq = cpu_to_le64(++con->out_seq); 3524 msg->needs_out_seq = false; 3525 3526 if (con->ops->reencode_message) 3527 con->ops->reencode_message(msg); 3528 } 3529 3530 /* 3531 * Get a ref for out_msg. It is put when we are done sending the 3532 * message or in case of a fault. 3533 */ 3534 WARN_ON(con->out_msg); 3535 con->out_msg = ceph_msg_get(msg); 3536 } 3537 3538 /* 3539 * Free a generically kmalloc'd message.
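 * The front buffer may have been vmalloc'ed by ceph_kvmalloc(), so
 * it must be released with kvfree().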
3540 */ 3541 static void ceph_msg_free(struct ceph_msg *m) 3542 { 3543 dout("%s %p\n", __func__, m); 3544 kvfree(m->front.iov_base); 3545 kfree(m->data); 3546 kmem_cache_free(ceph_msg_cache, m); 3547 } 3548 3549 static void ceph_msg_release(struct kref *kref) 3550 { 3551 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 3552 int i; 3553 3554 dout("%s %p\n", __func__, m); 3555 WARN_ON(!list_empty(&m->list_head)); 3556 3557 msg_con_set(m, NULL); 3558 3559 /* drop middle, data, if any */ 3560 if (m->middle) { 3561 ceph_buffer_put(m->middle); 3562 m->middle = NULL; 3563 } 3564 3565 for (i = 0; i < m->num_data_items; i++) 3566 ceph_msg_data_destroy(&m->data[i]); 3567 3568 if (m->pool) 3569 ceph_msgpool_put(m->pool, m); 3570 else 3571 ceph_msg_free(m); 3572 } 3573 3574 struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) 3575 { 3576 dout("%s %p (was %d)\n", __func__, msg, 3577 kref_read(&msg->kref)); 3578 kref_get(&msg->kref); 3579 return msg; 3580 } 3581 EXPORT_SYMBOL(ceph_msg_get); 3582 3583 void ceph_msg_put(struct ceph_msg *msg) 3584 { 3585 dout("%s %p (was %d)\n", __func__, msg, 3586 kref_read(&msg->kref)); 3587 kref_put(&msg->kref, ceph_msg_release); 3588 } 3589 EXPORT_SYMBOL(ceph_msg_put); 3590 3591 void ceph_msg_dump(struct ceph_msg *msg) 3592 { 3593 pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg, 3594 msg->front_alloc_len, msg->data_length); 3595 print_hex_dump(KERN_DEBUG, "header: ", 3596 DUMP_PREFIX_OFFSET, 16, 1, 3597 &msg->hdr, sizeof(msg->hdr), true); 3598 print_hex_dump(KERN_DEBUG, " front: ", 3599 DUMP_PREFIX_OFFSET, 16, 1, 3600 msg->front.iov_base, msg->front.iov_len, true); 3601 if (msg->middle) 3602 print_hex_dump(KERN_DEBUG, "middle: ", 3603 DUMP_PREFIX_OFFSET, 16, 1, 3604 msg->middle->vec.iov_base, 3605 msg->middle->vec.iov_len, true); 3606 print_hex_dump(KERN_DEBUG, "footer: ", 3607 DUMP_PREFIX_OFFSET, 16, 1, 3608 &msg->footer, sizeof(msg->footer), true); 3609 } 3610 EXPORT_SYMBOL(ceph_msg_dump); 3611