1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/bvec.h> 5 #include <linux/crc32c.h> 6 #include <linux/net.h> 7 #include <linux/socket.h> 8 #include <net/sock.h> 9 10 #include <linux/ceph/ceph_features.h> 11 #include <linux/ceph/decode.h> 12 #include <linux/ceph/libceph.h> 13 #include <linux/ceph/messenger.h> 14 15 /* static tag bytes (protocol control messages) */ 16 static char tag_msg = CEPH_MSGR_TAG_MSG; 17 static char tag_ack = CEPH_MSGR_TAG_ACK; 18 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; 19 static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; 20 21 /* 22 * If @buf is NULL, discard up to @len bytes. 23 */ 24 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 25 { 26 struct kvec iov = {buf, len}; 27 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 28 int r; 29 30 if (!buf) 31 msg.msg_flags |= MSG_TRUNC; 32 33 iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, len); 34 r = sock_recvmsg(sock, &msg, msg.msg_flags); 35 if (r == -EAGAIN) 36 r = 0; 37 return r; 38 } 39 40 static int ceph_tcp_recvpage(struct socket *sock, struct page *page, 41 int page_offset, size_t length) 42 { 43 struct bio_vec bvec; 44 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 45 int r; 46 47 BUG_ON(page_offset + length > PAGE_SIZE); 48 bvec_set_page(&bvec, page, length, page_offset); 49 iov_iter_bvec(&msg.msg_iter, ITER_DEST, &bvec, 1, length); 50 r = sock_recvmsg(sock, &msg, msg.msg_flags); 51 if (r == -EAGAIN) 52 r = 0; 53 return r; 54 } 55 56 /* 57 * write something. @more is true if caller will be sending more data 58 * shortly. 
59 */ 60 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, 61 size_t kvlen, size_t len, bool more) 62 { 63 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 64 int r; 65 66 if (more) 67 msg.msg_flags |= MSG_MORE; 68 else 69 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ 70 71 r = kernel_sendmsg(sock, &msg, iov, kvlen, len); 72 if (r == -EAGAIN) 73 r = 0; 74 return r; 75 } 76 77 /* 78 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST 79 */ 80 static int ceph_tcp_sendpage(struct socket *sock, struct page *page, 81 int offset, size_t size, int more) 82 { 83 ssize_t (*sendpage)(struct socket *sock, struct page *page, 84 int offset, size_t size, int flags); 85 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more; 86 int ret; 87 88 /* 89 * sendpage cannot properly handle pages with page_count == 0, 90 * we need to fall back to sendmsg if that's the case. 91 * 92 * Same goes for slab pages: skb_can_coalesce() allows 93 * coalescing neighboring slab objects into a single frag which 94 * triggers one of hardened usercopy checks. 95 */ 96 if (sendpage_ok(page)) 97 sendpage = sock->ops->sendpage; 98 else 99 sendpage = sock_no_sendpage; 100 101 ret = sendpage(sock, page, offset, size, flags); 102 if (ret == -EAGAIN) 103 ret = 0; 104 105 return ret; 106 } 107 108 static void con_out_kvec_reset(struct ceph_connection *con) 109 { 110 BUG_ON(con->v1.out_skip); 111 112 con->v1.out_kvec_left = 0; 113 con->v1.out_kvec_bytes = 0; 114 con->v1.out_kvec_cur = &con->v1.out_kvec[0]; 115 } 116 117 static void con_out_kvec_add(struct ceph_connection *con, 118 size_t size, void *data) 119 { 120 int index = con->v1.out_kvec_left; 121 122 BUG_ON(con->v1.out_skip); 123 BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec)); 124 125 con->v1.out_kvec[index].iov_len = size; 126 con->v1.out_kvec[index].iov_base = data; 127 con->v1.out_kvec_left++; 128 con->v1.out_kvec_bytes += size; 129 } 130 131 /* 132 * Chop off a kvec from the end. 
Return residual number of bytes for 133 * that kvec, i.e. how many bytes would have been written if the kvec 134 * hadn't been nuked. 135 */ 136 static int con_out_kvec_skip(struct ceph_connection *con) 137 { 138 int skip = 0; 139 140 if (con->v1.out_kvec_bytes > 0) { 141 skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len; 142 BUG_ON(con->v1.out_kvec_bytes < skip); 143 BUG_ON(!con->v1.out_kvec_left); 144 con->v1.out_kvec_bytes -= skip; 145 con->v1.out_kvec_left--; 146 } 147 148 return skip; 149 } 150 151 static size_t sizeof_footer(struct ceph_connection *con) 152 { 153 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? 154 sizeof(struct ceph_msg_footer) : 155 sizeof(struct ceph_msg_footer_old); 156 } 157 158 static void prepare_message_data(struct ceph_msg *msg, u32 data_len) 159 { 160 /* Initialize data cursor */ 161 162 ceph_msg_data_cursor_init(&msg->cursor, msg, data_len); 163 } 164 165 /* 166 * Prepare footer for currently outgoing message, and finish things 167 * off. Assumes out_kvec* are already valid.. we just add on to the end. 168 */ 169 static void prepare_write_message_footer(struct ceph_connection *con) 170 { 171 struct ceph_msg *m = con->out_msg; 172 173 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; 174 175 dout("prepare_write_message_footer %p\n", con); 176 con_out_kvec_add(con, sizeof_footer(con), &m->footer); 177 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { 178 if (con->ops->sign_message) 179 con->ops->sign_message(m); 180 else 181 m->footer.sig = 0; 182 } else { 183 m->old_footer.flags = m->footer.flags; 184 } 185 con->v1.out_more = m->more_to_follow; 186 con->v1.out_msg_done = true; 187 } 188 189 /* 190 * Prepare headers for the next outgoing message. 191 */ 192 static void prepare_write_message(struct ceph_connection *con) 193 { 194 struct ceph_msg *m; 195 u32 crc; 196 197 con_out_kvec_reset(con); 198 con->v1.out_msg_done = false; 199 200 /* Sneak an ack in there first? 
If we can get it into the same 201 * TCP packet that's a good thing. */ 202 if (con->in_seq > con->in_seq_acked) { 203 con->in_seq_acked = con->in_seq; 204 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 205 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); 206 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), 207 &con->v1.out_temp_ack); 208 } 209 210 ceph_con_get_out_msg(con); 211 m = con->out_msg; 212 213 dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n", 214 m, con->out_seq, le16_to_cpu(m->hdr.type), 215 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 216 m->data_length); 217 WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len)); 218 WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len)); 219 220 /* tag + hdr + front + middle */ 221 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 222 con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr); 223 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 224 225 if (m->middle) 226 con_out_kvec_add(con, m->middle->vec.iov_len, 227 m->middle->vec.iov_base); 228 229 /* fill in hdr crc and finalize hdr */ 230 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 231 con->out_msg->hdr.crc = cpu_to_le32(crc); 232 memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr)); 233 234 /* fill in front and middle crc, footer */ 235 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 236 con->out_msg->footer.front_crc = cpu_to_le32(crc); 237 if (m->middle) { 238 crc = crc32c(0, m->middle->vec.iov_base, 239 m->middle->vec.iov_len); 240 con->out_msg->footer.middle_crc = cpu_to_le32(crc); 241 } else 242 con->out_msg->footer.middle_crc = 0; 243 dout("%s front_crc %u middle_crc %u\n", __func__, 244 le32_to_cpu(con->out_msg->footer.front_crc), 245 le32_to_cpu(con->out_msg->footer.middle_crc)); 246 con->out_msg->footer.flags = 0; 247 248 /* is there a data payload? 
*/ 249 con->out_msg->footer.data_crc = 0; 250 if (m->data_length) { 251 prepare_message_data(con->out_msg, m->data_length); 252 con->v1.out_more = 1; /* data + footer will follow */ 253 } else { 254 /* no, queue up footer too and be done */ 255 prepare_write_message_footer(con); 256 } 257 258 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 259 } 260 261 /* 262 * Prepare an ack. 263 */ 264 static void prepare_write_ack(struct ceph_connection *con) 265 { 266 dout("prepare_write_ack %p %llu -> %llu\n", con, 267 con->in_seq_acked, con->in_seq); 268 con->in_seq_acked = con->in_seq; 269 270 con_out_kvec_reset(con); 271 272 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 273 274 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); 275 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), 276 &con->v1.out_temp_ack); 277 278 con->v1.out_more = 1; /* more will follow.. eventually.. */ 279 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 280 } 281 282 /* 283 * Prepare to share the seq during handshake 284 */ 285 static void prepare_write_seq(struct ceph_connection *con) 286 { 287 dout("prepare_write_seq %p %llu -> %llu\n", con, 288 con->in_seq_acked, con->in_seq); 289 con->in_seq_acked = con->in_seq; 290 291 con_out_kvec_reset(con); 292 293 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked); 294 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack), 295 &con->v1.out_temp_ack); 296 297 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 298 } 299 300 /* 301 * Prepare to write keepalive byte. 
302 */ 303 static void prepare_write_keepalive(struct ceph_connection *con) 304 { 305 dout("prepare_write_keepalive %p\n", con); 306 con_out_kvec_reset(con); 307 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { 308 struct timespec64 now; 309 310 ktime_get_real_ts64(&now); 311 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); 312 ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now); 313 con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2), 314 &con->v1.out_temp_keepalive2); 315 } else { 316 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); 317 } 318 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 319 } 320 321 /* 322 * Connection negotiation. 323 */ 324 325 static int get_connect_authorizer(struct ceph_connection *con) 326 { 327 struct ceph_auth_handshake *auth; 328 int auth_proto; 329 330 if (!con->ops->get_authorizer) { 331 con->v1.auth = NULL; 332 con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 333 con->v1.out_connect.authorizer_len = 0; 334 return 0; 335 } 336 337 auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry); 338 if (IS_ERR(auth)) 339 return PTR_ERR(auth); 340 341 con->v1.auth = auth; 342 con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto); 343 con->v1.out_connect.authorizer_len = 344 cpu_to_le32(auth->authorizer_buf_len); 345 return 0; 346 } 347 348 /* 349 * We connected to a peer and are saying hello. 
350 */ 351 static void prepare_write_banner(struct ceph_connection *con) 352 { 353 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 354 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), 355 &con->msgr->my_enc_addr); 356 357 con->v1.out_more = 0; 358 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 359 } 360 361 static void __prepare_write_connect(struct ceph_connection *con) 362 { 363 con_out_kvec_add(con, sizeof(con->v1.out_connect), 364 &con->v1.out_connect); 365 if (con->v1.auth) 366 con_out_kvec_add(con, con->v1.auth->authorizer_buf_len, 367 con->v1.auth->authorizer_buf); 368 369 con->v1.out_more = 0; 370 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 371 } 372 373 static int prepare_write_connect(struct ceph_connection *con) 374 { 375 unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); 376 int proto; 377 int ret; 378 379 switch (con->peer_name.type) { 380 case CEPH_ENTITY_TYPE_MON: 381 proto = CEPH_MONC_PROTOCOL; 382 break; 383 case CEPH_ENTITY_TYPE_OSD: 384 proto = CEPH_OSDC_PROTOCOL; 385 break; 386 case CEPH_ENTITY_TYPE_MDS: 387 proto = CEPH_MDSC_PROTOCOL; 388 break; 389 default: 390 BUG(); 391 } 392 393 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 394 con->v1.connect_seq, global_seq, proto); 395 396 con->v1.out_connect.features = 397 cpu_to_le64(from_msgr(con->msgr)->supported_features); 398 con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 399 con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq); 400 con->v1.out_connect.global_seq = cpu_to_le32(global_seq); 401 con->v1.out_connect.protocol_version = cpu_to_le32(proto); 402 con->v1.out_connect.flags = 0; 403 404 ret = get_connect_authorizer(con); 405 if (ret) 406 return ret; 407 408 __prepare_write_connect(con); 409 return 0; 410 } 411 412 /* 413 * write as much of pending kvecs to the socket as we can. 
 *  1 -> done
 *  0 -> socket full, but more to do
 * <0 -> error
 *
 * On a partial send the kvec cursor (out_kvec_cur / out_kvec_left) and
 * the first remaining entry are adjusted in place so the next call
 * resumes exactly where this one stopped.
 */
