messenger.c (566050e17e53db283d4e26b73b4b50556f97ce7b) | messenger.c (2f713615ddd9d805b6c5e79c52e0e11af99d2bf1) |
---|---|
1// SPDX-License-Identifier: GPL-2.0 2#include <linux/ceph/ceph_debug.h> 3 4#include <linux/crc32c.h> 5#include <linux/ctype.h> 6#include <linux/highmem.h> 7#include <linux/inet.h> 8#include <linux/kthread.h> --- 123 unchanged lines hidden (view full) --- 132 133 return test_and_set_bit(con_flag, &con->flags); 134} 135 136/* Slab caches for frequently-allocated structures */ 137 138static struct kmem_cache *ceph_msg_cache; 139 | 1// SPDX-License-Identifier: GPL-2.0 2#include <linux/ceph/ceph_debug.h> 3 4#include <linux/crc32c.h> 5#include <linux/ctype.h> 6#include <linux/highmem.h> 7#include <linux/inet.h> 8#include <linux/kthread.h> --- 123 unchanged lines hidden (view full) --- 132 133 return test_and_set_bit(con_flag, &con->flags); 134} 135 136/* Slab caches for frequently-allocated structures */ 137 138static struct kmem_cache *ceph_msg_cache; 139 |
140/* static tag bytes (protocol control messages) */ 141static char tag_msg = CEPH_MSGR_TAG_MSG; 142static char tag_ack = CEPH_MSGR_TAG_ACK; 143static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; 144static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; 145 | |
146#ifdef CONFIG_LOCKDEP 147static struct lock_class_key socket_class; 148#endif 149 150static void queue_con(struct ceph_connection *con); 151static void cancel_con(struct ceph_connection *con); 152static void ceph_con_workfn(struct work_struct *); 153static void con_fault(struct ceph_connection *con); --- 319 unchanged lines hidden (view full) --- 473 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) 474 tcp_sock_set_nodelay(sock->sk); 475 476 con->sock = sock; 477 return 0; 478} 479 480/* | 140#ifdef CONFIG_LOCKDEP 141static struct lock_class_key socket_class; 142#endif 143 144static void queue_con(struct ceph_connection *con); 145static void cancel_con(struct ceph_connection *con); 146static void ceph_con_workfn(struct work_struct *); 147static void con_fault(struct ceph_connection *con); --- 319 unchanged lines hidden (view full) --- 467 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) 468 tcp_sock_set_nodelay(sock->sk); 469 470 con->sock = sock; 471 return 0; 472} 473 474/* |
481 * If @buf is NULL, discard up to @len bytes. 482 */ 483static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 484{ 485 struct kvec iov = {buf, len}; 486 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 487 int r; 488 489 if (!buf) 490 msg.msg_flags |= MSG_TRUNC; 491 492 iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len); 493 r = sock_recvmsg(sock, &msg, msg.msg_flags); 494 if (r == -EAGAIN) 495 r = 0; 496 return r; 497} 498 499static int ceph_tcp_recvpage(struct socket *sock, struct page *page, 500 int page_offset, size_t length) 501{ 502 struct bio_vec bvec = { 503 .bv_page = page, 504 .bv_offset = page_offset, 505 .bv_len = length 506 }; 507 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 508 int r; 509 510 BUG_ON(page_offset + length > PAGE_SIZE); 511 iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length); 512 r = sock_recvmsg(sock, &msg, msg.msg_flags); 513 if (r == -EAGAIN) 514 r = 0; 515 return r; 516} 517 518/* 519 * write something. @more is true if caller will be sending more data 520 * shortly. 521 */ 522static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, 523 size_t kvlen, size_t len, bool more) 524{ 525 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 526 int r; 527 528 if (more) 529 msg.msg_flags |= MSG_MORE; 530 else 531 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ 532 533 r = kernel_sendmsg(sock, &msg, iov, kvlen, len); 534 if (r == -EAGAIN) 535 r = 0; 536 return r; 537} 538 539/* 540 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST 541 */ 542static int ceph_tcp_sendpage(struct socket *sock, struct page *page, 543 int offset, size_t size, int more) 544{ 545 ssize_t (*sendpage)(struct socket *sock, struct page *page, 546 int offset, size_t size, int flags); 547 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more; 548 int ret; 549 550 /* 551 * sendpage cannot properly handle pages with page_count == 0, 552 * we need to fall back to sendmsg if that's the case. 553 * 554 * Same goes for slab pages: skb_can_coalesce() allows 555 * coalescing neighboring slab objects into a single frag which 556 * triggers one of hardened usercopy checks. 557 */ 558 if (sendpage_ok(page)) 559 sendpage = sock->ops->sendpage; 560 else 561 sendpage = sock_no_sendpage; 562 563 ret = sendpage(sock, page, offset, size, flags); 564 if (ret == -EAGAIN) 565 ret = 0; 566 567 return ret; 568} 569 570/* | |
571 * Shutdown/close the socket for the given connection. 572 */ 573int ceph_con_close_socket(struct ceph_connection *con) 574{ 575 int rc = 0; 576 577 dout("%s con %p sock %p\n", __func__, con, con->sock); 578 if (con->sock) { --- 9 unchanged lines hidden (view full) --- 588 * shut the socket down. 589 */ 590 ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); 591 592 con_sock_state_closed(con); 593 return rc; 594} 595 | 475 * Shutdown/close the socket for the given connection. 476 */ 477int ceph_con_close_socket(struct ceph_connection *con) 478{ 479 int rc = 0; 480 481 dout("%s con %p sock %p\n", __func__, con, con->sock); 482 if (con->sock) { --- 9 unchanged lines hidden (view full) --- 492 * shut the socket down. 493 */ 494 ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); 495 496 con_sock_state_closed(con); 497 return rc; 498} 499 |
596void ceph_con_v1_reset_protocol(struct ceph_connection *con) 597{ 598 con->out_skip = 0; 599} 600 | |
601static void ceph_con_reset_protocol(struct ceph_connection *con) 602{ 603 dout("%s con %p\n", __func__, con); 604 605 ceph_con_close_socket(con); 606 if (con->in_msg) { 607 WARN_ON(con->in_msg->con != con); 608 ceph_msg_put(con->in_msg); --- 22 unchanged lines hidden (view full) --- 631{ 632 while (!list_empty(head)) { 633 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 634 list_head); 635 ceph_msg_remove(msg); 636 } 637} 638 | 500static void ceph_con_reset_protocol(struct ceph_connection *con) 501{ 502 dout("%s con %p\n", __func__, con); 503 504 ceph_con_close_socket(con); 505 if (con->in_msg) { 506 WARN_ON(con->in_msg->con != con); 507 ceph_msg_put(con->in_msg); --- 22 unchanged lines hidden (view full) --- 530{ 531 while (!list_empty(head)) { 532 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 533 list_head); 534 ceph_msg_remove(msg); 535 } 536} 537 |
639void ceph_con_v1_reset_session(struct ceph_connection *con) 640{ 641 con->connect_seq = 0; 642 con->peer_global_seq = 0; 643} 644 | |
645void ceph_con_reset_session(struct ceph_connection *con) 646{ 647 dout("%s con %p\n", __func__, con); 648 649 WARN_ON(con->in_msg); 650 WARN_ON(con->out_msg); 651 ceph_msg_remove_list(&con->out_queue); 652 ceph_msg_remove_list(&con->out_sent); --- 44 unchanged lines hidden (view full) --- 697 698 memcpy(&con->peer_addr, addr, sizeof(*addr)); 699 con->delay = 0; /* reset backoff memory */ 700 mutex_unlock(&con->mutex); 701 queue_con(con); 702} 703EXPORT_SYMBOL(ceph_con_open); 704 | 538void ceph_con_reset_session(struct ceph_connection *con) 539{ 540 dout("%s con %p\n", __func__, con); 541 542 WARN_ON(con->in_msg); 543 WARN_ON(con->out_msg); 544 ceph_msg_remove_list(&con->out_queue); 545 ceph_msg_remove_list(&con->out_sent); --- 44 unchanged lines hidden (view full) --- 590 591 memcpy(&con->peer_addr, addr, sizeof(*addr)); 592 con->delay = 0; /* reset backoff memory */ 593 mutex_unlock(&con->mutex); 594 queue_con(con); 595} 596EXPORT_SYMBOL(ceph_con_open); 597 |
705bool ceph_con_v1_opened(struct ceph_connection *con) 706{ 707 return con->connect_seq; 708} 709 | |
710/* 711 * return true if this connection ever successfully opened 712 */ 713bool ceph_con_opened(struct ceph_connection *con) 714{ 715 return ceph_con_v1_opened(con); 716} 717 --- 16 unchanged lines hidden (view full) --- 734 INIT_LIST_HEAD(&con->out_queue); 735 INIT_LIST_HEAD(&con->out_sent); 736 INIT_DELAYED_WORK(&con->work, ceph_con_workfn); 737 738 con->state = CEPH_CON_S_CLOSED; 739} 740EXPORT_SYMBOL(ceph_con_init); 741 | 598/* 599 * return true if this connection ever successfully opened 600 */ 601bool ceph_con_opened(struct ceph_connection *con) 602{ 603 return ceph_con_v1_opened(con); 604} 605 --- 16 unchanged lines hidden (view full) --- 622 INIT_LIST_HEAD(&con->out_queue); 623 INIT_LIST_HEAD(&con->out_sent); 624 INIT_DELAYED_WORK(&con->work, ceph_con_workfn); 625 626 con->state = CEPH_CON_S_CLOSED; 627} 628EXPORT_SYMBOL(ceph_con_init); 629 |
742 | |
743/* 744 * We maintain a global counter to order connection attempts. Get 745 * a unique seq greater than @gt. 746 */ 747u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt) 748{ 749 u32 ret; 750 --- 49 unchanged lines hidden (view full) --- 800 break; 801 802 dout("%s con %p discarding msg %p seq %llu\n", __func__, con, 803 msg, seq); 804 ceph_msg_remove(msg); 805 } 806} 807 | 630/* 631 * We maintain a global counter to order connection attempts. Get 632 * a unique seq greater than @gt. 633 */ 634u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt) 635{ 636 u32 ret; 637 --- 49 unchanged lines hidden (view full) --- 687 break; 688 689 dout("%s con %p discarding msg %p seq %llu\n", __func__, con, 690 msg, seq); 691 ceph_msg_remove(msg); 692 } 693} 694 |
808static void con_out_kvec_reset(struct ceph_connection *con) 809{ 810 BUG_ON(con->out_skip); 811 812 con->out_kvec_left = 0; 813 con->out_kvec_bytes = 0; 814 con->out_kvec_cur = &con->out_kvec[0]; 815} 816 817static void con_out_kvec_add(struct ceph_connection *con, 818 size_t size, void *data) 819{ 820 int index = con->out_kvec_left; 821 822 BUG_ON(con->out_skip); 823 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); 824 825 con->out_kvec[index].iov_len = size; 826 con->out_kvec[index].iov_base = data; 827 con->out_kvec_left++; 828 con->out_kvec_bytes += size; 829} 830 831/* 832 * Chop off a kvec from the end. Return residual number of bytes for 833 * that kvec, i.e. how many bytes would have been written if the kvec 834 * hadn't been nuked. 835 */ 836static int con_out_kvec_skip(struct ceph_connection *con) 837{ 838 int off = con->out_kvec_cur - con->out_kvec; 839 int skip = 0; 840 841 if (con->out_kvec_bytes > 0) { 842 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; 843 BUG_ON(con->out_kvec_bytes < skip); 844 BUG_ON(!con->out_kvec_left); 845 con->out_kvec_bytes -= skip; 846 con->out_kvec_left--; 847 } 848 849 return skip; 850} 851 | |
852#ifdef CONFIG_BLOCK 853 854/* 855 * For a bio data item, a piece is whatever remains of the next 856 * entry in the current bio iovec, or the first entry in the next 857 * bio in the list. 858 */ 859static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor, --- 395 unchanged lines hidden (view full) --- 1255 WARN_ON(!cursor->last_piece); 1256 cursor->data++; 1257 __ceph_msg_data_cursor_init(cursor); 1258 new_piece = true; 1259 } 1260 cursor->need_crc = new_piece; 1261} 1262 | 695#ifdef CONFIG_BLOCK 696 697/* 698 * For a bio data item, a piece is whatever remains of the next 699 * entry in the current bio iovec, or the first entry in the next 700 * bio in the list. 701 */ 702static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor, --- 395 unchanged lines hidden (view full) --- 1098 WARN_ON(!cursor->last_piece); 1099 cursor->data++; 1100 __ceph_msg_data_cursor_init(cursor); 1101 new_piece = true; 1102 } 1103 cursor->need_crc = new_piece; 1104} 1105 |
1263static size_t sizeof_footer(struct ceph_connection *con) 1264{ 1265 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? 1266 sizeof(struct ceph_msg_footer) : 1267 sizeof(struct ceph_msg_footer_old); 1268} 1269 1270static void prepare_message_data(struct ceph_msg *msg, u32 data_len) 1271{ 1272 /* Initialize data cursor */ 1273 1274 ceph_msg_data_cursor_init(&msg->cursor, msg, data_len); 1275} 1276 1277/* 1278 * Prepare footer for currently outgoing message, and finish things 1279 * off. Assumes out_kvec* are already valid.. we just add on to the end. 1280 */ 1281static void prepare_write_message_footer(struct ceph_connection *con) 1282{ 1283 struct ceph_msg *m = con->out_msg; 1284 1285 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; 1286 1287 dout("prepare_write_message_footer %p\n", con); 1288 con_out_kvec_add(con, sizeof_footer(con), &m->footer); 1289 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { 1290 if (con->ops->sign_message) 1291 con->ops->sign_message(m); 1292 else 1293 m->footer.sig = 0; 1294 } else { 1295 m->old_footer.flags = m->footer.flags; 1296 } 1297 con->out_more = m->more_to_follow; 1298 con->out_msg_done = true; 1299} 1300 1301/* 1302 * Prepare headers for the next outgoing message. 1303 */ 1304static void prepare_write_message(struct ceph_connection *con) 1305{ 1306 struct ceph_msg *m; 1307 u32 crc; 1308 1309 con_out_kvec_reset(con); 1310 con->out_msg_done = false; 1311 1312 /* Sneak an ack in there first? If we can get it into the same 1313 * TCP packet that's a good thing. */ 1314 if (con->in_seq > con->in_seq_acked) { 1315 con->in_seq_acked = con->in_seq; 1316 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 1317 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 1318 con_out_kvec_add(con, sizeof (con->out_temp_ack), 1319 &con->out_temp_ack); 1320 } 1321 1322 ceph_con_get_out_msg(con); 1323 m = con->out_msg; 1324 1325 dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n", 1326 m, con->out_seq, le16_to_cpu(m->hdr.type), 1327 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 1328 m->data_length); 1329 WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len)); 1330 WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len)); 1331 1332 /* tag + hdr + front + middle */ 1333 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 1334 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); 1335 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 1336 1337 if (m->middle) 1338 con_out_kvec_add(con, m->middle->vec.iov_len, 1339 m->middle->vec.iov_base); 1340 1341 /* fill in hdr crc and finalize hdr */ 1342 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 1343 con->out_msg->hdr.crc = cpu_to_le32(crc); 1344 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); 1345 1346 /* fill in front and middle crc, footer */ 1347 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 1348 con->out_msg->footer.front_crc = cpu_to_le32(crc); 1349 if (m->middle) { 1350 crc = crc32c(0, m->middle->vec.iov_base, 1351 m->middle->vec.iov_len); 1352 con->out_msg->footer.middle_crc = cpu_to_le32(crc); 1353 } else 1354 con->out_msg->footer.middle_crc = 0; 1355 dout("%s front_crc %u middle_crc %u\n", __func__, 1356 le32_to_cpu(con->out_msg->footer.front_crc), 1357 le32_to_cpu(con->out_msg->footer.middle_crc)); 1358 con->out_msg->footer.flags = 0; 1359 1360 /* is there a data payload? */ 1361 con->out_msg->footer.data_crc = 0; 1362 if (m->data_length) { 1363 prepare_message_data(con->out_msg, m->data_length); 1364 con->out_more = 1; /* data + footer will follow */ 1365 } else { 1366 /* no, queue up footer too and be done */ 1367 prepare_write_message_footer(con); 1368 } 1369 1370 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 1371} 1372 1373/* 1374 * Prepare an ack. 1375 */ 1376static void prepare_write_ack(struct ceph_connection *con) 1377{ 1378 dout("prepare_write_ack %p %llu -> %llu\n", con, 1379 con->in_seq_acked, con->in_seq); 1380 con->in_seq_acked = con->in_seq; 1381 1382 con_out_kvec_reset(con); 1383 1384 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 1385 1386 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 1387 con_out_kvec_add(con, sizeof (con->out_temp_ack), 1388 &con->out_temp_ack); 1389 1390 con->out_more = 1; /* more will follow.. eventually.. */ 1391 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 1392} 1393 1394/* 1395 * Prepare to share the seq during handshake 1396 */ 1397static void prepare_write_seq(struct ceph_connection *con) 1398{ 1399 dout("prepare_write_seq %p %llu -> %llu\n", con, 1400 con->in_seq_acked, con->in_seq); 1401 con->in_seq_acked = con->in_seq; 1402 1403 con_out_kvec_reset(con); 1404 1405 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 1406 con_out_kvec_add(con, sizeof (con->out_temp_ack), 1407 &con->out_temp_ack); 1408 1409 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 1410} 1411 1412/* 1413 * Prepare to write keepalive byte. 1414 */ 1415static void prepare_write_keepalive(struct ceph_connection *con) 1416{ 1417 dout("prepare_write_keepalive %p\n", con); 1418 con_out_kvec_reset(con); 1419 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { 1420 struct timespec64 now; 1421 1422 ktime_get_real_ts64(&now); 1423 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); 1424 ceph_encode_timespec64(&con->out_temp_keepalive2, &now); 1425 con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), 1426 &con->out_temp_keepalive2); 1427 } else { 1428 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); 1429 } 1430 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 1431} 1432 1433/* 1434 * Connection negotiation. 1435 */ 1436 1437static int get_connect_authorizer(struct ceph_connection *con) 1438{ 1439 struct ceph_auth_handshake *auth; 1440 int auth_proto; 1441 1442 if (!con->ops->get_authorizer) { 1443 con->auth = NULL; 1444 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 1445 con->out_connect.authorizer_len = 0; 1446 return 0; 1447 } 1448 1449 auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); 1450 if (IS_ERR(auth)) 1451 return PTR_ERR(auth); 1452 1453 con->auth = auth; 1454 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); 1455 con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); 1456 return 0; 1457} 1458 1459/* 1460 * We connected to a peer and are saying hello. 1461 */ 1462static void prepare_write_banner(struct ceph_connection *con) 1463{ 1464 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 1465 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), 1466 &con->msgr->my_enc_addr); 1467 1468 con->out_more = 0; 1469 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 1470} 1471 1472static void __prepare_write_connect(struct ceph_connection *con) 1473{ 1474 con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); 1475 if (con->auth) 1476 con_out_kvec_add(con, con->auth->authorizer_buf_len, 1477 con->auth->authorizer_buf); 1478 1479 con->out_more = 0; 1480 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 1481} 1482 1483static int prepare_write_connect(struct ceph_connection *con) 1484{ 1485 unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); 1486 int proto; 1487 int ret; 1488 1489 switch (con->peer_name.type) { 1490 case CEPH_ENTITY_TYPE_MON: 1491 proto = CEPH_MONC_PROTOCOL; 1492 break; 1493 case CEPH_ENTITY_TYPE_OSD: 1494 proto = CEPH_OSDC_PROTOCOL; 1495 break; 1496 case CEPH_ENTITY_TYPE_MDS: 1497 proto = CEPH_MDSC_PROTOCOL; 1498 break; 1499 default: 1500 BUG(); 1501 } 1502 1503 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 1504 con->connect_seq, global_seq, proto); 1505 1506 con->out_connect.features = 1507 cpu_to_le64(from_msgr(con->msgr)->supported_features); 1508 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 1509 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 1510 con->out_connect.global_seq = cpu_to_le32(global_seq); 1511 con->out_connect.protocol_version = cpu_to_le32(proto); 1512 con->out_connect.flags = 0; 1513 1514 ret = get_connect_authorizer(con); 1515 if (ret) 1516 return ret; 1517 1518 __prepare_write_connect(con); 1519 return 0; 1520} 1521 1522/* 1523 * write as much of pending kvecs to the socket as we can. 1524 * 1 -> done 1525 * 0 -> socket full, but more to do 1526 * <0 -> error 1527 */ 1528static int write_partial_kvec(struct ceph_connection *con) 1529{ 1530 int ret; 1531 1532 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); 1533 while (con->out_kvec_bytes > 0) { 1534 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, 1535 con->out_kvec_left, con->out_kvec_bytes, 1536 con->out_more); 1537 if (ret <= 0) 1538 goto out; 1539 con->out_kvec_bytes -= ret; 1540 if (con->out_kvec_bytes == 0) 1541 break; /* done */ 1542 1543 /* account for full iov entries consumed */ 1544 while (ret >= con->out_kvec_cur->iov_len) { 1545 BUG_ON(!con->out_kvec_left); 1546 ret -= con->out_kvec_cur->iov_len; 1547 con->out_kvec_cur++; 1548 con->out_kvec_left--; 1549 } 1550 /* and for a partially-consumed entry */ 1551 if (ret) { 1552 con->out_kvec_cur->iov_len -= ret; 1553 con->out_kvec_cur->iov_base += ret; 1554 } 1555 } 1556 con->out_kvec_left = 0; 1557 ret = 1; 1558out: 1559 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 1560 con->out_kvec_bytes, con->out_kvec_left, ret); 1561 return ret; /* done! */ 1562} 1563 | |
1564u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, 1565 unsigned int length) 1566{ 1567 char *kaddr; 1568 1569 kaddr = kmap(page); 1570 BUG_ON(kaddr == NULL); 1571 crc = crc32c(crc, kaddr + page_offset, length); 1572 kunmap(page); 1573 1574 return crc; 1575} | 1106u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, 1107 unsigned int length) 1108{ 1109 char *kaddr; 1110 1111 kaddr = kmap(page); 1112 BUG_ON(kaddr == NULL); 1113 crc = crc32c(crc, kaddr + page_offset, length); 1114 kunmap(page); 1115 1116 return crc; 1117} |
1576/* 1577 * Write as much message data payload as we can. If we finish, queue 1578 * up the footer. 1579 * 1 -> done, footer is now queued in out_kvec[]. 1580 * 0 -> socket full, but more to do 1581 * <0 -> error 1582 */ 1583static int write_partial_message_data(struct ceph_connection *con) 1584{ 1585 struct ceph_msg *msg = con->out_msg; 1586 struct ceph_msg_data_cursor *cursor = &msg->cursor; 1587 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 1588 int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; 1589 u32 crc; | |
1590 | 1118 |
1591 dout("%s %p msg %p\n", __func__, con, msg); 1592 1593 if (!msg->num_data_items) 1594 return -EINVAL; 1595 1596 /* 1597 * Iterate through each page that contains data to be 1598 * written, and send as much as possible for each. 1599 * 1600 * If we are calculating the data crc (the default), we will 1601 * need to map the page. If we have no pages, they have 1602 * been revoked, so use the zero page. 1603 */ 1604 crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0; 1605 while (cursor->total_resid) { 1606 struct page *page; 1607 size_t page_offset; 1608 size_t length; 1609 int ret; 1610 1611 if (!cursor->resid) { 1612 ceph_msg_data_advance(cursor, 0); 1613 continue; 1614 } 1615 1616 page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); 1617 if (length == cursor->total_resid) 1618 more = MSG_MORE; 1619 ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, 1620 more); 1621 if (ret <= 0) { 1622 if (do_datacrc) 1623 msg->footer.data_crc = cpu_to_le32(crc); 1624 1625 return ret; 1626 } 1627 if (do_datacrc && cursor->need_crc) 1628 crc = ceph_crc32c_page(crc, page, page_offset, length); 1629 ceph_msg_data_advance(cursor, (size_t)ret); 1630 } 1631 1632 dout("%s %p msg %p done\n", __func__, con, msg); 1633 1634 /* prepare and queue up footer, too */ 1635 if (do_datacrc) 1636 msg->footer.data_crc = cpu_to_le32(crc); 1637 else 1638 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 1639 con_out_kvec_reset(con); 1640 prepare_write_message_footer(con); 1641 1642 return 1; /* must return > 0 to indicate success */ 1643} 1644 1645/* 1646 * write some zeros 1647 */ 1648static int write_partial_skip(struct ceph_connection *con) 1649{ 1650 int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; 1651 int ret; 1652 1653 dout("%s %p %d left\n", __func__, con, con->out_skip); 1654 while (con->out_skip > 0) { 1655 size_t size = min(con->out_skip, (int) PAGE_SIZE); 1656 1657 if (size == con->out_skip) 1658 more = MSG_MORE; 1659 ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, 1660 more); 1661 if (ret <= 0) 1662 goto out; 1663 con->out_skip -= ret; 1664 } 1665 ret = 1; 1666out: 1667 return ret; 1668} 1669 1670/* 1671 * Prepare to read connection handshake, or an ack. 1672 */ 1673static void prepare_read_banner(struct ceph_connection *con) 1674{ 1675 dout("prepare_read_banner %p\n", con); 1676 con->in_base_pos = 0; 1677} 1678 1679static void prepare_read_connect(struct ceph_connection *con) 1680{ 1681 dout("prepare_read_connect %p\n", con); 1682 con->in_base_pos = 0; 1683} 1684 1685static void prepare_read_ack(struct ceph_connection *con) 1686{ 1687 dout("prepare_read_ack %p\n", con); 1688 con->in_base_pos = 0; 1689} 1690 1691static void prepare_read_seq(struct ceph_connection *con) 1692{ 1693 dout("prepare_read_seq %p\n", con); 1694 con->in_base_pos = 0; 1695 con->in_tag = CEPH_MSGR_TAG_SEQ; 1696} 1697 1698static void prepare_read_tag(struct ceph_connection *con) 1699{ 1700 dout("prepare_read_tag %p\n", con); 1701 con->in_base_pos = 0; 1702 con->in_tag = CEPH_MSGR_TAG_READY; 1703} 1704 1705static void prepare_read_keepalive_ack(struct ceph_connection *con) 1706{ 1707 dout("prepare_read_keepalive_ack %p\n", con); 1708 con->in_base_pos = 0; 1709} 1710 1711/* 1712 * Prepare to read a message. 1713 */ 1714static int prepare_read_message(struct ceph_connection *con) 1715{ 1716 dout("prepare_read_message %p\n", con); 1717 BUG_ON(con->in_msg != NULL); 1718 con->in_base_pos = 0; 1719 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; 1720 return 0; 1721} 1722 1723 1724static int read_partial(struct ceph_connection *con, 1725 int end, int size, void *object) 1726{ 1727 while (con->in_base_pos < end) { 1728 int left = end - con->in_base_pos; 1729 int have = size - left; 1730 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 1731 if (ret <= 0) 1732 return ret; 1733 con->in_base_pos += ret; 1734 } 1735 return 1; 1736} 1737 1738 1739/* 1740 * Read all or part of the connect-side handshake on a new connection 1741 */ 1742static int read_partial_banner(struct ceph_connection *con) 1743{ 1744 int size; 1745 int end; 1746 int ret; 1747 1748 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 1749 1750 /* peer's banner */ 1751 size = strlen(CEPH_BANNER); 1752 end = size; 1753 ret = read_partial(con, end, size, con->in_banner); 1754 if (ret <= 0) 1755 goto out; 1756 1757 size = sizeof (con->actual_peer_addr); 1758 end += size; 1759 ret = read_partial(con, end, size, &con->actual_peer_addr); 1760 if (ret <= 0) 1761 goto out; 1762 ceph_decode_banner_addr(&con->actual_peer_addr); 1763 1764 size = sizeof (con->peer_addr_for_me); 1765 end += size; 1766 ret = read_partial(con, end, size, &con->peer_addr_for_me); 1767 if (ret <= 0) 1768 goto out; 1769 ceph_decode_banner_addr(&con->peer_addr_for_me); 1770 1771out: 1772 return ret; 1773} 1774 1775static int read_partial_connect(struct ceph_connection *con) 1776{ 1777 int size; 1778 int end; 1779 int ret; 1780 1781 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 1782 1783 size = sizeof (con->in_reply); 1784 end = size; 1785 ret = read_partial(con, end, size, &con->in_reply); 1786 if (ret <= 0) 1787 goto out; 1788 1789 if (con->auth) { 1790 size = le32_to_cpu(con->in_reply.authorizer_len); 1791 if (size > con->auth->authorizer_reply_buf_len) { 1792 pr_err("authorizer reply too big: %d > %zu\n", size, 1793 con->auth->authorizer_reply_buf_len); 1794 ret = -EINVAL; 1795 goto out; 1796 } 1797 1798 end += size; 1799 ret = read_partial(con, end, size, 1800 con->auth->authorizer_reply_buf); 1801 if (ret <= 0) 1802 goto out; 1803 } 1804 1805 dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", 1806 con, (int)con->in_reply.tag, 1807 le32_to_cpu(con->in_reply.connect_seq), 1808 le32_to_cpu(con->in_reply.global_seq)); 1809out: 1810 return ret; 1811} 1812 1813/* 1814 * Verify the hello banner looks okay. 1815 */ 1816static int verify_hello(struct ceph_connection *con) 1817{ 1818 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { 1819 pr_err("connect to %s got bad banner\n", 1820 ceph_pr_addr(&con->peer_addr)); 1821 con->error_msg = "protocol error, bad banner"; 1822 return -1; 1823 } 1824 return 0; 1825} 1826 | |
1827bool ceph_addr_is_blank(const struct ceph_entity_addr *addr) 1828{ 1829 struct sockaddr_storage ss = addr->in_addr; /* align */ 1830 struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr; 1831 struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr; 1832 1833 switch (ss.ss_family) { 1834 case AF_INET: --- 192 unchanged lines hidden (view full) --- 2027 if (count) 2028 *count = i + 1; 2029 return 0; 2030 2031bad: 2032 return ret; 2033} 2034 | 1119bool ceph_addr_is_blank(const struct ceph_entity_addr *addr) 1120{ 1121 struct sockaddr_storage ss = addr->in_addr; /* align */ 1122 struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr; 1123 struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr; 1124 1125 switch (ss.ss_family) { 1126 case AF_INET: --- 192 unchanged lines hidden (view full) --- 1319 if (count) 1320 *count = i + 1; 1321 return 0; 1322 1323bad: 1324 return ret; 1325} 1326 |
2035static int process_banner(struct ceph_connection *con) 2036{ 2037 struct ceph_entity_addr *my_addr = &con->msgr->inst.addr; 2038 2039 dout("process_banner on %p\n", con); 2040 2041 if (verify_hello(con) < 0) 2042 return -1; 2043 2044 /* 2045 * Make sure the other end is who we wanted. note that the other 2046 * end may not yet know their ip address, so if it's 0.0.0.0, give 2047 * them the benefit of the doubt. 2048 */ 2049 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 2050 sizeof(con->peer_addr)) != 0 && 2051 !(ceph_addr_is_blank(&con->actual_peer_addr) && 2052 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 2053 pr_warn("wrong peer, want %s/%u, got %s/%u\n", 2054 ceph_pr_addr(&con->peer_addr), 2055 le32_to_cpu(con->peer_addr.nonce), 2056 ceph_pr_addr(&con->actual_peer_addr), 2057 le32_to_cpu(con->actual_peer_addr.nonce)); 2058 con->error_msg = "wrong peer at address"; 2059 return -1; 2060 } 2061 2062 /* 2063 * did we learn our address? 2064 */ 2065 if (ceph_addr_is_blank(my_addr)) { 2066 memcpy(&my_addr->in_addr, 2067 &con->peer_addr_for_me.in_addr, 2068 sizeof(con->peer_addr_for_me.in_addr)); 2069 ceph_addr_set_port(my_addr, 0); 2070 ceph_encode_my_addr(con->msgr); 2071 dout("process_banner learned my addr is %s\n", 2072 ceph_pr_addr(my_addr)); 2073 } 2074 2075 return 0; 2076} 2077 2078static int process_connect(struct ceph_connection *con) 2079{ 2080 u64 sup_feat = from_msgr(con->msgr)->supported_features; 2081 u64 req_feat = from_msgr(con->msgr)->required_features; 2082 u64 server_feat = le64_to_cpu(con->in_reply.features); 2083 int ret; 2084 2085 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 2086 2087 if (con->auth) { 2088 int len = le32_to_cpu(con->in_reply.authorizer_len); 2089 2090 /* 2091 * Any connection that defines ->get_authorizer() 2092 * should also define ->add_authorizer_challenge() and 2093 * ->verify_authorizer_reply(). 2094 * 2095 * See get_connect_authorizer(). 2096 */ 2097 if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { 2098 ret = con->ops->add_authorizer_challenge( 2099 con, con->auth->authorizer_reply_buf, len); 2100 if (ret < 0) 2101 return ret; 2102 2103 con_out_kvec_reset(con); 2104 __prepare_write_connect(con); 2105 prepare_read_connect(con); 2106 return 0; 2107 } 2108 2109 if (len) { 2110 ret = con->ops->verify_authorizer_reply(con); 2111 if (ret < 0) { 2112 con->error_msg = "bad authorize reply"; 2113 return ret; 2114 } 2115 } 2116 } 2117 2118 switch (con->in_reply.tag) { 2119 case CEPH_MSGR_TAG_FEATURES: 2120 pr_err("%s%lld %s feature set mismatch," 2121 " my %llx < server's %llx, missing %llx\n", 2122 ENTITY_NAME(con->peer_name), 2123 ceph_pr_addr(&con->peer_addr), 2124 sup_feat, server_feat, server_feat & ~sup_feat); 2125 con->error_msg = "missing required protocol features"; 2126 return -1; 2127 2128 case CEPH_MSGR_TAG_BADPROTOVER: 2129 pr_err("%s%lld %s protocol version mismatch," 2130 " my %d != server's %d\n", 2131 ENTITY_NAME(con->peer_name), 2132 ceph_pr_addr(&con->peer_addr), 2133 le32_to_cpu(con->out_connect.protocol_version), 2134 le32_to_cpu(con->in_reply.protocol_version)); 2135 con->error_msg = "protocol version mismatch"; 2136 return -1; 2137 2138 case CEPH_MSGR_TAG_BADAUTHORIZER: 2139 con->auth_retry++; 2140 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, 2141 con->auth_retry); 2142 if (con->auth_retry == 2) { 2143 con->error_msg = "connect authorization failure"; 2144 return -1; 2145 } 2146 con_out_kvec_reset(con); 2147 ret = prepare_write_connect(con); 2148 if (ret < 0) 2149 return ret; 2150 prepare_read_connect(con); 2151 break; 2152 2153 case CEPH_MSGR_TAG_RESETSESSION: 2154 /* 2155 * If we connected with a large connect_seq but the peer 2156 * has no record of a session with us (no connection, or 2157 * connect_seq == 0), they will send RESETSESION to indicate 2158 * that they must have reset their session, and may have 2159 * dropped messages. 2160 */ 2161 dout("process_connect got RESET peer seq %u\n", 2162 le32_to_cpu(con->in_reply.connect_seq)); 2163 pr_info("%s%lld %s session reset\n", 2164 ENTITY_NAME(con->peer_name), 2165 ceph_pr_addr(&con->peer_addr)); 2166 ceph_con_reset_session(con); 2167 con_out_kvec_reset(con); 2168 ret = prepare_write_connect(con); 2169 if (ret < 0) 2170 return ret; 2171 prepare_read_connect(con); 2172 2173 /* Tell ceph about it. */ 2174 mutex_unlock(&con->mutex); 2175 if (con->ops->peer_reset) 2176 con->ops->peer_reset(con); 2177 mutex_lock(&con->mutex); 2178 if (con->state != CEPH_CON_S_V1_CONNECT_MSG) 2179 return -EAGAIN; 2180 break; 2181 2182 case CEPH_MSGR_TAG_RETRY_SESSION: 2183 /* 2184 * If we sent a smaller connect_seq than the peer has, try 2185 * again with a larger value. 2186 */ 2187 dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", 2188 le32_to_cpu(con->out_connect.connect_seq), 2189 le32_to_cpu(con->in_reply.connect_seq)); 2190 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); 2191 con_out_kvec_reset(con); 2192 ret = prepare_write_connect(con); 2193 if (ret < 0) 2194 return ret; 2195 prepare_read_connect(con); 2196 break; 2197 2198 case CEPH_MSGR_TAG_RETRY_GLOBAL: 2199 /* 2200 * If we sent a smaller global_seq than the peer has, try 2201 * again with a larger value. 2202 */ 2203 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", 2204 con->peer_global_seq, 2205 le32_to_cpu(con->in_reply.global_seq)); 2206 ceph_get_global_seq(con->msgr, 2207 le32_to_cpu(con->in_reply.global_seq)); 2208 con_out_kvec_reset(con); 2209 ret = prepare_write_connect(con); 2210 if (ret < 0) 2211 return ret; 2212 prepare_read_connect(con); 2213 break; 2214 2215 case CEPH_MSGR_TAG_SEQ: 2216 case CEPH_MSGR_TAG_READY: 2217 if (req_feat & ~server_feat) { 2218 pr_err("%s%lld %s protocol feature mismatch," 2219 " my required %llx > server's %llx, need %llx\n", 2220 ENTITY_NAME(con->peer_name), 2221 ceph_pr_addr(&con->peer_addr), 2222 req_feat, server_feat, req_feat & ~server_feat); 2223 con->error_msg = "missing required protocol features"; 2224 return -1; 2225 } 2226 2227 WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); 2228 con->state = CEPH_CON_S_OPEN; 2229 con->auth_retry = 0; /* we authenticated; clear flag */ 2230 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 2231 con->connect_seq++; 2232 con->peer_features = server_feat; 2233 dout("process_connect got READY gseq %d cseq %d (%d)\n", 2234 con->peer_global_seq, 2235 le32_to_cpu(con->in_reply.connect_seq), 2236 con->connect_seq); 2237 WARN_ON(con->connect_seq != 2238 le32_to_cpu(con->in_reply.connect_seq)); 2239 2240 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 2241 ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); 2242 2243 con->delay = 0; /* reset backoff memory */ 2244 2245 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { 2246 prepare_write_seq(con); 2247 prepare_read_seq(con); 2248 } else { 2249 prepare_read_tag(con); 2250 } 2251 break; 2252 2253 case CEPH_MSGR_TAG_WAIT: 2254 /* 2255 * If there is a connection race (we are opening 2256 * connections to each other), one of us may just have 2257 * to WAIT. This shouldn't happen if we are the 2258 * client. 2259 */ 2260 con->error_msg = "protocol error, got WAIT as client"; 2261 return -1; 2262 2263 default: 2264 con->error_msg = "protocol error, garbage tag during connect"; 2265 return -1; 2266 } 2267 return 0; 2268} 2269 2270 | |
2271/* | 1327/* |
2272 * read (part of) an ack 2273 */ 2274static int read_partial_ack(struct ceph_connection *con) 2275{ 2276 int size = sizeof (con->in_temp_ack); 2277 int end = size; 2278 2279 return read_partial(con, end, size, &con->in_temp_ack); 2280} 2281 2282/* 2283 * We can finally discard anything that's been acked. 2284 */ 2285static void process_ack(struct ceph_connection *con) 2286{ 2287 u64 ack = le64_to_cpu(con->in_temp_ack); 2288 2289 if (con->in_tag == CEPH_MSGR_TAG_ACK) 2290 ceph_con_discard_sent(con, ack); 2291 else 2292 ceph_con_discard_requeued(con, ack); 2293 2294 prepare_read_tag(con); 2295} 2296 2297 2298static int read_partial_message_section(struct ceph_connection *con, 2299 struct kvec *section, 2300 unsigned int sec_len, u32 *crc) 2301{ 2302 int ret, left; 2303 2304 BUG_ON(!section); 2305 2306 while (section->iov_len < sec_len) { 2307 BUG_ON(section->iov_base == NULL); 2308 left = sec_len - section->iov_len; 2309 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + 2310 section->iov_len, left); 2311 if (ret <= 0) 2312 return ret; 2313 section->iov_len += ret; 2314 } 2315 if (section->iov_len == sec_len) 2316 *crc = crc32c(0, section->iov_base, section->iov_len); 2317 2318 return 1; 2319} 2320 2321static int read_partial_msg_data(struct ceph_connection *con) 2322{ 2323 struct ceph_msg *msg = con->in_msg; 2324 struct ceph_msg_data_cursor *cursor = &msg->cursor; 2325 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 2326 struct page *page; 2327 size_t page_offset; 2328 size_t length; 2329 u32 crc = 0; 2330 int ret; 2331 2332 if (!msg->num_data_items) 2333 return -EIO; 2334 2335 if (do_datacrc) 2336 crc = con->in_data_crc; 2337 while (cursor->total_resid) { 2338 if (!cursor->resid) { 2339 ceph_msg_data_advance(cursor, 0); 2340 continue; 2341 } 2342 2343 page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); 2344 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); 2345 if (ret <= 0) { 2346 if (do_datacrc) 2347 con->in_data_crc = crc; 2348 2349 return ret; 2350 } 2351 2352 if (do_datacrc) 2353 crc = ceph_crc32c_page(crc, page, page_offset, ret); 2354 ceph_msg_data_advance(cursor, (size_t)ret); 2355 } 2356 if (do_datacrc) 2357 con->in_data_crc = crc; 2358 2359 return 1; /* must return > 0 to indicate success */ 2360} 2361 2362/* 2363 * read (part of) a message. 2364 */ 2365static int read_partial_message(struct ceph_connection *con) 2366{ 2367 struct ceph_msg *m = con->in_msg; 2368 int size; 2369 int end; 2370 int ret; 2371 unsigned int front_len, middle_len, data_len; 2372 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 2373 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); 2374 u64 seq; 2375 u32 crc; 2376 2377 dout("read_partial_message con %p msg %p\n", con, m); 2378 2379 /* header */ 2380 size = sizeof (con->in_hdr); 2381 end = size; 2382 ret = read_partial(con, end, size, &con->in_hdr); 2383 if (ret <= 0) 2384 return ret; 2385 2386 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); 2387 if (cpu_to_le32(crc) != con->in_hdr.crc) { 2388 pr_err("read_partial_message bad hdr crc %u != expected %u\n", 2389 crc, con->in_hdr.crc); 2390 return -EBADMSG; 2391 } 2392 2393 front_len = le32_to_cpu(con->in_hdr.front_len); 2394 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 2395 return -EIO; 2396 middle_len = le32_to_cpu(con->in_hdr.middle_len); 2397 if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN) 2398 return -EIO; 2399 data_len = le32_to_cpu(con->in_hdr.data_len); 2400 if (data_len > CEPH_MSG_MAX_DATA_LEN) 2401 return -EIO; 2402 2403 /* verify seq# */ 2404 seq = le64_to_cpu(con->in_hdr.seq); 2405 if ((s64)seq - (s64)con->in_seq < 1) { 2406 pr_info("skipping %s%lld %s seq %lld expected %lld\n", 2407 ENTITY_NAME(con->peer_name), 2408 ceph_pr_addr(&con->peer_addr), 2409 seq, con->in_seq + 1); 2410 con->in_base_pos = -front_len - middle_len - data_len - 2411 sizeof_footer(con); 2412 con->in_tag = CEPH_MSGR_TAG_READY; 2413 return 1; 2414 } else if ((s64)seq - (s64)con->in_seq > 1) { 2415 pr_err("read_partial_message bad seq %lld expected %lld\n", 2416 seq, con->in_seq + 1); 2417 con->error_msg = "bad message sequence # for incoming message"; 2418 return -EBADE; 2419 } 2420 2421 /* allocate message? */ 2422 if (!con->in_msg) { 2423 int skip = 0; 2424 2425 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 2426 front_len, data_len); 2427 ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip); 2428 if (ret < 0) 2429 return ret; 2430 2431 BUG_ON(!con->in_msg ^ skip); 2432 if (skip) { 2433 /* skip this message */ 2434 dout("alloc_msg said skip message\n"); 2435 con->in_base_pos = -front_len - middle_len - data_len - 2436 sizeof_footer(con); 2437 con->in_tag = CEPH_MSGR_TAG_READY; 2438 con->in_seq++; 2439 return 1; 2440 } 2441 2442 BUG_ON(!con->in_msg); 2443 BUG_ON(con->in_msg->con != con); 2444 m = con->in_msg; 2445 m->front.iov_len = 0; /* haven't read it yet */ 2446 if (m->middle) 2447 m->middle->vec.iov_len = 0; 2448 2449 /* prepare for data payload, if any */ 2450 2451 if (data_len) 2452 prepare_message_data(con->in_msg, data_len); 2453 } 2454 2455 /* front */ 2456 ret = read_partial_message_section(con, &m->front, front_len, 2457 &con->in_front_crc); 2458 if (ret <= 0) 2459 return ret; 2460 2461 /* middle */ 2462 if (m->middle) { 2463 ret = read_partial_message_section(con, &m->middle->vec, 2464 middle_len, 2465 &con->in_middle_crc); 2466 if (ret <= 0) 2467 return ret; 2468 } 2469 2470 /* (page) data */ 2471 if (data_len) { 2472 ret = read_partial_msg_data(con); 2473 if (ret <= 0) 2474 return ret; 2475 } 2476 2477 /* footer */ 2478 size = sizeof_footer(con); 2479 end += size; 2480 ret = read_partial(con, end, size, &m->footer); 2481 if (ret <= 0) 2482 return ret; 2483 2484 if (!need_sign) { 2485 m->footer.flags = m->old_footer.flags; 2486 m->footer.sig = 0; 2487 } 2488 2489 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 2490 m, front_len, m->footer.front_crc, middle_len, 2491 m->footer.middle_crc, data_len, m->footer.data_crc); 2492 2493 /* crc ok? */ 2494 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 2495 pr_err("read_partial_message %p front crc %u != exp. %u\n", 2496 m, con->in_front_crc, m->footer.front_crc); 2497 return -EBADMSG; 2498 } 2499 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 2500 pr_err("read_partial_message %p middle crc %u != exp %u\n", 2501 m, con->in_middle_crc, m->footer.middle_crc); 2502 return -EBADMSG; 2503 } 2504 if (do_datacrc && 2505 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 2506 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 2507 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 2508 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 2509 return -EBADMSG; 2510 } 2511 2512 if (need_sign && con->ops->check_message_signature && 2513 con->ops->check_message_signature(m)) { 2514 pr_err("read_partial_message %p signature check failed\n", m); 2515 return -EBADMSG; 2516 } 2517 2518 return 1; /* done! */ 2519} 2520 2521/* | |
2522 * Process message. This happens in the worker thread. The callback should 2523 * be careful not to do anything that waits on other incoming messages or it 2524 * may deadlock. 2525 */ 2526void ceph_con_process_message(struct ceph_connection *con) 2527{ 2528 struct ceph_msg *msg = con->in_msg; 2529 --- 16 unchanged lines hidden (view full) --- 2546 le32_to_cpu(msg->hdr.middle_len), 2547 le32_to_cpu(msg->hdr.data_len), 2548 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 2549 con->ops->dispatch(con, msg); 2550 2551 mutex_lock(&con->mutex); 2552} 2553 | 1328 * Process message. This happens in the worker thread. The callback should 1329 * be careful not to do anything that waits on other incoming messages or it 1330 * may deadlock. 1331 */ 1332void ceph_con_process_message(struct ceph_connection *con) 1333{ 1334 struct ceph_msg *msg = con->in_msg; 1335 --- 16 unchanged lines hidden (view full) --- 1352 le32_to_cpu(msg->hdr.middle_len), 1353 le32_to_cpu(msg->hdr.data_len), 1354 con->in_front_crc, con->in_middle_crc, con->in_data_crc); 1355 con->ops->dispatch(con, msg); 1356 1357 mutex_lock(&con->mutex); 1358} 1359 |
2554static int read_keepalive_ack(struct ceph_connection *con) 2555{ 2556 struct ceph_timespec ceph_ts; 2557 size_t size = sizeof(ceph_ts); 2558 int ret = read_partial(con, size, size, &ceph_ts); 2559 if (ret <= 0) 2560 return ret; 2561 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); 2562 prepare_read_tag(con); 2563 return 1; 2564} 2565 | |
2566/* | 1360/* |
2567 * Write something to the socket. Called in a worker thread when the 2568 * socket appears to be writeable and we have something ready to send. 2569 */ 2570int ceph_con_v1_try_write(struct ceph_connection *con) 2571{ 2572 int ret = 1; 2573 2574 dout("try_write start %p state %d\n", con, con->state); 2575 if (con->state != CEPH_CON_S_PREOPEN && 2576 con->state != CEPH_CON_S_V1_BANNER && 2577 con->state != CEPH_CON_S_V1_CONNECT_MSG && 2578 con->state != CEPH_CON_S_OPEN) 2579 return 0; 2580 2581 /* open the socket first? */ 2582 if (con->state == CEPH_CON_S_PREOPEN) { 2583 BUG_ON(con->sock); 2584 con->state = CEPH_CON_S_V1_BANNER; 2585 2586 con_out_kvec_reset(con); 2587 prepare_write_banner(con); 2588 prepare_read_banner(con); 2589 2590 BUG_ON(con->in_msg); 2591 con->in_tag = CEPH_MSGR_TAG_READY; 2592 dout("try_write initiating connect on %p new state %d\n", 2593 con, con->state); 2594 ret = ceph_tcp_connect(con); 2595 if (ret < 0) { 2596 con->error_msg = "connect error"; 2597 goto out; 2598 } 2599 } 2600 2601more: 2602 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 2603 BUG_ON(!con->sock); 2604 2605 /* kvec data queued? */ 2606 if (con->out_kvec_left) { 2607 ret = write_partial_kvec(con); 2608 if (ret <= 0) 2609 goto out; 2610 } 2611 if (con->out_skip) { 2612 ret = write_partial_skip(con); 2613 if (ret <= 0) 2614 goto out; 2615 } 2616 2617 /* msg pages? */ 2618 if (con->out_msg) { 2619 if (con->out_msg_done) { 2620 ceph_msg_put(con->out_msg); 2621 con->out_msg = NULL; /* we're done with this one */ 2622 goto do_next; 2623 } 2624 2625 ret = write_partial_message_data(con); 2626 if (ret == 1) 2627 goto more; /* we need to send the footer, too! */ 2628 if (ret == 0) 2629 goto out; 2630 if (ret < 0) { 2631 dout("try_write write_partial_message_data err %d\n", 2632 ret); 2633 goto out; 2634 } 2635 } 2636 2637do_next: 2638 if (con->state == CEPH_CON_S_OPEN) { 2639 if (ceph_con_flag_test_and_clear(con, 2640 CEPH_CON_F_KEEPALIVE_PENDING)) { 2641 prepare_write_keepalive(con); 2642 goto more; 2643 } 2644 /* is anything else pending? */ 2645 if (!list_empty(&con->out_queue)) { 2646 prepare_write_message(con); 2647 goto more; 2648 } 2649 if (con->in_seq > con->in_seq_acked) { 2650 prepare_write_ack(con); 2651 goto more; 2652 } 2653 } 2654 2655 /* Nothing to do! */ 2656 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); 2657 dout("try_write nothing else to write.\n"); 2658 ret = 0; 2659out: 2660 dout("try_write done on %p ret %d\n", con, ret); 2661 return ret; 2662} 2663 2664/* 2665 * Read what we can from the socket. 2666 */ 2667int ceph_con_v1_try_read(struct ceph_connection *con) 2668{ 2669 int ret = -1; 2670 2671more: 2672 dout("try_read start %p state %d\n", con, con->state); 2673 if (con->state != CEPH_CON_S_V1_BANNER && 2674 con->state != CEPH_CON_S_V1_CONNECT_MSG && 2675 con->state != CEPH_CON_S_OPEN) 2676 return 0; 2677 2678 BUG_ON(!con->sock); 2679 2680 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 2681 con->in_base_pos); 2682 2683 if (con->state == CEPH_CON_S_V1_BANNER) { 2684 ret = read_partial_banner(con); 2685 if (ret <= 0) 2686 goto out; 2687 ret = process_banner(con); 2688 if (ret < 0) 2689 goto out; 2690 2691 con->state = CEPH_CON_S_V1_CONNECT_MSG; 2692 2693 /* 2694 * Received banner is good, exchange connection info. 2695 * Do not reset out_kvec, as sending our banner raced 2696 * with receiving peer banner after connect completed. 2697 */ 2698 ret = prepare_write_connect(con); 2699 if (ret < 0) 2700 goto out; 2701 prepare_read_connect(con); 2702 2703 /* Send connection info before awaiting response */ 2704 goto out; 2705 } 2706 2707 if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { 2708 ret = read_partial_connect(con); 2709 if (ret <= 0) 2710 goto out; 2711 ret = process_connect(con); 2712 if (ret < 0) 2713 goto out; 2714 goto more; 2715 } 2716 2717 WARN_ON(con->state != CEPH_CON_S_OPEN); 2718 2719 if (con->in_base_pos < 0) { 2720 /* 2721 * skipping + discarding content. 2722 */ 2723 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); 2724 if (ret <= 0) 2725 goto out; 2726 dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); 2727 con->in_base_pos += ret; 2728 if (con->in_base_pos) 2729 goto more; 2730 } 2731 if (con->in_tag == CEPH_MSGR_TAG_READY) { 2732 /* 2733 * what's next? 2734 */ 2735 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); 2736 if (ret <= 0) 2737 goto out; 2738 dout("try_read got tag %d\n", (int)con->in_tag); 2739 switch (con->in_tag) { 2740 case CEPH_MSGR_TAG_MSG: 2741 prepare_read_message(con); 2742 break; 2743 case CEPH_MSGR_TAG_ACK: 2744 prepare_read_ack(con); 2745 break; 2746 case CEPH_MSGR_TAG_KEEPALIVE2_ACK: 2747 prepare_read_keepalive_ack(con); 2748 break; 2749 case CEPH_MSGR_TAG_CLOSE: 2750 ceph_con_close_socket(con); 2751 con->state = CEPH_CON_S_CLOSED; 2752 goto out; 2753 default: 2754 goto bad_tag; 2755 } 2756 } 2757 if (con->in_tag == CEPH_MSGR_TAG_MSG) { 2758 ret = read_partial_message(con); 2759 if (ret <= 0) { 2760 switch (ret) { 2761 case -EBADMSG: 2762 con->error_msg = "bad crc/signature"; 2763 fallthrough; 2764 case -EBADE: 2765 ret = -EIO; 2766 break; 2767 case -EIO: 2768 con->error_msg = "io error"; 2769 break; 2770 } 2771 goto out; 2772 } 2773 if (con->in_tag == CEPH_MSGR_TAG_READY) 2774 goto more; 2775 ceph_con_process_message(con); 2776 if (con->state == CEPH_CON_S_OPEN) 2777 prepare_read_tag(con); 2778 goto more; 2779 } 2780 if (con->in_tag == CEPH_MSGR_TAG_ACK || 2781 con->in_tag == CEPH_MSGR_TAG_SEQ) { 2782 /* 2783 * the final handshake seq exchange is semantically 2784 * equivalent to an ACK 2785 */ 2786 ret = read_partial_ack(con); 2787 if (ret <= 0) 2788 goto out; 2789 process_ack(con); 2790 goto more; 2791 } 2792 if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { 2793 ret = read_keepalive_ack(con); 2794 if (ret <= 0) 2795 goto out; 2796 goto more; 2797 } 2798 2799out: 2800 dout("try_read done on %p ret %d\n", con, ret); 2801 return ret; 2802 2803bad_tag: 2804 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); 2805 con->error_msg = "protocol error, garbage tag"; 2806 ret = -1; 2807 goto out; 2808} 2809 2810 2811/* | |
2812 * Atomically queue work on a connection after the specified delay. 2813 * Bump @con reference to avoid races with connection teardown. 2814 * Returns 0 if work was queued, or an error code otherwise. 2815 */ 2816static int queue_con_delay(struct ceph_connection *con, unsigned long delay) 2817{ 2818 if (!con->ops->get(con)) { 2819 dout("%s %p ref count 0\n", __func__, con); --- 201 unchanged lines hidden (view full) --- 3021 if (con->delay > MAX_DELAY_INTERVAL) 3022 con->delay = MAX_DELAY_INTERVAL; 3023 } 3024 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); 3025 queue_con(con); 3026 } 3027} 3028 | 1361 * Atomically queue work on a connection after the specified delay. 1362 * Bump @con reference to avoid races with connection teardown. 1363 * Returns 0 if work was queued, or an error code otherwise. 1364 */ 1365static int queue_con_delay(struct ceph_connection *con, unsigned long delay) 1366{ 1367 if (!con->ops->get(con)) { 1368 dout("%s %p ref count 0\n", __func__, con); --- 201 unchanged lines hidden (view full) --- 1570 if (con->delay > MAX_DELAY_INTERVAL) 1571 con->delay = MAX_DELAY_INTERVAL; 1572 } 1573 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); 1574 queue_con(con); 1575 } 1576} 1577 |
3029 | |
3030void ceph_messenger_reset_nonce(struct ceph_messenger *msgr) 3031{ 3032 u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000; 3033 msgr->inst.addr.nonce = cpu_to_le32(nonce); 3034 ceph_encode_my_addr(msgr); 3035} 3036 3037/* --- 88 unchanged lines hidden (view full) --- 3126 3127 /* if there wasn't anything waiting to send before, queue 3128 * new work */ 3129 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) 3130 queue_con(con); 3131} 3132EXPORT_SYMBOL(ceph_con_send); 3133 | 1578void ceph_messenger_reset_nonce(struct ceph_messenger *msgr) 1579{ 1580 u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000; 1581 msgr->inst.addr.nonce = cpu_to_le32(nonce); 1582 ceph_encode_my_addr(msgr); 1583} 1584 1585/* --- 88 unchanged lines hidden (view full) --- 1674 1675 /* if there wasn't anything waiting to send before, queue 1676 * new work */ 1677 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) 1678 queue_con(con); 1679} 1680EXPORT_SYMBOL(ceph_con_send); 1681 |
3134void ceph_con_v1_revoke(struct ceph_connection *con) 3135{ 3136 struct ceph_msg *msg = con->out_msg; 3137 3138 WARN_ON(con->out_skip); 3139 /* footer */ 3140 if (con->out_msg_done) { 3141 con->out_skip += con_out_kvec_skip(con); 3142 } else { 3143 WARN_ON(!msg->data_length); 3144 con->out_skip += sizeof_footer(con); 3145 } 3146 /* data, middle, front */ 3147 if (msg->data_length) 3148 con->out_skip += msg->cursor.total_resid; 3149 if (msg->middle) 3150 con->out_skip += con_out_kvec_skip(con); 3151 con->out_skip += con_out_kvec_skip(con); 3152 3153 dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, 3154 con->out_kvec_bytes, con->out_skip); 3155} 3156 | |
3157/* 3158 * Revoke a message that was previously queued for send 3159 */ 3160void ceph_msg_revoke(struct ceph_msg *msg) 3161{ 3162 struct ceph_connection *con = msg->con; 3163 3164 if (!con) { --- 21 unchanged lines hidden (view full) --- 3186 con->out_msg = NULL; 3187 } else { 3188 dout("%s con %p msg %p not current, out_msg %p\n", __func__, 3189 con, msg, con->out_msg); 3190 } 3191 mutex_unlock(&con->mutex); 3192} 3193 | 1682/* 1683 * Revoke a message that was previously queued for send 1684 */ 1685void ceph_msg_revoke(struct ceph_msg *msg) 1686{ 1687 struct ceph_connection *con = msg->con; 1688 1689 if (!con) { --- 21 unchanged lines hidden (view full) --- 1711 con->out_msg = NULL; 1712 } else { 1713 dout("%s con %p msg %p not current, out_msg %p\n", __func__, 1714 con, msg, con->out_msg); 1715 } 1716 mutex_unlock(&con->mutex); 1717} 1718 |
3194void ceph_con_v1_revoke_incoming(struct ceph_connection *con) 3195{ 3196 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); 3197 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); 3198 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); 3199 3200 /* skip rest of message */ 3201 con->in_base_pos = con->in_base_pos - 3202 sizeof(struct ceph_msg_header) - 3203 front_len - 3204 middle_len - 3205 data_len - 3206 sizeof(struct ceph_msg_footer); 3207 3208 con->in_tag = CEPH_MSGR_TAG_READY; 3209 con->in_seq++; 3210 3211 dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos); 3212} 3213 | |
3214/* 3215 * Revoke a message that we may be reading data into 3216 */ 3217void ceph_msg_revoke_incoming(struct ceph_msg *msg) 3218{ 3219 struct ceph_connection *con = msg->con; 3220 3221 if (!con) { --- 389 unchanged lines hidden --- | 1719/* 1720 * Revoke a message that we may be reading data into 1721 */ 1722void ceph_msg_revoke_incoming(struct ceph_msg *msg) 1723{ 1724 struct ceph_connection *con = msg->con; 1725 1726 if (!con) { --- 389 unchanged lines hidden --- |