// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/nsproxy.h>
#include <linux/sched/mm.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif /* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>
#include <trace/events/sock.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */

/*
 * We track the state of the socket on a given connection using
 * values defined below.  The transition to a new socket state is
 * handled by a function which verifies we aren't coming from an
 * unexpected state.
 *
 *      --------
 *      | NEW* |  transient initial state
 *      --------
 *          | con_sock_state_init()
 *          v
 *      ----------
 *      | CLOSED |  initialized, but no socket (and no
 *      ----------  TCP connection)
 *       ^      \
 *       |       \ con_sock_state_connecting()
 *       |        ----------------------
 *       |                              \
 *       + con_sock_state_closed()       \
 *       |+---------------------------    \
 *       | \                          \    \
 *       |  -----------                \    \
 *       |  | CLOSING |  socket event;  \    \
 *       |  -----------  await close     \    \
 *       |       ^                        \   |
 *       |       |                         \  |
 *       |       + con_sock_state_closing() \ |
 *       |      / \                         | |
 *       |     /   ---------------          | |
 *       |    /                   \         v v
 *       |   /                    --------------
 *       |  /    -----------------| CONNECTING |  socket created, TCP
 *       |  |   /                 --------------  connect initiated
 *       |  |   | con_sock_state_connected()
 *       |  |   v
 *      -------------
 *      | CONNECTED |  TCP connection established
 *      -------------
 *
 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
 */
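
/*
 * Illustrative lifecycle sketch (not code from this file; the
 * caller-side "session" struct and ops table names are hypothetical).
 * A typical user embeds a ceph_connection and drives it like so:
 *
 *	ceph_con_init(&s->con, s, &my_con_ops, &client->msgr);
 *	ceph_con_open(&s->con, CEPH_ENTITY_TYPE_OSD, osd_id, &peer_addr);
 *	ceph_con_send(&s->con, msg);	// consumes the caller's msg ref
 *	...
 *	ceph_con_close(&s->con);	// drop the peer, discard queues
 */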

#define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
#define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
#define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED or -> CLOSING */
#define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */

static bool con_flag_valid(unsigned long con_flag)
{
	switch (con_flag) {
	case CEPH_CON_F_LOSSYTX:
	case CEPH_CON_F_KEEPALIVE_PENDING:
	case CEPH_CON_F_WRITE_PENDING:
	case CEPH_CON_F_SOCK_CLOSED:
	case CEPH_CON_F_BACKOFF:
		return true;
	default:
		return false;
	}
}

void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	clear_bit(con_flag, &con->flags);
}

void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	set_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
				  unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_clear_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test_and_set(struct ceph_connection *con,
				unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_set_bit(con_flag, &con->flags);
}

/* Slab caches for frequently-allocated structures */

static struct kmem_cache *ceph_msg_cache;

#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif

static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void ceph_con_workfn(struct work_struct *);
static void con_fault(struct ceph_connection *con);
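
/*
 * Usage sketch for the flag helpers above: WRITE_PENDING is latched
 * with test-and-set so that work is queued only on the 0 -> 1
 * transition, the same pattern ceph_con_send() and
 * ceph_con_keepalive() use below:
 *
 *	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
 *		queue_con(con);
 */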

/*
 * Nicely render a sockaddr as a string.  An array of formatted
 * strings is used, to approximate reentrancy.
 */
#define ADDR_STR_COUNT_LOG	5	/* log2(# address strings in array) */
#define ADDR_STR_COUNT		(1 << ADDR_STR_COUNT_LOG)
#define ADDR_STR_COUNT_MASK	(ADDR_STR_COUNT - 1)
#define MAX_ADDR_STR_LEN	64	/* 54 is enough */

static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
static atomic_t addr_str_seq = ATOMIC_INIT(0);

struct page *ceph_zero_page;		/* used in certain error cases */

const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
{
	int i;
	char *s;
	struct sockaddr_storage ss = addr->in_addr; /* align */
	struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;

	i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
	s = addr_str[i];

	switch (ss.ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
			 le32_to_cpu(addr->type), &in4->sin_addr,
			 ntohs(in4->sin_port));
		break;

	case AF_INET6:
		snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
			 le32_to_cpu(addr->type), &in6->sin6_addr,
			 ntohs(in6->sin6_port));
		break;

	default:
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
			 ss.ss_family);
	}

	return s;
}
EXPORT_SYMBOL(ceph_pr_addr);

void ceph_encode_my_addr(struct ceph_messenger *msgr)
{
	if (!ceph_msgr2(from_msgr(msgr))) {
		memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
		       sizeof(msgr->my_enc_addr));
		ceph_encode_banner_addr(&msgr->my_enc_addr);
	}
}

/*
 * work queue for all reading and writing to/from the socket.
 */
static struct workqueue_struct *ceph_msgr_wq;

static int ceph_msgr_slab_init(void)
{
	BUG_ON(ceph_msg_cache);
	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
	if (!ceph_msg_cache)
		return -ENOMEM;

	return 0;
}

static void ceph_msgr_slab_exit(void)
{
	BUG_ON(!ceph_msg_cache);
	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;
}

static void _ceph_msgr_exit(void)
{
	if (ceph_msgr_wq) {
		destroy_workqueue(ceph_msgr_wq);
		ceph_msgr_wq = NULL;
	}

	BUG_ON(!ceph_zero_page);
	put_page(ceph_zero_page);
	ceph_zero_page = NULL;

	ceph_msgr_slab_exit();
}

int __init ceph_msgr_init(void)
{
	if (ceph_msgr_slab_init())
		return -ENOMEM;

	BUG_ON(ceph_zero_page);
	ceph_zero_page = ZERO_PAGE(0);
	get_page(ceph_zero_page);

	/*
	 * The number of active work items is limited by the number of
	 * connections, so leave @max_active at default.
	 */
	ceph_msgr_wq = alloc_workqueue("ceph-msgr",
				       WQ_MEM_RECLAIM | WQ_PERCPU, 0);
	if (ceph_msgr_wq)
		return 0;

	pr_err("msgr_init failed to create workqueue\n");
	_ceph_msgr_exit();

	return -ENOMEM;
}

void ceph_msgr_exit(void)
{
	BUG_ON(ceph_msgr_wq == NULL);

	_ceph_msgr_exit();
}

void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_flush);

/* Connection socket state transition functions */

static void con_sock_state_init(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

static void con_sock_state_connecting(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTING);
}

static void con_sock_state_connected(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTED);
}

static void con_sock_state_closing(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
		    old_state != CON_SOCK_STATE_CONNECTED &&
		    old_state != CON_SOCK_STATE_CLOSING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSING);
}

static void con_sock_state_closed(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
		    old_state != CON_SOCK_STATE_CLOSING &&
		    old_state != CON_SOCK_STATE_CONNECTING &&
		    old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

/*
 * socket callback functions
 */

/* data available on socket, or listen socket received a connect */
static void ceph_sock_data_ready(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	trace_sk_data_ready(sk);

	if (atomic_read(&con->msgr->stopping)) {
		return;
	}

	if (sk->sk_state != TCP_CLOSE_WAIT) {
		dout("%s %p state = %d, queueing work\n", __func__,
		     con, con->state);
		queue_con(con);
	}
}

/* socket has buffer space for writing */
static void ceph_sock_write_space(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	/* only queue to workqueue if there is data we want to write,
	 * and there is sufficient space in the socket buffer to accept
	 * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
	 * doesn't get called again until try_write() fills the socket
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
		if (sk_stream_is_writeable(sk)) {
			dout("%s %p queueing write work\n", __func__, con);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			queue_con(con);
		}
	} else {
		dout("%s %p nothing to write\n", __func__, con);
	}
}

/* socket's state has changed */
static void ceph_sock_state_change(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	dout("%s %p state = %d sk_state = %u\n", __func__,
	     con, con->state, sk->sk_state);

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("%s TCP_CLOSE\n", __func__);
		fallthrough;
	case TCP_CLOSE_WAIT:
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		con_sock_state_closing(con);
		ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
		queue_con(con);
		break;
	case TCP_ESTABLISHED:
		dout("%s TCP_ESTABLISHED\n", __func__);
		con_sock_state_connected(con);
		queue_con(con);
		break;
	default:	/* Everything else is uninteresting */
		break;
	}
}

/*
 * set up socket callbacks
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{
	struct sock *sk = sock->sk;
	sk->sk_user_data = con;
	sk->sk_data_ready = ceph_sock_data_ready;
	sk->sk_write_space = ceph_sock_write_space;
	sk->sk_state_change = ceph_sock_state_change;
}


/*
 * socket helpers
 */

/*
 * initiate connection to a remote socket.
 */
int ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
	struct socket *sock;
	unsigned int noio_flag;
	int ret;

	dout("%s con %p peer_addr %s\n", __func__, con,
	     ceph_pr_addr(&con->peer_addr));
	BUG_ON(con->sock);

	/* sock_create_kern() allocates with GFP_KERNEL */
	noio_flag = memalloc_noio_save();
	ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	memalloc_noio_restore(noio_flag);
	if (ret)
		return ret;
	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_use_task_frag = false;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	con_sock_state_connecting(con);
	ret = kernel_connect(sock, (struct sockaddr *)&ss, sizeof(ss),
			     O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     ceph_pr_addr(&con->peer_addr),
		     sock->sk->sk_state);
	} else if (ret < 0) {
		pr_err("connect %s error %d\n",
		       ceph_pr_addr(&con->peer_addr), ret);
		sock_release(sock);
		return ret;
	}

	if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
		tcp_sock_set_nodelay(sock->sk);

	con->sock = sock;
	return 0;
}
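
/*
 * Note on the nonblocking connect above: kernel_connect() normally
 * returns -EINPROGRESS and completion is reported asynchronously.
 * Once the TCP handshake finishes, the socket layer invokes
 * ceph_sock_state_change(), which sees TCP_ESTABLISHED, calls
 * con_sock_state_connected() and queues the connection workfn:
 *
 *	case TCP_ESTABLISHED:
 *		con_sock_state_connected(con);
 *		queue_con(con);
 */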

/*
 * Shutdown/close the socket for the given connection.
 */
int ceph_con_close_socket(struct ceph_connection *con)
{
	int rc = 0;

	dout("%s con %p sock %p\n", __func__, con, con->sock);
	if (con->sock) {
		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
		sock_release(con->sock);
		con->sock = NULL;
	}

	/*
	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
	 * independent of the connection mutex, and we could have
	 * received a socket close event before we had the chance to
	 * shut the socket down.
	 */
	ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);

	con_sock_state_closed(con);
	return rc;
}

static void ceph_con_reset_protocol(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	ceph_con_close_socket(con);
	if (con->in_msg) {
		WARN_ON(con->in_msg->con != con);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}
	if (con->out_msg) {
		WARN_ON(con->out_msg->con != con);
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}
	if (con->bounce_page) {
		__free_page(con->bounce_page);
		con->bounce_page = NULL;
	}

	if (ceph_msgr2(from_msgr(con->msgr)))
		ceph_con_v2_reset_protocol(con);
	else
		ceph_con_v1_reset_protocol(con);
}

/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);

	ceph_msg_put(msg);
}

static void ceph_msg_remove_list(struct list_head *head)
{
	while (!list_empty(head)) {
		struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
							list_head);
		ceph_msg_remove(msg);
	}
}

void ceph_con_reset_session(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	WARN_ON(con->in_msg);
	WARN_ON(con->out_msg);
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);
	con->out_seq = 0;
	con->in_seq = 0;
	con->in_seq_acked = 0;

	if (ceph_msgr2(from_msgr(con->msgr)))
		ceph_con_v2_reset_session(con);
	else
		ceph_con_v1_reset_session(con);
}

/*
 * mark a peer down.  drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{
	mutex_lock(&con->mutex);
	dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
	con->state = CEPH_CON_S_CLOSED;

	ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX);	/* so we retry next
							   connect */
	ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
	ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);

	ceph_con_reset_protocol(con);
	ceph_con_reset_session(con);
	cancel_con(con);
	mutex_unlock(&con->mutex);
}
EXPORT_SYMBOL(ceph_con_close);

/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con,
		   __u8 entity_type, __u64 entity_num,
		   struct ceph_entity_addr *addr)
{
	mutex_lock(&con->mutex);
	dout("con_open %p %s\n", con, ceph_pr_addr(addr));

	WARN_ON(con->state != CEPH_CON_S_CLOSED);
	con->state = CEPH_CON_S_PREOPEN;

	con->peer_name.type = (__u8) entity_type;
	con->peer_name.num = cpu_to_le64(entity_num);

	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;      /* reset backoff memory */
	mutex_unlock(&con->mutex);
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);

/*
 * return true if this connection ever successfully opened
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	if (ceph_msgr2(from_msgr(con->msgr)))
		return ceph_con_v2_opened(con);

	return ceph_con_v1_opened(con);
}

/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_connection *con, void *private,
		   const struct ceph_connection_operations *ops,
		   struct ceph_messenger *msgr)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));
	con->private = private;
	con->ops = ops;
	con->msgr = msgr;

	con_sock_state_init(con);

	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, ceph_con_workfn);

	con->state = CEPH_CON_S_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);

/*
 * We maintain a global counter to order connection attempts.  Get
 * a unique seq greater than @gt.
 */
u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);
	return ret;
}

/*
 * Discard messages that have been acked by the server.
 */
void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
	while (!list_empty(&con->out_sent)) {
		msg = list_first_entry(&con->out_sent, struct ceph_msg,
				       list_head);
		WARN_ON(msg->needs_out_seq);
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > ack_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

/*
 * Discard messages that have been requeued in con_fault(), up to
 * reconnect_seq.  This avoids gratuitously resending messages that
 * the server had received and handled prior to reconnect.
 */
void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
	while (!list_empty(&con->out_queue)) {
		msg = list_first_entry(&con->out_queue, struct ceph_msg,
				       list_head);
		if (msg->needs_out_seq)
			break;
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > reconnect_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

#ifdef CONFIG_BLOCK

/*
 * For a bio data item, a piece is whatever remains of the next
 * entry in the current bio iovec, or the first entry in the next
 * bio in the list.
 */
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
					  size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_bio_iter *it = &cursor->bio_iter;

	cursor->resid = min_t(size_t, length, data->bio_length);
	*it = data->bio_pos;
	if (cursor->resid < it->iter.bi_size)
		it->iter.bi_size = cursor->resid;

	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
}

static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
					   size_t *page_offset,
					   size_t *length)
{
	struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
					   cursor->bio_iter.iter);

	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
				      size_t bytes)
{
	struct ceph_bio_iter *it = &cursor->bio_iter;
	struct page *page = bio_iter_page(it->bio, it->iter);

	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
	cursor->resid -= bytes;
	bio_advance_iter(it->bio, &it->iter, bytes);

	if (!cursor->resid)
		return false;   /* no more data */

	if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
		       page == bio_iter_page(it->bio, it->iter)))
		return false;	/* more bytes to process in this segment */

	if (!it->iter.bi_size) {
		it->bio = it->bio->bi_next;
		it->iter = it->bio->bi_iter;
		if (cursor->resid < it->iter.bi_size)
			it->iter.bi_size = cursor->resid;
	}

	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
	return true;
}
#endif /* CONFIG_BLOCK */

static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
					    size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct bio_vec *bvecs = data->bvec_pos.bvecs;

	cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
	cursor->bvec_iter = data->bvec_pos.iter;
	cursor->bvec_iter.bi_size = cursor->resid;

	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
}

static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
					     size_t *page_offset,
					     size_t *length)
{
	struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
					   cursor->bvec_iter);

	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
	struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);

	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
	cursor->resid -= bytes;
	bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);

	if (!cursor->resid)
		return false;   /* no more data */

	if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
		       page == bvec_iter_page(bvecs, cursor->bvec_iter)))
		return false;	/* more bytes to process in this segment */

	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
	return true;
}

/*
 * For a page array, a piece comes from the first page in the array
 * that has not already been fully consumed.
 */
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
					    size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	int page_count;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(!data->pages);
	BUG_ON(!data->length);

	cursor->resid = min(length, data->length);
	page_count = calc_pages_for(data->alignment, (u64)data->length);
	cursor->page_offset = data->alignment & ~PAGE_MASK;
	cursor->page_index = 0;
	BUG_ON(page_count > (int)USHRT_MAX);
	cursor->page_count = (unsigned short)page_count;
	BUG_ON(length > SIZE_MAX - cursor->page_offset);
}

static struct page *
ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
			 size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_index >= cursor->page_count);
	BUG_ON(cursor->page_offset >= PAGE_SIZE);

	*page_offset = cursor->page_offset;
	*length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
	return data->pages[cursor->page_index];
}

static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);

	/* Advance the cursor page offset */

	cursor->resid -= bytes;
	cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
	if (!bytes || cursor->page_offset)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page; offset is already at 0 */

	BUG_ON(cursor->page_index >= cursor->page_count);
	cursor->page_index++;
	return true;
}

/*
 * For a pagelist, a piece is whatever remains to be consumed in the
 * first page in the list, or the front of the next page.
 */
static void
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
				   size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;
	struct page *page;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	if (!length)
		return;		/* pagelist can be assigned but empty */

	BUG_ON(list_empty(&pagelist->head));
	page = list_first_entry(&pagelist->head, struct page, lru);

	cursor->resid = min(length, pagelist->length);
	cursor->page = page;
	cursor->offset = 0;
}

static struct page *
ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
			    size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(!cursor->page);
	BUG_ON(cursor->offset + cursor->resid != pagelist->length);

	/* offset of first page in pagelist is always 0 */
	*page_offset = cursor->offset & ~PAGE_MASK;
	*length = min_t(size_t, cursor->resid, PAGE_SIZE - *page_offset);
	return cursor->page;
}

static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
					   size_t bytes)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(cursor->offset + cursor->resid != pagelist->length);
	BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);

	/* Advance the cursor offset */

	cursor->resid -= bytes;
	cursor->offset += bytes;
	/* offset of first page in pagelist is always 0 */
	if (!bytes || cursor->offset & ~PAGE_MASK)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page */

	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
	cursor->page = list_next_entry(cursor->page, lru);
	return true;
}

static void ceph_msg_data_iter_cursor_init(struct ceph_msg_data_cursor *cursor,
					   size_t length)
{
	struct ceph_msg_data *data = cursor->data;

	cursor->iov_iter = data->iter;
	cursor->lastlen = 0;
	iov_iter_truncate(&cursor->iov_iter, length);
	cursor->resid = iov_iter_count(&cursor->iov_iter);
}

static struct page *ceph_msg_data_iter_next(struct ceph_msg_data_cursor *cursor,
					    size_t *page_offset, size_t *length)
{
	struct page *page;
	ssize_t len;

	if (cursor->lastlen)
		iov_iter_revert(&cursor->iov_iter, cursor->lastlen);

	len = iov_iter_get_pages2(&cursor->iov_iter, &page, PAGE_SIZE,
				  1, page_offset);
	BUG_ON(len < 0);

	cursor->lastlen = len;

	/*
	 * FIXME: The assumption is that the pages represented by the iov_iter
	 *	  are pinned, with the references held by the upper-level
	 *	  callers, or by virtue of being under writeback. Eventually,
	 *	  we'll get an iov_iter_get_pages2 variant that doesn't take
	 *	  page refs. Until then, just put the page ref.
	 */
	VM_BUG_ON_PAGE(!PageWriteback(page) && page_count(page) < 2, page);
	put_page(page);

	*length = min_t(size_t, len, cursor->resid);
	return page;
}

static bool ceph_msg_data_iter_advance(struct ceph_msg_data_cursor *cursor,
				       size_t bytes)
{
	BUG_ON(bytes > cursor->resid);
	cursor->resid -= bytes;

	if (bytes < cursor->lastlen) {
		cursor->lastlen -= bytes;
	} else {
		iov_iter_advance(&cursor->iov_iter, bytes - cursor->lastlen);
		cursor->lastlen = 0;
	}

	return cursor->resid;
}

/*
 * Message data is handled (sent or received) in pieces, where each
 * piece resides on a single page.  The network layer might not
 * consume an entire piece at once.  A data item's cursor keeps
 * track of which piece is next to process and how much remains to
 * be processed in that piece.  It also tracks whether the current
 * piece is the last one in the data item.
 */
static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
	size_t length = cursor->total_resid;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		ceph_msg_data_pagelist_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		ceph_msg_data_pages_cursor_init(cursor, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		ceph_msg_data_bio_cursor_init(cursor, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		ceph_msg_data_bvecs_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_ITER:
		ceph_msg_data_iter_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		/* BUG(); */
		break;
	}
	cursor->need_crc = true;
}

void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
			       struct ceph_msg *msg, size_t length)
{
	BUG_ON(!length);
	BUG_ON(length > msg->data_length);
	BUG_ON(!msg->num_data_items);

	cursor->total_resid = length;
	cursor->data = msg->data;
	cursor->sr_resid = 0;

	__ceph_msg_data_cursor_init(cursor);
}
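
/*
 * Sketch of how the v1/v2 wire code drains a message's data with the
 * cursor API (write_partial() stands in for the protocol-specific
 * send step and is hypothetical; error handling omitted):
 *
 *	struct ceph_msg_data_cursor cursor;
 *	size_t off, len;
 *	int ret;
 *
 *	ceph_msg_data_cursor_init(&cursor, msg, data_len);
 *	while (cursor.total_resid) {
 *		struct page *page = ceph_msg_data_next(&cursor, &off, &len);
 *
 *		ret = write_partial(con, page, off, len);
 *		ceph_msg_data_advance(&cursor, ret);
 *	}
 */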

/*
 * Return the page containing the next piece to process for a given
 * data item, and supply the page offset and length of that piece.
 */
struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length)
{
	struct page *page;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		page = ceph_msg_data_pages_next(cursor, page_offset, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		page = ceph_msg_data_bio_next(cursor, page_offset, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_ITER:
		page = ceph_msg_data_iter_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		page = NULL;
		break;
	}

	BUG_ON(!page);
	BUG_ON(*page_offset + *length > PAGE_SIZE);
	BUG_ON(!*length);
	BUG_ON(*length > cursor->resid);

	return page;
}

/*
 * Advance the cursor by @bytes.  Sets cursor->need_crc if the advance
 * moved the cursor on to a new piece of the data item.
 */
void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
{
	bool new_piece;

	BUG_ON(bytes > cursor->resid);
	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_PAGES:
		new_piece = ceph_msg_data_pages_advance(cursor, bytes);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_ITER:
		new_piece = ceph_msg_data_iter_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		BUG();
		break;
	}
	cursor->total_resid -= bytes;

	if (!cursor->resid && cursor->total_resid) {
		cursor->data++;
		__ceph_msg_data_cursor_init(cursor);
		new_piece = true;
	}
	cursor->need_crc = new_piece;
}

u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
		     unsigned int length)
{
	char *kaddr;

	kaddr = kmap(page);
	BUG_ON(kaddr == NULL);
	crc = crc32c(crc, kaddr + page_offset, length);
	kunmap(page);

	return crc;
}
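
/*
 * CRC usage sketch: data is checksummed one piece at a time as the
 * cursor walks it, feeding each piece's page, offset and length into
 * the running value:
 *
 *	crc = ceph_crc32c_page(crc, page, page_offset, length);
 *
 * cursor->need_crc is set only when the cursor lands on a new piece,
 * which lets the wire code avoid re-checksumming a piece it has
 * already folded in but only partially transmitted.
 */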

bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
{
	struct sockaddr_storage ss = addr->in_addr; /* align */
	struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
	struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;

	switch (ss.ss_family) {
	case AF_INET:
		return addr4->s_addr == htonl(INADDR_ANY);
	case AF_INET6:
		return ipv6_addr_any(addr6);
	default:
		return true;
	}
}
EXPORT_SYMBOL(ceph_addr_is_blank);

int ceph_addr_port(const struct ceph_entity_addr *addr)
{
	switch (get_unaligned(&addr->in_addr.ss_family)) {
	case AF_INET:
		return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
	case AF_INET6:
		return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
	}
	return 0;
}

void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
{
	switch (get_unaligned(&addr->in_addr.ss_family)) {
	case AF_INET:
		put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
		break;
	case AF_INET6:
		put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
		break;
	}
}

/*
 * Unlike other *_pton function semantics, zero indicates success.
 */
static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
		     char delim, const char **ipend)
{
	memset(&addr->in_addr, 0, sizeof(addr->in_addr));

	if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
		put_unaligned(AF_INET, &addr->in_addr.ss_family);
		return 0;
	}

	if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
		put_unaligned(AF_INET6, &addr->in_addr.ss_family);
		return 0;
	}

	return -EINVAL;
}

/*
 * Extract hostname string and resolve using kernel DNS facility.
 */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	const char *end, *delim_p;
	char *colon_p, *ip_addr = NULL;
	int ip_len, ret;

	/*
	 * The end of the hostname occurs immediately preceding the delimiter or
	 * the port marker (':') where the delimiter takes precedence.
	 */
	delim_p = memchr(name, delim, namelen);
	colon_p = memchr(name, ':', namelen);

	if (delim_p && colon_p)
		end = min(delim_p, colon_p);
	else if (!delim_p && colon_p)
		end = colon_p;
	else {
		end = delim_p;
		if (!end) /* case: hostname:/ */
			end = name + namelen;
	}

	if (end <= name)
		return -EINVAL;

	/* do dns_resolve upcall */
	ip_len = dns_query(current->nsproxy->net_ns,
			   NULL, name, end - name, NULL, &ip_addr, NULL, false);
	if (ip_len > 0)
		ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
	else
		ret = -ESRCH;

	kfree(ip_addr);

	*ipend = end;

	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
		ret, ret ? "failed" : ceph_pr_addr(addr));

	return ret;
}
#else
static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	return -EINVAL;
}
#endif

/*
 * Parse a server name (IP or hostname). If a valid IP address is not found
 * then try to extract a hostname to resolve using userspace DNS upcall.
 */
static int ceph_parse_server_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	int ret;

	ret = ceph_pton(name, namelen, addr, delim, ipend);
	if (ret)
		ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);

	return ret;
}
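
/*
 * Input sketch for ceph_parse_ips() below, assuming delim == ',':
 * "1.2.3.4", "1.2.3.4:6789", "[2001:db8::1]:6789", or a comma-separated
 * list of such entries; a missing port falls back to CEPH_MON_PORT.
 * A hypothetical caller:
 *
 *	struct ceph_entity_addr addrs[CEPH_MAX_MON];
 *	int num, err;
 *
 *	err = ceph_parse_ips(buf, buf + len, addrs, ARRAY_SIZE(addrs),
 *			     &num, ',');
 */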

/*
 * Parse an ip[:port] list into an addr array.  Use the default
 * monitor port if a port isn't specified.
 */
int ceph_parse_ips(const char *c, const char *end,
		   struct ceph_entity_addr *addr,
		   int max_count, int *count, char delim)
{
	int i, ret = -EINVAL;
	const char *p = c;

	dout("parse_ips on '%.*s'\n", (int)(end-c), c);
	for (i = 0; i < max_count; i++) {
		char cur_delim = delim;
		const char *ipend;
		int port;

		if (*p == '[') {
			cur_delim = ']';
			p++;
		}

		ret = ceph_parse_server_name(p, end - p, &addr[i], cur_delim,
					     &ipend);
		if (ret)
			goto bad;
		ret = -EINVAL;

		p = ipend;

		if (cur_delim == ']') {
			if (*p != ']') {
				dout("missing matching ']'\n");
				goto bad;
			}
			p++;
		}

		/* port? */
		if (p < end && *p == ':') {
			port = 0;
			p++;
			while (p < end && *p >= '0' && *p <= '9') {
				port = (port * 10) + (*p - '0');
				p++;
			}
			if (port == 0)
				port = CEPH_MON_PORT;
			else if (port > 65535)
				goto bad;
		} else {
			port = CEPH_MON_PORT;
		}

		ceph_addr_set_port(&addr[i], port);
		/*
		 * We want the type to be set according to ms_mode
		 * option, but options are normally parsed after mon
		 * addresses.  Rather than complicating parsing, set
		 * to LEGACY and override in build_initial_monmap()
		 * for mon addresses and ceph_messenger_init() for
		 * ip option.
		 */
		addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
		addr[i].nonce = 0;

		dout("%s got %s\n", __func__, ceph_pr_addr(&addr[i]));

		if (p == end)
			break;
		if (*p != delim)
			goto bad;
		p++;
	}

	if (p != end)
		goto bad;

	if (count)
		*count = i + 1;
	return 0;

bad:
	return ret;
}

/*
 * Process message.  This happens in the worker thread.  The callback should
 * be careful not to do anything that waits on other incoming messages or it
 * may deadlock.
 */
void ceph_con_process_message(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->in_msg;

	BUG_ON(con->in_msg->con != con);
	con->in_msg = NULL;

	/* if first message, set peer_name */
	if (con->peer_name.type == 0)
		con->peer_name = msg->hdr.src;

	con->in_seq++;
	mutex_unlock(&con->mutex);

	dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n",
	     msg, le64_to_cpu(msg->hdr.seq),
	     ENTITY_NAME(msg->hdr.src),
	     le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len),
	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
	con->ops->dispatch(con, msg);

	mutex_lock(&con->mutex);
}

/*
 * Atomically queue work on a connection after the specified delay.
 * Bump @con reference to avoid races with connection teardown.
 * Returns 0 if work was queued, or an error code otherwise.
 */
static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{
	if (!con->ops->get(con)) {
		dout("%s %p ref count 0\n", __func__, con);
		return -ENOENT;
	}

	if (delay >= HZ)
		delay = round_jiffies_relative(delay);

	dout("%s %p %lu\n", __func__, con, delay);
	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
		dout("%s %p - already queued\n", __func__, con);
		con->ops->put(con);
		return -EBUSY;
	}

	return 0;
}

static void queue_con(struct ceph_connection *con)
{
	(void) queue_con_delay(con, 0);
}

static void cancel_con(struct ceph_connection *con)
{
	if (cancel_delayed_work(&con->work)) {
		dout("%s %p\n", __func__, con);
		con->ops->put(con);
	}
}

static bool con_sock_closed(struct ceph_connection *con)
{
	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
		return false;

#define CASE(x)								\
	case CEPH_CON_S_ ## x:						\
		con->error_msg = "socket closed (con state " #x ")";	\
		break;

	switch (con->state) {
	CASE(CLOSED);
	CASE(PREOPEN);
	CASE(V1_BANNER);
	CASE(V1_CONNECT_MSG);
	CASE(V2_BANNER_PREFIX);
	CASE(V2_BANNER_PAYLOAD);
	CASE(V2_HELLO);
	CASE(V2_AUTH);
	CASE(V2_AUTH_SIGNATURE);
	CASE(V2_SESSION_CONNECT);
	CASE(V2_SESSION_RECONNECT);
	CASE(OPEN);
	CASE(STANDBY);
	default:
		BUG();
	}
#undef CASE

	return true;
}

static bool con_backoff(struct ceph_connection *con)
{
	int ret;

	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
		return false;

	ret = queue_con_delay(con, con->delay);
	if (ret) {
		dout("%s: con %p FAILED to back off %lu\n", __func__,
		     con, con->delay);
		BUG_ON(ret == -ENOENT);
		ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
	}

	return true;
}

/* Finish fault handling; con->mutex must *not* be held here */

static void con_fault_finish(struct ceph_connection *con)
{
	dout("%s %p\n", __func__, con);

	/*
	 * in case we faulted due to authentication, invalidate our
	 * current tickets so that we can get new ones.
	 */
	if (!ceph_msgr2(from_msgr(con->msgr)) && con->v1.auth_retry) {
		dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
		if (con->ops->invalidate_authorizer)
			con->ops->invalidate_authorizer(con);
		con->v1.auth_retry = 0;
	}

	if (con->ops->fault)
		con->ops->fault(con);
}

/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 */
static void ceph_con_workfn(struct work_struct *work)
{
	struct ceph_connection *con = container_of(work, struct ceph_connection,
						   work.work);
	bool fault;

	mutex_lock(&con->mutex);
	while (true) {
		int ret;

		if ((fault = con_sock_closed(con))) {
			dout("%s: con %p SOCK_CLOSED\n", __func__, con);
			break;
		}
		if (con_backoff(con)) {
			dout("%s: con %p BACKOFF\n", __func__, con);
			break;
		}
		if (con->state == CEPH_CON_S_STANDBY) {
			dout("%s: con %p STANDBY\n", __func__, con);
			break;
		}
		if (con->state == CEPH_CON_S_CLOSED) {
			dout("%s: con %p CLOSED\n", __func__, con);
			BUG_ON(con->sock);
			break;
		}
		if (con->state == CEPH_CON_S_PREOPEN) {
			dout("%s: con %p PREOPEN\n", __func__, con);
			BUG_ON(con->sock);
		}

		if (ceph_msgr2(from_msgr(con->msgr)))
			ret = ceph_con_v2_try_read(con);
		else
			ret = ceph_con_v1_try_read(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on read";
			fault = true;
			break;
		}

		if (ceph_msgr2(from_msgr(con->msgr)))
			ret = ceph_con_v2_try_write(con);
		else
			ret = ceph_con_v1_try_write(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on write";
			fault = true;
		}

		break;	/* If we make it to here, we're done */
	}
	if (fault)
		con_fault(con);
	mutex_unlock(&con->mutex);

	if (fault)
		con_fault_finish(con);

	con->ops->put(con);
}
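
/*
 * Reconnect backoff sketch for con_fault() below: con->delay starts at
 * BASE_DELAY_INTERVAL on the first fault and doubles on each subsequent
 * one, saturating at MAX_DELAY_INTERVAL, i.e. roughly:
 *
 *	delay = delay ? min(delay * 2, MAX_DELAY_INTERVAL)
 *		      : BASE_DELAY_INTERVAL;
 *
 * ceph_con_open() resets con->delay to 0 when a session is reopened.
 */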

/*
 * Generic error/fault handler.  A retry mechanism is used with
 * exponential backoff
 */
static void con_fault(struct ceph_connection *con)
{
	dout("fault %p state %d to peer %s\n",
	     con, con->state, ceph_pr_addr(&con->peer_addr));

	pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
		ceph_pr_addr(&con->peer_addr), con->error_msg);
	con->error_msg = NULL;

	WARN_ON(con->state == CEPH_CON_S_STANDBY ||
		con->state == CEPH_CON_S_CLOSED);

	ceph_con_reset_protocol(con);

	if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
		dout("fault on LOSSYTX channel, marking CLOSED\n");
		con->state = CEPH_CON_S_CLOSED;
		return;
	}

	/* Requeue anything that hasn't been acked */
	list_splice_init(&con->out_sent, &con->out_queue);

	/* If there are no messages queued or keepalive pending, place
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	    !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
		con->state = CEPH_CON_S_STANDBY;
	} else {
		/* retry after a delay. */
		con->state = CEPH_CON_S_PREOPEN;
		if (!con->delay) {
			con->delay = BASE_DELAY_INTERVAL;
		} else if (con->delay < MAX_DELAY_INTERVAL) {
			con->delay *= 2;
			if (con->delay > MAX_DELAY_INTERVAL)
				con->delay = MAX_DELAY_INTERVAL;
		}
		ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
		queue_con(con);
	}
}

void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
{
	u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;

	msgr->inst.addr.nonce = cpu_to_le32(nonce);
	ceph_encode_my_addr(msgr);
}

/*
 * initialize a new messenger instance
 */
void ceph_messenger_init(struct ceph_messenger *msgr,
			 struct ceph_entity_addr *myaddr)
{
	spin_lock_init(&msgr->global_seq_lock);

	if (myaddr) {
		memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
		       sizeof(msgr->inst.addr.in_addr));
		ceph_addr_set_port(&msgr->inst.addr, 0);
	}

	/*
	 * Since nautilus, clients are identified using type ANY.
	 * For msgr1, ceph_encode_banner_addr() munges it to NONE.
	 */
	msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;

	/* generate a random non-zero nonce */
	do {
		get_random_bytes(&msgr->inst.addr.nonce,
				 sizeof(msgr->inst.addr.nonce));
	} while (!msgr->inst.addr.nonce);
	ceph_encode_my_addr(msgr);

	atomic_set(&msgr->stopping, 0);
	write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));

	dout("%s %p\n", __func__, msgr);
}

void ceph_messenger_fini(struct ceph_messenger *msgr)
{
	put_net(read_pnet(&msgr->net));
}

static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
{
	if (msg->con)
		msg->con->ops->put(msg->con);

	msg->con = con ? con->ops->get(con) : NULL;
	BUG_ON(msg->con != con);
}

static void clear_standby(struct ceph_connection *con)
{
	/* come back from STANDBY? */
	if (con->state == CEPH_CON_S_STANDBY) {
		dout("clear_standby %p\n", con);
		con->state = CEPH_CON_S_PREOPEN;
		if (!ceph_msgr2(from_msgr(con->msgr)))
			con->v1.connect_seq++;
		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
	}
}

/*
 * Queue up an outgoing message on the given connection.
 *
 * Consumes a ref on @msg.
 */
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
	/* set src+dst */
	msg->hdr.src = con->msgr->inst.name;
	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
	msg->needs_out_seq = true;

	mutex_lock(&con->mutex);

	if (con->state == CEPH_CON_S_CLOSED) {
		dout("con_send %p closed, dropping %p\n", con, msg);
		ceph_msg_put(msg);
		mutex_unlock(&con->mutex);
		return;
	}

	msg_con_set(msg, con);

	BUG_ON(!list_empty(&msg->list_head));
	list_add_tail(&msg->list_head, &con->out_queue);
	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len));

	clear_standby(con);
	mutex_unlock(&con->mutex);

	/* if there wasn't anything waiting to send before, queue
	 * new work */
	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);

/*
 * Revoke a message that was previously queued for send
 */
void ceph_msg_revoke(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;		/* Message not in our possession */
	}

	mutex_lock(&con->mutex);
	if (list_empty(&msg->list_head)) {
		WARN_ON(con->out_msg == msg);
		dout("%s con %p msg %p not linked\n", __func__, con, msg);
		mutex_unlock(&con->mutex);
		return;
	}

	dout("%s con %p msg %p was linked\n", __func__, con, msg);
	msg->hdr.seq = 0;
	ceph_msg_remove(msg);

	if (con->out_msg == msg) {
		WARN_ON(con->state != CEPH_CON_S_OPEN);
		dout("%s con %p msg %p was sending\n", __func__, con, msg);
		if (ceph_msgr2(from_msgr(con->msgr)))
			ceph_con_v2_revoke(con);
		else
			ceph_con_v1_revoke(con);
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	} else {
		dout("%s con %p msg %p not current, out_msg %p\n", __func__,
		     con, msg, con->out_msg);
	}
	mutex_unlock(&con->mutex);
}

/*
 * Revoke a message that we may be reading data into
 */
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;		/* Message not in our possession */
	}

	mutex_lock(&con->mutex);
	if (con->in_msg == msg) {
		WARN_ON(con->state != CEPH_CON_S_OPEN);
		dout("%s con %p msg %p was recving\n", __func__, con, msg);
		if (ceph_msgr2(from_msgr(con->msgr)))
			ceph_con_v2_revoke_incoming(con);
		else
			ceph_con_v1_revoke_incoming(con);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	} else {
		dout("%s con %p msg %p not current, in_msg %p\n", __func__,
		     con, msg, con->in_msg);
	}
	mutex_unlock(&con->mutex);
}
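
/*
 * Caller sketch for the keepalive pair below (mirrors how a periodic
 * tick might use it; reopen_session() is hypothetical):
 *
 *	if (opt->monc_ping_timeout &&
 *	    ceph_con_keepalive_expired(&monc->con, opt->monc_ping_timeout))
 *		reopen_session(monc);	// peer stopped acking keepalives
 *	ceph_con_keepalive(&monc->con);
 */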

/*
 * Queue a keepalive byte to ensure the tcp connection is alive.
 */
void ceph_con_keepalive(struct ceph_connection *con)
{
	dout("con_keepalive %p\n", con);
	mutex_lock(&con->mutex);
	clear_standby(con);
	ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
	mutex_unlock(&con->mutex);

	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);

bool ceph_con_keepalive_expired(struct ceph_connection *con,
				unsigned long interval)
{
	if (interval > 0 &&
	    (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
		struct timespec64 now;
		struct timespec64 ts;

		ktime_get_real_ts64(&now);
		jiffies_to_timespec64(interval, &ts);
		ts = timespec64_add(con->last_keepalive_ack, ts);
		return timespec64_compare(&now, &ts) >= 0;
	}
	return false;
}

static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
{
	BUG_ON(msg->num_data_items >= msg->max_data_items);
	return &msg->data[msg->num_data_items++];
}

static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
	if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
		int num_pages = calc_pages_for(data->alignment, data->length);

		ceph_release_page_vector(data->pages, num_pages);
	} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
		ceph_pagelist_release(data->pagelist);
	}
}

void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
			     size_t length, size_t alignment, bool own_pages)
{
	struct ceph_msg_data *data;

	BUG_ON(!pages);
	BUG_ON(!length);

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_PAGES;
	data->pages = pages;
	data->length = length;
	data->alignment = alignment & ~PAGE_MASK;
	data->own_pages = own_pages;

	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pages);

void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
				struct ceph_pagelist *pagelist)
{
	struct ceph_msg_data *data;

	BUG_ON(!pagelist);
	BUG_ON(!pagelist->length);

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_PAGELIST;
	refcount_inc(&pagelist->refcnt);
	data->pagelist = pagelist;

	msg->data_length += pagelist->length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pagelist);

#ifdef CONFIG_BLOCK
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
			   u32 length)
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_BIO;
	data->bio_pos = *bio_pos;
	data->bio_length = length;

	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif	/* CONFIG_BLOCK */

void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
			     struct ceph_bvec_iter *bvec_pos)
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_BVECS;
	data->bvec_pos = *bvec_pos;

	msg->data_length += bvec_pos->iter.bi_size;
}
EXPORT_SYMBOL(ceph_msg_data_add_bvecs);

void ceph_msg_data_add_iter(struct ceph_msg *msg,
			    struct iov_iter *iter)
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_ITER;
	data->iter = *iter;

	msg->data_length += iov_iter_count(&data->iter);
}
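
/*
 * Sketch: allocating a message and attaching one data item before
 * handing it to the messenger (front_len, pages and length are the
 * caller's; the msg type shown is only an example):
 *
 *	msg = ceph_msg_new2(CEPH_MSG_OSD_OP, front_len, 1, GFP_NOFS, false);
 *	ceph_msg_data_add_pages(msg, pages, length, 0, false);
 *	ceph_con_send(con, msg);	// consumes our ref
 */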

/*
 * construct a new message with given type, size
 * the new msg has a ref count of 1.
 */
struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
			       gfp_t flags, bool can_fail)
{
	struct ceph_msg *m;

	m = kmem_cache_zalloc(ceph_msg_cache, flags);
	if (m == NULL)
		goto out;

	m->hdr.type = cpu_to_le16(type);
	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
	m->hdr.front_len = cpu_to_le32(front_len);

	INIT_LIST_HEAD(&m->list_head);
	kref_init(&m->kref);

	/* front */
	if (front_len) {
		m->front.iov_base = kvmalloc(front_len, flags);
		if (m->front.iov_base == NULL) {
			dout("ceph_msg_new can't allocate %d bytes\n",
			     front_len);
			goto out2;
		}
	} else {
		m->front.iov_base = NULL;
	}
	m->front_alloc_len = m->front.iov_len = front_len;

	if (max_data_items) {
		m->data = kmalloc_array(max_data_items, sizeof(*m->data),
					flags);
		if (!m->data)
			goto out2;

		m->max_data_items = max_data_items;
	}

	dout("ceph_msg_new %p front %d\n", m, front_len);
	return m;

out2:
	ceph_msg_put(m);
out:
	if (!can_fail) {
		pr_err("msg_new can't create type %d front %d\n", type,
		       front_len);
		WARN_ON(1);
	} else {
		dout("msg_new can't create type %d front %d\n", type,
		     front_len);
	}
	return NULL;
}
EXPORT_SYMBOL(ceph_msg_new2);

struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
{
	return ceph_msg_new2(type, front_len, 0, flags, can_fail);
}
EXPORT_SYMBOL(ceph_msg_new);

/*
 * Allocate "middle" portion of a message, if it is needed and wasn't
 * allocated by alloc_msg.  This allows us to read a small fixed-size
 * per-type header in the front and then gracefully fail (i.e.,
 * propagate the error to the caller based on info in the front) when
 * the middle is too large.
 */
static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{
	int type = le16_to_cpu(msg->hdr.type);
	int middle_len = le32_to_cpu(msg->hdr.middle_len);

	dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
	     ceph_msg_type_name(type), middle_len);
	BUG_ON(!middle_len);
	BUG_ON(msg->middle);

	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
	if (!msg->middle)
		return -ENOMEM;
	return 0;
}

/*
 * Allocate a message for receiving an incoming message on a
 * connection, and save the result in con->in_msg.  Uses the
 * connection's private alloc_msg op if available.
 *
 * Returns 0 on success, or a negative error code.
 *
 * On success, if we set *skip = 1:
 *  - the next message should be skipped and ignored.
 *  - con->in_msg == NULL
 * or if we set *skip = 0:
 *  - con->in_msg is non-null.
 * On error (ENOMEM, EAGAIN, ...),
 *  - con->in_msg == NULL
 */
int ceph_con_in_msg_alloc(struct ceph_connection *con,
			  struct ceph_msg_header *hdr, int *skip)
{
	int middle_len = le32_to_cpu(hdr->middle_len);
	struct ceph_msg *msg;
	int ret = 0;

	BUG_ON(con->in_msg != NULL);
	BUG_ON(!con->ops->alloc_msg);

	mutex_unlock(&con->mutex);
	msg = con->ops->alloc_msg(con, hdr, skip);
	mutex_lock(&con->mutex);
	if (con->state != CEPH_CON_S_OPEN) {
		if (msg)
			ceph_msg_put(msg);
		return -EAGAIN;
	}
	if (msg) {
		BUG_ON(*skip);
		msg_con_set(msg, con);
		con->in_msg = msg;
	} else {
		/*
		 * Null message pointer means either we should skip
		 * this message or we couldn't allocate memory.  The
		 * former is not an error.
		 */
		if (*skip)
			return 0;

		con->error_msg = "error allocating memory for incoming message";
		return -ENOMEM;
	}
	memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr));

	if (middle_len && !con->in_msg->middle) {
		ret = ceph_alloc_middle(con, con->in_msg);
		if (ret < 0) {
			ceph_msg_put(con->in_msg);
			con->in_msg = NULL;
		}
	}

	return ret;
}

void ceph_con_get_out_msg(struct ceph_connection *con)
{
	struct ceph_msg *msg;

	BUG_ON(list_empty(&con->out_queue));
	msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
	WARN_ON(msg->con != con);

	/*
	 * Put the message on "sent" list using a ref from ceph_con_send().
	 * It is put when the message is acked or revoked.
	 */
	list_move_tail(&msg->list_head, &con->out_sent);

	/*
	 * Only assign outgoing seq # if we haven't sent this message
	 * yet.  If it is requeued, resend with its original seq.
	 */
	if (msg->needs_out_seq) {
		msg->hdr.seq = cpu_to_le64(++con->out_seq);
		msg->needs_out_seq = false;

		if (con->ops->reencode_message)
			con->ops->reencode_message(msg);
	}

	/*
	 * Get a ref for out_msg.  It is put when we are done sending the
	 * message or in case of a fault.
	 */
	WARN_ON(con->out_msg);
	con->out_msg = ceph_msg_get(msg);
}
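
/*
 * Refcounting sketch: a message is born with one ref (ceph_msg_new()),
 * ceph_con_send() consumes it, and the messenger holds its own refs via
 * the out_sent list and con->out_msg.  A caller that needs the message
 * after queueing it must take an extra ref first:
 *
 *	ceph_msg_get(msg);
 *	ceph_con_send(con, msg);
 *	...
 *	ceph_msg_put(msg);
 */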

/*
 * Free a generically kmalloc'd message.
 */
static void ceph_msg_free(struct ceph_msg *m)
{
	dout("%s %p\n", __func__, m);
	kvfree(m->front.iov_base);
	kfree(m->data);
	kmem_cache_free(ceph_msg_cache, m);
}

static void ceph_msg_release(struct kref *kref)
{
	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
	int i;

	dout("%s %p\n", __func__, m);
	WARN_ON(!list_empty(&m->list_head));

	msg_con_set(m, NULL);

	/* drop middle, data, if any */
	if (m->middle) {
		ceph_buffer_put(m->middle);
		m->middle = NULL;
	}

	for (i = 0; i < m->num_data_items; i++)
		ceph_msg_data_destroy(&m->data[i]);

	if (m->pool)
		ceph_msgpool_put(m->pool, m);
	else
		ceph_msg_free(m);
}

struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
	dout("%s %p (was %d)\n", __func__, msg,
	     kref_read(&msg->kref));
	kref_get(&msg->kref);
	return msg;
}
EXPORT_SYMBOL(ceph_msg_get);

void ceph_msg_put(struct ceph_msg *msg)
{
	dout("%s %p (was %d)\n", __func__, msg,
	     kref_read(&msg->kref));
	kref_put(&msg->kref, ceph_msg_release);
}
EXPORT_SYMBOL(ceph_msg_put);

void ceph_msg_dump(struct ceph_msg *msg)
{
	pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
		 msg->front_alloc_len, msg->data_length);
	print_hex_dump(KERN_DEBUG, "header: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->hdr, sizeof(msg->hdr), true);
	print_hex_dump(KERN_DEBUG, " front: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       msg->front.iov_base, msg->front.iov_len, true);
	if (msg->middle)
		print_hex_dump(KERN_DEBUG, "middle: ",
			       DUMP_PREFIX_OFFSET, 16, 1,
			       msg->middle->vec.iov_base,
			       msg->middle->vec.iov_len, true);
	print_hex_dump(KERN_DEBUG, "footer: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->footer, sizeof(msg->footer), true);
}
EXPORT_SYMBOL(ceph_msg_dump);