static int write_partial_kvec(struct ceph_connection *con)
{
	int ret;

	dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
	while (con->v1.out_kvec_bytes > 0) {
		ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
				       con->v1.out_kvec_left,
				       con->v1.out_kvec_bytes,
				       con->v1.out_more);
		if (ret <= 0)
			goto out;
		con->v1.out_kvec_bytes -= ret;
		if (!con->v1.out_kvec_bytes)
			break;            /* done */

		/* account for full iov entries consumed */
		while (ret >= con->v1.out_kvec_cur->iov_len) {
			BUG_ON(!con->v1.out_kvec_left);
			ret -= con->v1.out_kvec_cur->iov_len;
			con->v1.out_kvec_cur++;
			con->v1.out_kvec_left--;
		}
		/* and for a partially-consumed entry */
		if (ret) {
			con->v1.out_kvec_cur->iov_len -= ret;
			con->v1.out_kvec_cur->iov_base += ret;
		}
	}
	/* everything went out: no entries remain */
	con->v1.out_kvec_left = 0;
	ret = 1;
out:
	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
	     con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
	return ret;  /* done! */
}

/*
 * Write as much message data payload as we can.  If we finish, queue
 * up the footer.
 *  1 -> done, footer is now queued in out_kvec[].
 *  0 -> socket full, but more to do
 * <0 -> error
 *
 * Returns -EINVAL if the outgoing message has no data items.  The
 * running data crc is kept in msg->footer.data_crc between calls.
 */
static int write_partial_message_data(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->out_msg;
	struct ceph_msg_data_cursor *cursor = &msg->cursor;
	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
	u32 crc;

	dout("%s %p msg %p\n", __func__, con, msg);

	if (!msg->num_data_items)
		return -EINVAL;

	/*
	 * Iterate through each page that contains data to be
	 * written, and send as much as possible for each.
	 *
	 * If we are calculating the data crc (the default), we will
	 * need to map the page.  If we have no pages, they have
	 * been revoked, so use the zero page.
	 */
	/* resume from the crc saved by a previous partial write */
	crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
	while (cursor->total_resid) {
		struct page *page;
		size_t page_offset;
		size_t length;
		int ret;

		/* current data item exhausted: move on to the next one */
		if (!cursor->resid) {
			ceph_msg_data_advance(cursor, 0);
			continue;
		}

		page = ceph_msg_data_next(cursor, &page_offset, &length);
		/* last piece of the payload: drop MSG_SENDPAGE_NOTLAST */
		if (length == cursor->total_resid)
			more = MSG_MORE;
		ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
					more);
		if (ret <= 0) {
			/* stash the running crc so the next call can resume */
			if (do_datacrc)
				msg->footer.data_crc = cpu_to_le32(crc);

			return ret;
		}
		/*
		 * The crc covers the whole piece (length), not just the
		 * ret bytes sent; it is only folded in when the cursor
		 * has need_crc set.
		 */
		if (do_datacrc && cursor->need_crc)
			crc = ceph_crc32c_page(crc, page, page_offset, length);
		ceph_msg_data_advance(cursor, (size_t)ret);
	}

	dout("%s %p msg %p done\n", __func__, con, msg);

	/* prepare and queue up footer, too */
	if (do_datacrc)
		msg->footer.data_crc = cpu_to_le32(crc);
	else
		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
	con_out_kvec_reset(con);
	prepare_write_message_footer(con);

	return 1;	/* must return > 0 to indicate success */
}

/*
 * write some zeros
 *
 * Sends out_skip bytes from ceph_zero_page, at most PAGE_SIZE per call
 * to ceph_tcp_sendpage().
 *  1 -> done
 *  0 -> socket full, but more to do
 * <0 -> error
 */
static int write_partial_skip(struct ceph_connection *con)
{
	int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
	int ret;

	dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
	while (con->v1.out_skip > 0) {
		size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);

		/* final chunk: drop MSG_SENDPAGE_NOTLAST */
		if (size == con->v1.out_skip)
			more = MSG_MORE;
		ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
					more);
		if (ret <= 0)
			goto out;
		con->v1.out_skip -= ret;
	}
	ret = 1;
out:
	return ret;
}

/*
 * Prepare to read connection handshake, or an ack.
551 */ 552 static void prepare_read_banner(struct ceph_connection *con) 553 { 554 dout("prepare_read_banner %p\n", con); 555 con->v1.in_base_pos = 0; 556 } 557 558 static void prepare_read_connect(struct ceph_connection *con) 559 { 560 dout("prepare_read_connect %p\n", con); 561 con->v1.in_base_pos = 0; 562 } 563 564 static void prepare_read_ack(struct ceph_connection *con) 565 { 566 dout("prepare_read_ack %p\n", con); 567 con->v1.in_base_pos = 0; 568 } 569 570 static void prepare_read_seq(struct ceph_connection *con) 571 { 572 dout("prepare_read_seq %p\n", con); 573 con->v1.in_base_pos = 0; 574 con->v1.in_tag = CEPH_MSGR_TAG_SEQ; 575 } 576 577 static void prepare_read_tag(struct ceph_connection *con) 578 { 579 dout("prepare_read_tag %p\n", con); 580 con->v1.in_base_pos = 0; 581 con->v1.in_tag = CEPH_MSGR_TAG_READY; 582 } 583 584 static void prepare_read_keepalive_ack(struct ceph_connection *con) 585 { 586 dout("prepare_read_keepalive_ack %p\n", con); 587 con->v1.in_base_pos = 0; 588 } 589 590 /* 591 * Prepare to read a message. 
 *
 * No incoming message may be attached yet.  Rewinds the read position
 * and clears the running front/middle/data crcs.  Always returns 0.
 */
static int prepare_read_message(struct ceph_connection *con)
{
	dout("prepare_read_message %p\n", con);
	BUG_ON(con->in_msg != NULL);
	con->v1.in_base_pos = 0;
	con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
	return 0;
}

/*
 * Read into @object until the read position (in_base_pos) reaches @end.
 * @size is the size of @object; the position within @object is derived
 * from how far in_base_pos is from @end, so the object is taken to
 * occupy the @size bytes ending at @end.
 *
 *  1 -> in_base_pos has reached @end
 * <=0 -> whatever ceph_tcp_recvmsg() returned (0: nothing more to read
 *        right now, <0: error)
 */
static int read_partial(struct ceph_connection *con,
			int end, int size, void *object)
{
	while (con->v1.in_base_pos < end) {
		int left = end - con->v1.in_base_pos;
		int have = size - left;
		int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
		if (ret <= 0)
			return ret;
		con->v1.in_base_pos += ret;
	}
	return 1;
}

/*
 * Read all or part of the connect-side handshake on a new connection
 *
 * The banner is followed by two addresses, read back to back: the
 * peer's actual address and the address the peer sees for us.  Each
 * address is decoded as soon as it has been read completely.
 * Returns the read_partial() result of the last section attempted.
 */
static int read_partial_banner(struct ceph_connection *con)
{
	int size;
	int end;
	int ret;

	dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);

	/* peer's banner */
	size = strlen(CEPH_BANNER);
	end = size;
	ret = read_partial(con, end, size, con->v1.in_banner);
	if (ret <= 0)
		goto out;

	/* peer's own address */
	size = sizeof(con->v1.actual_peer_addr);
	end += size;
	ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
	if (ret <= 0)
		goto out;
	ceph_decode_banner_addr(&con->v1.actual_peer_addr);

	/* our address as seen by the peer */
	size = sizeof(con->v1.peer_addr_for_me);
	end += size;
	ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
	if (ret <= 0)
		goto out;
	ceph_decode_banner_addr(&con->v1.peer_addr_for_me);

out:
	return ret;
}

/*
 * Read all or part of the connect reply and, when an authorizer is in
 * use, the authorizer reply that follows it.
 *
 * Returns -EINVAL if the advertised authorizer reply length exceeds
 * the reply buffer; otherwise the read_partial() result of the last
 * section attempted.
 */
static int read_partial_connect(struct ceph_connection *con)
{
	int size;
	int end;
	int ret;

	dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);

	size = sizeof(con->v1.in_reply);
	end = size;
	ret = read_partial(con, end, size, &con->v1.in_reply);
	if (ret <= 0)
		goto out;

	if (con->v1.auth) {
		size = le32_to_cpu(con->v1.in_reply.authorizer_len);
		/* never read more than the reply buffer can hold */
		if (size > con->v1.auth->authorizer_reply_buf_len) {
			pr_err("authorizer reply too big: %d > %zu\n", size,
			       con->v1.auth->authorizer_reply_buf_len);
			ret = -EINVAL;
			goto out;
		}

		end += size;
		ret = read_partial(con, end, size,
				   con->v1.auth->authorizer_reply_buf);
		if (ret <= 0)
			goto out;
	}

	dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
	     con, con->v1.in_reply.tag,
	     le32_to_cpu(con->v1.in_reply.connect_seq),
	     le32_to_cpu(con->v1.in_reply.global_seq));
out:
	return ret;
}

/*
 * Verify the hello banner looks okay.
 *
 * Returns 0 if the received banner matches CEPH_BANNER, -1 (with
 * con->error_msg set) otherwise.
 */
static int verify_hello(struct ceph_connection *con)
{
	if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
		pr_err("connect to %s got bad banner\n",
		       ceph_pr_addr(&con->peer_addr));
		con->error_msg = "protocol error, bad banner";
		return -1;
	}
	return 0;
}

/*
 * Validate the banner and the peer's advertised address, and learn our
 * own address from the peer if we do not know it yet.
 *
 * Returns 0 on success, -1 (with con->error_msg set) on a bad banner or
 * an unexpected peer.
 */
static int process_banner(struct ceph_connection *con)
{
	struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;

	dout("process_banner on %p\n", con);

	if (verify_hello(con) < 0)
		return -1;

	/*
	 * Make sure the other end is who we wanted.  note that the other
	 * end may not yet know their ip address, so if it's 0.0.0.0, give
	 * them the benefit of the doubt.
	 */
	if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
		   sizeof(con->peer_addr)) != 0 &&
	    !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
	      con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
			ceph_pr_addr(&con->peer_addr),
			le32_to_cpu(con->peer_addr.nonce),
			ceph_pr_addr(&con->v1.actual_peer_addr),
			le32_to_cpu(con->v1.actual_peer_addr.nonce));
		con->error_msg = "wrong peer at address";
		return -1;
	}

	/*
	 * did we learn our address?
	 */
	if (ceph_addr_is_blank(my_addr)) {
		memcpy(&my_addr->in_addr,
		       &con->v1.peer_addr_for_me.in_addr,
		       sizeof(con->v1.peer_addr_for_me.in_addr));
		ceph_addr_set_port(my_addr, 0);
		ceph_encode_my_addr(con->msgr);
		dout("process_banner learned my addr is %s\n",
		     ceph_pr_addr(my_addr));
	}

	return 0;
}

/*
 * Act on a fully read connect reply.
 *
 * Depending on the reply tag this either fails the connection (returns
 * < 0, usually with con->error_msg set), queues another connect attempt
 * and prepares to read its reply, or moves the connection to
 * CEPH_CON_S_OPEN.
 *
 * For CEPH_MSGR_TAG_RESETSESSION con->mutex is dropped around the
 * ->peer_reset() callback; -EAGAIN is returned if the connection state
 * changed while it was unlocked.
 *
 * Returns 0 when processing should continue.
 */
static int process_connect(struct ceph_connection *con)
{
	u64 sup_feat = from_msgr(con->msgr)->supported_features;
	u64 req_feat = from_msgr(con->msgr)->required_features;
	u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
	int ret;

	dout("process_connect on %p tag %d\n", con, con->v1.in_tag);

	if (con->v1.auth) {
		int len = le32_to_cpu(con->v1.in_reply.authorizer_len);

		/*
		 * Any connection that defines ->get_authorizer()
		 * should also define ->add_authorizer_challenge() and
		 * ->verify_authorizer_reply().
		 *
		 * See get_connect_authorizer().
		 */
		if (con->v1.in_reply.tag ==
				CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
			ret = con->ops->add_authorizer_challenge(
				con, con->v1.auth->authorizer_reply_buf, len);
			if (ret < 0)
				return ret;

			/* resend the connect message and wait for a new reply */
			con_out_kvec_reset(con);
			__prepare_write_connect(con);
			prepare_read_connect(con);
			return 0;
		}

		if (len) {
			ret = con->ops->verify_authorizer_reply(con);
			if (ret < 0) {
				con->error_msg = "bad authorize reply";
				return ret;
			}
		}
	}

	switch (con->v1.in_reply.tag) {
	case CEPH_MSGR_TAG_FEATURES:
		pr_err("%s%lld %s feature set mismatch,"
		       " my %llx < server's %llx, missing %llx\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr),
		       sup_feat, server_feat, server_feat & ~sup_feat);
		con->error_msg = "missing required protocol features";
		return -1;

	case CEPH_MSGR_TAG_BADPROTOVER:
		pr_err("%s%lld %s protocol version mismatch,"
		       " my %d != server's %d\n",
		       ENTITY_NAME(con->peer_name),
		       ceph_pr_addr(&con->peer_addr),
		       le32_to_cpu(con->v1.out_connect.protocol_version),
		       le32_to_cpu(con->v1.in_reply.protocol_version));
		con->error_msg = "protocol version mismatch";
		return -1;

	case CEPH_MSGR_TAG_BADAUTHORIZER:
		con->v1.auth_retry++;
		dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
		     con->v1.auth_retry);
		/* give up once the retry counter reaches 2 */
		if (con->v1.auth_retry == 2) {
			con->error_msg = "connect authorization failure";
			return -1;
		}
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RESETSESSION:
		/*
		 * If we connected with a large connect_seq but the peer
		 * has no record of a session with us (no connection, or
		 * connect_seq == 0), they will send RESETSESSION to indicate
		 * that they must have reset their session, and may have
		 * dropped messages.
		 */
		dout("process_connect got RESET peer seq %u\n",
		     le32_to_cpu(con->v1.in_reply.connect_seq));
		pr_info("%s%lld %s session reset\n",
			ENTITY_NAME(con->peer_name),
			ceph_pr_addr(&con->peer_addr));
		ceph_con_reset_session(con);
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);

		/* Tell ceph about it. */
		mutex_unlock(&con->mutex);
		if (con->ops->peer_reset)
			con->ops->peer_reset(con);
		mutex_lock(&con->mutex);
		/* the state may have changed while the mutex was dropped */
		if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
			return -EAGAIN;
		break;

	case CEPH_MSGR_TAG_RETRY_SESSION:
		/*
		 * If we sent a smaller connect_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
		     le32_to_cpu(con->v1.out_connect.connect_seq),
		     le32_to_cpu(con->v1.in_reply.connect_seq));
		con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_RETRY_GLOBAL:
		/*
		 * If we sent a smaller global_seq than the peer has, try
		 * again with a larger value.
		 */
		dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
		     con->v1.peer_global_seq,
		     le32_to_cpu(con->v1.in_reply.global_seq));
		ceph_get_global_seq(con->msgr,
				    le32_to_cpu(con->v1.in_reply.global_seq));
		con_out_kvec_reset(con);
		ret = prepare_write_connect(con);
		if (ret < 0)
			return ret;
		prepare_read_connect(con);
		break;

	case CEPH_MSGR_TAG_SEQ:
	case CEPH_MSGR_TAG_READY:
		/* the server must offer every feature we require */
		if (req_feat & ~server_feat) {
			pr_err("%s%lld %s protocol feature mismatch,"
			       " my required %llx > server's %llx, need %llx\n",
			       ENTITY_NAME(con->peer_name),
			       ceph_pr_addr(&con->peer_addr),
			       req_feat, server_feat, req_feat & ~server_feat);
			con->error_msg = "missing required protocol features";
			return -1;
		}

		WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
		con->state = CEPH_CON_S_OPEN;
		con->v1.auth_retry = 0;    /* we authenticated; clear flag */
		con->v1.peer_global_seq =
			le32_to_cpu(con->v1.in_reply.global_seq);
		con->v1.connect_seq++;
		con->peer_features = server_feat;
		dout("process_connect got READY gseq %d cseq %d (%d)\n",
		     con->v1.peer_global_seq,
		     le32_to_cpu(con->v1.in_reply.connect_seq),
		     con->v1.connect_seq);
		WARN_ON(con->v1.connect_seq !=
			le32_to_cpu(con->v1.in_reply.connect_seq));

		if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
			ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);

		con->delay = 0;      /* reset backoff memory */

		/* with TAG_SEQ both sides exchange their in_seq first */
		if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
			prepare_write_seq(con);
			prepare_read_seq(con);
		} else {
			prepare_read_tag(con);
		}
		break;

	case CEPH_MSGR_TAG_WAIT:
		/*
		 * If there is a connection race (we are opening
		 * connections to each other), one of us may just have
		 * to WAIT.  This shouldn't happen if we are the
		 * client.
		 */
		con->error_msg = "protocol error, got WAIT as client";
		return -1;

	default:
		con->error_msg = "protocol error, garbage tag during connect";
		return -1;
	}
	return 0;
}

/*
 * read (part of) an ack
 *
 * Returns the read_partial() result; the raw little-endian value lands
 * in in_temp_ack.
 */
static int read_partial_ack(struct ceph_connection *con)
{
	int size = sizeof(con->v1.in_temp_ack);
	int end = size;

	return read_partial(con, end, size, &con->v1.in_temp_ack);
}

/*
 * We can finally discard anything that's been acked.
 *
 * When the current tag is CEPH_MSGR_TAG_ACK the sent messages are
 * discarded, otherwise the requeued ones.  Afterwards we go back to
 * reading a tag.
 */
static void process_ack(struct ceph_connection *con)
{
	u64 ack = le64_to_cpu(con->v1.in_temp_ack);

	if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
		ceph_con_discard_sent(con, ack);
	else
		ceph_con_discard_requeued(con, ack);

	prepare_read_tag(con);
}

/*
 * Read (part of) one kvec-backed section of a message (used for front
 * and middle) until @section holds @sec_len bytes; section->iov_len
 * tracks how much has been read so far.  Once the section is complete
 * its crc32c is stored in *@crc.
 *
 *  1 -> section complete
 * <=0 -> whatever ceph_tcp_recvmsg() returned
 */
static int read_partial_message_section(struct ceph_connection *con,
					struct kvec *section,
					unsigned int sec_len, u32 *crc)
{
	int ret, left;

	BUG_ON(!section);

	while (section->iov_len < sec_len) {
		BUG_ON(section->iov_base == NULL);
		left = sec_len - section->iov_len;
		ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
				       section->iov_len, left);
		if (ret <= 0)
			return ret;
		section->iov_len += ret;
	}
	if (section->iov_len == sec_len)
		*crc = crc32c(0, section->iov_base, section->iov_len);

	return 1;
}

/*
 * Read (part of) the data payload directly into the pages of the
 * incoming message, updating the running data crc unless the NOCRC
 * option is set.  The running crc is kept in con->in_data_crc between
 * calls.
 *
 *  1 -> all data read
 * <=0 -> whatever ceph_tcp_recvpage() returned
 */
static int read_partial_msg_data(struct ceph_connection *con)
{
	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
	bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
	struct page *page;
	size_t page_offset;
	size_t length;
	u32 crc = 0;
	int ret;

	if (do_datacrc)
		crc = con->in_data_crc;
	while (cursor->total_resid) {
		/* current data item exhausted: move on to the next one */
		if (!cursor->resid) {
			ceph_msg_data_advance(cursor, 0);
			continue;
		}

		page = ceph_msg_data_next(cursor, &page_offset, &length);
		ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
		if (ret <= 0) {
			/* stash the running crc so the next call can resume */
			if (do_datacrc)
				con->in_data_crc = crc;

			return ret;
		}

		/* only the ret bytes actually received are checksummed */
		if (do_datacrc)
			crc = ceph_crc32c_page(crc, page, page_offset, ret);
		ceph_msg_data_advance(cursor, (size_t)ret);
	}
	if (do_datacrc)
		con->in_data_crc = crc;

	return 1;	/* must return > 0 to indicate success */
}

/*
 * Like read_partial_msg_data(), but data is first received into a
 * per-connection bounce page (allocated on first use), checksummed
 * there, and only then copied into the message's pages.  The crc is
 * always computed in this path.
 *
 *  1 -> all data read
 * <=0 -> whatever ceph_tcp_recvpage() returned, or -ENOMEM if the
 *        bounce page could not be allocated
 */
static int read_partial_msg_data_bounce(struct ceph_connection *con)
{
	struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
	struct page *page;
	size_t off, len;
	u32 crc;
	int ret;

	if (unlikely(!con->bounce_page)) {
		con->bounce_page = alloc_page(GFP_NOIO);
		if (!con->bounce_page) {
			pr_err("failed to allocate bounce page\n");
			return -ENOMEM;
		}
	}

	crc = con->in_data_crc;
	while (cursor->total_resid) {
		/* current data item exhausted: move on to the next one */
		if (!cursor->resid) {
			ceph_msg_data_advance(cursor, 0);
			continue;
		}

		page = ceph_msg_data_next(cursor, &off, &len);
		ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
		if (ret <= 0) {
			con->in_data_crc = crc;
			return ret;
		}

		/* checksum the bounce copy, then copy it to the destination */
		crc = crc32c(crc, page_address(con->bounce_page), ret);
		memcpy_to_page(page, off, page_address(con->bounce_page), ret);

		ceph_msg_data_advance(cursor, ret);
	}
	con->in_data_crc = crc;

	return 1;	/* must return > 0 to indicate success */
}

/*
 * read (part of) a message.
1069 */ 1070 static int read_partial_message(struct ceph_connection *con) 1071 { 1072 struct ceph_msg *m = con->in_msg; 1073 int size; 1074 int end; 1075 int ret; 1076 unsigned int front_len, middle_len, data_len; 1077 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 1078 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); 1079 u64 seq; 1080 u32 crc; 1081 1082 dout("read_partial_message con %p msg %p\n", con, m); 1083 1084 /* header */ 1085 size = sizeof(con->v1.in_hdr); 1086 end = size; 1087 ret = read_partial(con, end, size, &con->v1.in_hdr); 1088 if (ret <= 0) 1089 return ret; 1090 1091 crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc)); 1092 if (cpu_to_le32(crc) != con->v1.in_hdr.crc) { 1093 pr_err("read_partial_message bad hdr crc %u != expected %u\n", 1094 crc, con->v1.in_hdr.crc); 1095 return -EBADMSG; 1096 } 1097 1098 front_len = le32_to_cpu(con->v1.in_hdr.front_len); 1099 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1100 return -EIO; 1101 middle_len = le32_to_cpu(con->v1.in_hdr.middle_len); 1102 if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN) 1103 return -EIO; 1104 data_len = le32_to_cpu(con->v1.in_hdr.data_len); 1105 if (data_len > CEPH_MSG_MAX_DATA_LEN) 1106 return -EIO; 1107 1108 /* verify seq# */ 1109 seq = le64_to_cpu(con->v1.in_hdr.seq); 1110 if ((s64)seq - (s64)con->in_seq < 1) { 1111 pr_info("skipping %s%lld %s seq %lld expected %lld\n", 1112 ENTITY_NAME(con->peer_name), 1113 ceph_pr_addr(&con->peer_addr), 1114 seq, con->in_seq + 1); 1115 con->v1.in_base_pos = -front_len - middle_len - data_len - 1116 sizeof_footer(con); 1117 con->v1.in_tag = CEPH_MSGR_TAG_READY; 1118 return 1; 1119 } else if ((s64)seq - (s64)con->in_seq > 1) { 1120 pr_err("read_partial_message bad seq %lld expected %lld\n", 1121 seq, con->in_seq + 1); 1122 con->error_msg = "bad message sequence # for incoming message"; 1123 return -EBADE; 1124 } 1125 1126 /* allocate message? 
*/ 1127 if (!con->in_msg) { 1128 int skip = 0; 1129 1130 dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type, 1131 front_len, data_len); 1132 ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip); 1133 if (ret < 0) 1134 return ret; 1135 1136 BUG_ON((!con->in_msg) ^ skip); 1137 if (skip) { 1138 /* skip this message */ 1139 dout("alloc_msg said skip message\n"); 1140 con->v1.in_base_pos = -front_len - middle_len - 1141 data_len - sizeof_footer(con); 1142 con->v1.in_tag = CEPH_MSGR_TAG_READY; 1143 con->in_seq++; 1144 return 1; 1145 } 1146 1147 BUG_ON(!con->in_msg); 1148 BUG_ON(con->in_msg->con != con); 1149 m = con->in_msg; 1150 m->front.iov_len = 0; /* haven't read it yet */ 1151 if (m->middle) 1152 m->middle->vec.iov_len = 0; 1153 1154 /* prepare for data payload, if any */ 1155 1156 if (data_len) 1157 prepare_message_data(con->in_msg, data_len); 1158 } 1159 1160 /* front */ 1161 ret = read_partial_message_section(con, &m->front, front_len, 1162 &con->in_front_crc); 1163 if (ret <= 0) 1164 return ret; 1165 1166 /* middle */ 1167 if (m->middle) { 1168 ret = read_partial_message_section(con, &m->middle->vec, 1169 middle_len, 1170 &con->in_middle_crc); 1171 if (ret <= 0) 1172 return ret; 1173 } 1174 1175 /* (page) data */ 1176 if (data_len) { 1177 if (!m->num_data_items) 1178 return -EIO; 1179 1180 if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) 1181 ret = read_partial_msg_data_bounce(con); 1182 else 1183 ret = read_partial_msg_data(con); 1184 if (ret <= 0) 1185 return ret; 1186 } 1187 1188 /* footer */ 1189 size = sizeof_footer(con); 1190 end += size; 1191 ret = read_partial(con, end, size, &m->footer); 1192 if (ret <= 0) 1193 return ret; 1194 1195 if (!need_sign) { 1196 m->footer.flags = m->old_footer.flags; 1197 m->footer.sig = 0; 1198 } 1199 1200 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1201 m, front_len, m->footer.front_crc, middle_len, 1202 m->footer.middle_crc, data_len, m->footer.data_crc); 1203 1204 /* crc ok? 
*/ 1205 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 1206 pr_err("read_partial_message %p front crc %u != exp. %u\n", 1207 m, con->in_front_crc, m->footer.front_crc); 1208 return -EBADMSG; 1209 } 1210 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 1211 pr_err("read_partial_message %p middle crc %u != exp %u\n", 1212 m, con->in_middle_crc, m->footer.middle_crc); 1213 return -EBADMSG; 1214 } 1215 if (do_datacrc && 1216 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 1217 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 1218 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 1219 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 1220 return -EBADMSG; 1221 } 1222 1223 if (need_sign && con->ops->check_message_signature && 1224 con->ops->check_message_signature(m)) { 1225 pr_err("read_partial_message %p signature check failed\n", m); 1226 return -EBADMSG; 1227 } 1228 1229 return 1; /* done! */ 1230 } 1231 1232 static int read_keepalive_ack(struct ceph_connection *con) 1233 { 1234 struct ceph_timespec ceph_ts; 1235 size_t size = sizeof(ceph_ts); 1236 int ret = read_partial(con, size, size, &ceph_ts); 1237 if (ret <= 0) 1238 return ret; 1239 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); 1240 prepare_read_tag(con); 1241 return 1; 1242 } 1243 1244 /* 1245 * Read what we can from the socket. 
1246 */ 1247 int ceph_con_v1_try_read(struct ceph_connection *con) 1248 { 1249 int ret = -1; 1250 1251 more: 1252 dout("try_read start %p state %d\n", con, con->state); 1253 if (con->state != CEPH_CON_S_V1_BANNER && 1254 con->state != CEPH_CON_S_V1_CONNECT_MSG && 1255 con->state != CEPH_CON_S_OPEN) 1256 return 0; 1257 1258 BUG_ON(!con->sock); 1259 1260 dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag, 1261 con->v1.in_base_pos); 1262 1263 if (con->state == CEPH_CON_S_V1_BANNER) { 1264 ret = read_partial_banner(con); 1265 if (ret <= 0) 1266 goto out; 1267 ret = process_banner(con); 1268 if (ret < 0) 1269 goto out; 1270 1271 con->state = CEPH_CON_S_V1_CONNECT_MSG; 1272 1273 /* 1274 * Received banner is good, exchange connection info. 1275 * Do not reset out_kvec, as sending our banner raced 1276 * with receiving peer banner after connect completed. 1277 */ 1278 ret = prepare_write_connect(con); 1279 if (ret < 0) 1280 goto out; 1281 prepare_read_connect(con); 1282 1283 /* Send connection info before awaiting response */ 1284 goto out; 1285 } 1286 1287 if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { 1288 ret = read_partial_connect(con); 1289 if (ret <= 0) 1290 goto out; 1291 ret = process_connect(con); 1292 if (ret < 0) 1293 goto out; 1294 goto more; 1295 } 1296 1297 WARN_ON(con->state != CEPH_CON_S_OPEN); 1298 1299 if (con->v1.in_base_pos < 0) { 1300 /* 1301 * skipping + discarding content. 1302 */ 1303 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos); 1304 if (ret <= 0) 1305 goto out; 1306 dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos); 1307 con->v1.in_base_pos += ret; 1308 if (con->v1.in_base_pos) 1309 goto more; 1310 } 1311 if (con->v1.in_tag == CEPH_MSGR_TAG_READY) { 1312 /* 1313 * what's next? 
1314 */ 1315 ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1); 1316 if (ret <= 0) 1317 goto out; 1318 dout("try_read got tag %d\n", con->v1.in_tag); 1319 switch (con->v1.in_tag) { 1320 case CEPH_MSGR_TAG_MSG: 1321 prepare_read_message(con); 1322 break; 1323 case CEPH_MSGR_TAG_ACK: 1324 prepare_read_ack(con); 1325 break; 1326 case CEPH_MSGR_TAG_KEEPALIVE2_ACK: 1327 prepare_read_keepalive_ack(con); 1328 break; 1329 case CEPH_MSGR_TAG_CLOSE: 1330 ceph_con_close_socket(con); 1331 con->state = CEPH_CON_S_CLOSED; 1332 goto out; 1333 default: 1334 goto bad_tag; 1335 } 1336 } 1337 if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) { 1338 ret = read_partial_message(con); 1339 if (ret <= 0) { 1340 switch (ret) { 1341 case -EBADMSG: 1342 con->error_msg = "bad crc/signature"; 1343 fallthrough; 1344 case -EBADE: 1345 ret = -EIO; 1346 break; 1347 case -EIO: 1348 con->error_msg = "io error"; 1349 break; 1350 } 1351 goto out; 1352 } 1353 if (con->v1.in_tag == CEPH_MSGR_TAG_READY) 1354 goto more; 1355 ceph_con_process_message(con); 1356 if (con->state == CEPH_CON_S_OPEN) 1357 prepare_read_tag(con); 1358 goto more; 1359 } 1360 if (con->v1.in_tag == CEPH_MSGR_TAG_ACK || 1361 con->v1.in_tag == CEPH_MSGR_TAG_SEQ) { 1362 /* 1363 * the final handshake seq exchange is semantically 1364 * equivalent to an ACK 1365 */ 1366 ret = read_partial_ack(con); 1367 if (ret <= 0) 1368 goto out; 1369 process_ack(con); 1370 goto more; 1371 } 1372 if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { 1373 ret = read_keepalive_ack(con); 1374 if (ret <= 0) 1375 goto out; 1376 goto more; 1377 } 1378 1379 out: 1380 dout("try_read done on %p ret %d\n", con, ret); 1381 return ret; 1382 1383 bad_tag: 1384 pr_err("try_read bad tag %d\n", con->v1.in_tag); 1385 con->error_msg = "protocol error, garbage tag"; 1386 ret = -1; 1387 goto out; 1388 } 1389 1390 /* 1391 * Write something to the socket. Called in a worker thread when the 1392 * socket appears to be writeable and we have something ready to send. 
1393 */ 1394 int ceph_con_v1_try_write(struct ceph_connection *con) 1395 { 1396 int ret = 1; 1397 1398 dout("try_write start %p state %d\n", con, con->state); 1399 if (con->state != CEPH_CON_S_PREOPEN && 1400 con->state != CEPH_CON_S_V1_BANNER && 1401 con->state != CEPH_CON_S_V1_CONNECT_MSG && 1402 con->state != CEPH_CON_S_OPEN) 1403 return 0; 1404 1405 /* open the socket first? */ 1406 if (con->state == CEPH_CON_S_PREOPEN) { 1407 BUG_ON(con->sock); 1408 con->state = CEPH_CON_S_V1_BANNER; 1409 1410 con_out_kvec_reset(con); 1411 prepare_write_banner(con); 1412 prepare_read_banner(con); 1413 1414 BUG_ON(con->in_msg); 1415 con->v1.in_tag = CEPH_MSGR_TAG_READY; 1416 dout("try_write initiating connect on %p new state %d\n", 1417 con, con->state); 1418 ret = ceph_tcp_connect(con); 1419 if (ret < 0) { 1420 con->error_msg = "connect error"; 1421 goto out; 1422 } 1423 } 1424 1425 more: 1426 dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes); 1427 BUG_ON(!con->sock); 1428 1429 /* kvec data queued? */ 1430 if (con->v1.out_kvec_left) { 1431 ret = write_partial_kvec(con); 1432 if (ret <= 0) 1433 goto out; 1434 } 1435 if (con->v1.out_skip) { 1436 ret = write_partial_skip(con); 1437 if (ret <= 0) 1438 goto out; 1439 } 1440 1441 /* msg pages? */ 1442 if (con->out_msg) { 1443 if (con->v1.out_msg_done) { 1444 ceph_msg_put(con->out_msg); 1445 con->out_msg = NULL; /* we're done with this one */ 1446 goto do_next; 1447 } 1448 1449 ret = write_partial_message_data(con); 1450 if (ret == 1) 1451 goto more; /* we need to send the footer, too! */ 1452 if (ret == 0) 1453 goto out; 1454 if (ret < 0) { 1455 dout("try_write write_partial_message_data err %d\n", 1456 ret); 1457 goto out; 1458 } 1459 } 1460 1461 do_next: 1462 if (con->state == CEPH_CON_S_OPEN) { 1463 if (ceph_con_flag_test_and_clear(con, 1464 CEPH_CON_F_KEEPALIVE_PENDING)) { 1465 prepare_write_keepalive(con); 1466 goto more; 1467 } 1468 /* is anything else pending? 
*/ 1469 if (!list_empty(&con->out_queue)) { 1470 prepare_write_message(con); 1471 goto more; 1472 } 1473 if (con->in_seq > con->in_seq_acked) { 1474 prepare_write_ack(con); 1475 goto more; 1476 } 1477 } 1478 1479 /* Nothing to do! */ 1480 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); 1481 dout("try_write nothing else to write.\n"); 1482 ret = 0; 1483 out: 1484 dout("try_write done on %p ret %d\n", con, ret); 1485 return ret; 1486 } 1487 1488 void ceph_con_v1_revoke(struct ceph_connection *con) 1489 { 1490 struct ceph_msg *msg = con->out_msg; 1491 1492 WARN_ON(con->v1.out_skip); 1493 /* footer */ 1494 if (con->v1.out_msg_done) { 1495 con->v1.out_skip += con_out_kvec_skip(con); 1496 } else { 1497 WARN_ON(!msg->data_length); 1498 con->v1.out_skip += sizeof_footer(con); 1499 } 1500 /* data, middle, front */ 1501 if (msg->data_length) 1502 con->v1.out_skip += msg->cursor.total_resid; 1503 if (msg->middle) 1504 con->v1.out_skip += con_out_kvec_skip(con); 1505 con->v1.out_skip += con_out_kvec_skip(con); 1506 1507 dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, 1508 con->v1.out_kvec_bytes, con->v1.out_skip); 1509 } 1510 1511 void ceph_con_v1_revoke_incoming(struct ceph_connection *con) 1512 { 1513 unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len); 1514 unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len); 1515 unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len); 1516 1517 /* skip rest of message */ 1518 con->v1.in_base_pos = con->v1.in_base_pos - 1519 sizeof(struct ceph_msg_header) - 1520 front_len - 1521 middle_len - 1522 data_len - 1523 sizeof(struct ceph_msg_footer); 1524 1525 con->v1.in_tag = CEPH_MSGR_TAG_READY; 1526 con->in_seq++; 1527 1528 dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos); 1529 } 1530 1531 bool ceph_con_v1_opened(struct ceph_connection *con) 1532 { 1533 return con->v1.connect_seq; 1534 } 1535 1536 void ceph_con_v1_reset_session(struct ceph_connection *con) 1537 { 1538 
con->v1.connect_seq = 0; 1539 con->v1.peer_global_seq = 0; 1540 } 1541 1542 void ceph_con_v1_reset_protocol(struct ceph_connection *con) 1543 { 1544 con->v1.out_skip = 0; 1545 } 1546