messenger.c (a56dd9bf47220c3206f27075af8bdfb219a2a3cf) messenger.c (cd1a677cad994021b19665ed476aea63f5d54f31)
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/crc32c.h>
5#include <linux/ctype.h>
6#include <linux/highmem.h>
7#include <linux/inet.h>
8#include <linux/kthread.h>

--- 181 unchanged lines hidden (view full) ---

190 }
191
192 return s;
193}
194EXPORT_SYMBOL(ceph_pr_addr);
195
196void ceph_encode_my_addr(struct ceph_messenger *msgr)
197{
1// SPDX-License-Identifier: GPL-2.0
2#include <linux/ceph/ceph_debug.h>
3
4#include <linux/crc32c.h>
5#include <linux/ctype.h>
6#include <linux/highmem.h>
7#include <linux/inet.h>
8#include <linux/kthread.h>

--- 181 unchanged lines hidden (view full) ---

190 }
191
192 return s;
193}
194EXPORT_SYMBOL(ceph_pr_addr);
195
196void ceph_encode_my_addr(struct ceph_messenger *msgr)
197{
198 memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr));
199 ceph_encode_banner_addr(&msgr->my_enc_addr);
198 if (!ceph_msgr2(from_msgr(msgr))) {
199 memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
200 sizeof(msgr->my_enc_addr));
201 ceph_encode_banner_addr(&msgr->my_enc_addr);
202 }
200}
201
202/*
203 * work queue for all reading and writing to/from the socket.
204 */
205static struct workqueue_struct *ceph_msgr_wq;
206
207static int ceph_msgr_slab_init(void)

--- 300 unchanged lines hidden (view full) ---

508 con->in_msg = NULL;
509 }
510 if (con->out_msg) {
511 WARN_ON(con->out_msg->con != con);
512 ceph_msg_put(con->out_msg);
513 con->out_msg = NULL;
514 }
515
203}
204
205/*
206 * work queue for all reading and writing to/from the socket.
207 */
208static struct workqueue_struct *ceph_msgr_wq;
209
210static int ceph_msgr_slab_init(void)

--- 300 unchanged lines hidden (view full) ---

511 con->in_msg = NULL;
512 }
513 if (con->out_msg) {
514 WARN_ON(con->out_msg->con != con);
515 ceph_msg_put(con->out_msg);
516 con->out_msg = NULL;
517 }
518
516 ceph_con_v1_reset_protocol(con);
519 if (ceph_msgr2(from_msgr(con->msgr)))
520 ceph_con_v2_reset_protocol(con);
521 else
522 ceph_con_v1_reset_protocol(con);
517}
518
519/*
520 * Reset a connection. Discard all incoming and outgoing messages
521 * and clear *_seq state.
522 */
523static void ceph_msg_remove(struct ceph_msg *msg)
524{
525 list_del_init(&msg->list_head);
526
527 ceph_msg_put(msg);
528}
523}
524
525/*
526 * Reset a connection. Discard all incoming and outgoing messages
527 * and clear *_seq state.
528 */
529static void ceph_msg_remove(struct ceph_msg *msg)
530{
531 list_del_init(&msg->list_head);
532
533 ceph_msg_put(msg);
534}
535
529static void ceph_msg_remove_list(struct list_head *head)
530{
531 while (!list_empty(head)) {
532 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
533 list_head);
534 ceph_msg_remove(msg);
535 }
536}

--- 5 unchanged lines hidden (view full) ---

542 WARN_ON(con->in_msg);
543 WARN_ON(con->out_msg);
544 ceph_msg_remove_list(&con->out_queue);
545 ceph_msg_remove_list(&con->out_sent);
546 con->out_seq = 0;
547 con->in_seq = 0;
548 con->in_seq_acked = 0;
549
536static void ceph_msg_remove_list(struct list_head *head)
537{
538 while (!list_empty(head)) {
539 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
540 list_head);
541 ceph_msg_remove(msg);
542 }
543}

--- 5 unchanged lines hidden (view full) ---

549 WARN_ON(con->in_msg);
550 WARN_ON(con->out_msg);
551 ceph_msg_remove_list(&con->out_queue);
552 ceph_msg_remove_list(&con->out_sent);
553 con->out_seq = 0;
554 con->in_seq = 0;
555 con->in_seq_acked = 0;
556
550 ceph_con_v1_reset_session(con);
557 if (ceph_msgr2(from_msgr(con->msgr)))
558 ceph_con_v2_reset_session(con);
559 else
560 ceph_con_v1_reset_session(con);
551}
552
553/*
554 * mark a peer down. drop any open connections.
555 */
556void ceph_con_close(struct ceph_connection *con)
557{
558 mutex_lock(&con->mutex);

--- 36 unchanged lines hidden (view full) ---

595}
596EXPORT_SYMBOL(ceph_con_open);
597
598/*
599 * return true if this connection ever successfully opened
600 */
601bool ceph_con_opened(struct ceph_connection *con)
602{
561}
562
563/*
564 * mark a peer down. drop any open connections.
565 */
566void ceph_con_close(struct ceph_connection *con)
567{
568 mutex_lock(&con->mutex);

--- 36 unchanged lines hidden (view full) ---

605}
606EXPORT_SYMBOL(ceph_con_open);
607
608/*
609 * return true if this connection ever successfully opened
610 */
611bool ceph_con_opened(struct ceph_connection *con)
612{
613 if (ceph_msgr2(from_msgr(con->msgr)))
614 return ceph_con_v2_opened(con);
615
603 return ceph_con_v1_opened(con);
604}
605
606/*
607 * initialize a new connection.
608 */
609void ceph_con_init(struct ceph_connection *con, void *private,
610 const struct ceph_connection_operations *ops,

--- 686 unchanged lines hidden (view full) ---

1297 port = CEPH_MON_PORT;
1298 else if (port > 65535)
1299 goto bad;
1300 } else {
1301 port = CEPH_MON_PORT;
1302 }
1303
1304 ceph_addr_set_port(&addr[i], port);
616 return ceph_con_v1_opened(con);
617}
618
619/*
620 * initialize a new connection.
621 */
622void ceph_con_init(struct ceph_connection *con, void *private,
623 const struct ceph_connection_operations *ops,

--- 686 unchanged lines hidden (view full) ---

1310 port = CEPH_MON_PORT;
1311 else if (port > 65535)
1312 goto bad;
1313 } else {
1314 port = CEPH_MON_PORT;
1315 }
1316
1317 ceph_addr_set_port(&addr[i], port);
1318 /*
1319 * We want the type to be set according to ms_mode
1320 * option, but options are normally parsed after mon
1321 * addresses. Rather than complicating parsing, set
1322 * to LEGACY and override in build_initial_monmap()
1323 * for mon addresses and ceph_messenger_init() for
1324 * ip option.
1325 */
1305 addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
1326 addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
1327 addr[i].nonce = 0;
1306
1307 dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));
1308
1309 if (p == end)
1310 break;
1311 if (*p != ',')
1312 goto bad;
1313 p++;

--- 91 unchanged lines hidden (view full) ---

1405 con->error_msg = "socket closed (con state " #x ")"; \
1406 break;
1407
1408 switch (con->state) {
1409 CASE(CLOSED);
1410 CASE(PREOPEN);
1411 CASE(V1_BANNER);
1412 CASE(V1_CONNECT_MSG);
1328
1329 dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));
1330
1331 if (p == end)
1332 break;
1333 if (*p != ',')
1334 goto bad;
1335 p++;

--- 91 unchanged lines hidden (view full) ---

1427 con->error_msg = "socket closed (con state " #x ")"; \
1428 break;
1429
1430 switch (con->state) {
1431 CASE(CLOSED);
1432 CASE(PREOPEN);
1433 CASE(V1_BANNER);
1434 CASE(V1_CONNECT_MSG);
1435 CASE(V2_BANNER_PREFIX);
1436 CASE(V2_BANNER_PAYLOAD);
1437 CASE(V2_HELLO);
1438 CASE(V2_AUTH);
1439 CASE(V2_AUTH_SIGNATURE);
1440 CASE(V2_SESSION_CONNECT);
1441 CASE(V2_SESSION_RECONNECT);
1413 CASE(OPEN);
1414 CASE(STANDBY);
1415 default:
1416 BUG();
1417 }
1418#undef CASE
1419
1420 return true;

--- 68 unchanged lines hidden (view full) ---

1489 BUG_ON(con->sock);
1490 break;
1491 }
1492 if (con->state == CEPH_CON_S_PREOPEN) {
1493 dout("%s: con %p PREOPEN\n", __func__, con);
1494 BUG_ON(con->sock);
1495 }
1496
1442 CASE(OPEN);
1443 CASE(STANDBY);
1444 default:
1445 BUG();
1446 }
1447#undef CASE
1448
1449 return true;

--- 68 unchanged lines hidden (view full) ---

1518 BUG_ON(con->sock);
1519 break;
1520 }
1521 if (con->state == CEPH_CON_S_PREOPEN) {
1522 dout("%s: con %p PREOPEN\n", __func__, con);
1523 BUG_ON(con->sock);
1524 }
1525
1497 ret = ceph_con_v1_try_read(con);
1526 if (ceph_msgr2(from_msgr(con->msgr)))
1527 ret = ceph_con_v2_try_read(con);
1528 else
1529 ret = ceph_con_v1_try_read(con);
1498 if (ret < 0) {
1499 if (ret == -EAGAIN)
1500 continue;
1501 if (!con->error_msg)
1502 con->error_msg = "socket error on read";
1503 fault = true;
1504 break;
1505 }
1506
1530 if (ret < 0) {
1531 if (ret == -EAGAIN)
1532 continue;
1533 if (!con->error_msg)
1534 con->error_msg = "socket error on read";
1535 fault = true;
1536 break;
1537 }
1538
1507 ret = ceph_con_v1_try_write(con);
1539 if (ceph_msgr2(from_msgr(con->msgr)))
1540 ret = ceph_con_v2_try_write(con);
1541 else
1542 ret = ceph_con_v1_try_write(con);
1508 if (ret < 0) {
1509 if (ret == -EAGAIN)
1510 continue;
1511 if (!con->error_msg)
1512 con->error_msg = "socket error on write";
1513 fault = true;
1514 }
1515

--- 17 unchanged lines hidden (view full) ---

1533{
1534 dout("fault %p state %d to peer %s\n",
1535 con, con->state, ceph_pr_addr(&con->peer_addr));
1536
1537 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
1538 ceph_pr_addr(&con->peer_addr), con->error_msg);
1539 con->error_msg = NULL;
1540
1543 if (ret < 0) {
1544 if (ret == -EAGAIN)
1545 continue;
1546 if (!con->error_msg)
1547 con->error_msg = "socket error on write";
1548 fault = true;
1549 }
1550

--- 17 unchanged lines hidden (view full) ---

1568{
1569 dout("fault %p state %d to peer %s\n",
1570 con, con->state, ceph_pr_addr(&con->peer_addr));
1571
1572 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
1573 ceph_pr_addr(&con->peer_addr), con->error_msg);
1574 con->error_msg = NULL;
1575
1541 WARN_ON(con->state != CEPH_CON_S_V1_BANNER &&
1542 con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1543 con->state != CEPH_CON_S_OPEN);
1576 WARN_ON(con->state == CEPH_CON_S_STANDBY ||
1577 con->state == CEPH_CON_S_CLOSED);
1544
1545 ceph_con_reset_protocol(con);
1546
1547 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
1548 dout("fault on LOSSYTX channel, marking CLOSED\n");
1549 con->state = CEPH_CON_S_CLOSED;
1550 return;
1551 }

--- 39 unchanged lines hidden (view full) ---

1591 spin_lock_init(&msgr->global_seq_lock);
1592
1593 if (myaddr) {
1594 memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
1595 sizeof(msgr->inst.addr.in_addr));
1596 ceph_addr_set_port(&msgr->inst.addr, 0);
1597 }
1598
1578
1579 ceph_con_reset_protocol(con);
1580
1581 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
1582 dout("fault on LOSSYTX channel, marking CLOSED\n");
1583 con->state = CEPH_CON_S_CLOSED;
1584 return;
1585 }

--- 39 unchanged lines hidden (view full) ---

1625 spin_lock_init(&msgr->global_seq_lock);
1626
1627 if (myaddr) {
1628 memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
1629 sizeof(msgr->inst.addr.in_addr));
1630 ceph_addr_set_port(&msgr->inst.addr, 0);
1631 }
1632
1599 msgr->inst.addr.type = 0;
1633 /*
1634 * Since nautilus, clients are identified using type ANY.
1635 * For msgr1, ceph_encode_banner_addr() munges it to NONE.
1636 */
1637 msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;
1600
1601 /* generate a random non-zero nonce */
1602 do {
1603 get_random_bytes(&msgr->inst.addr.nonce,
1604 sizeof(msgr->inst.addr.nonce));
1605 } while (!msgr->inst.addr.nonce);
1606 ceph_encode_my_addr(msgr);
1607

--- 93 unchanged lines hidden (view full) ---

1701
1702 dout("%s con %p msg %p was linked\n", __func__, con, msg);
1703 msg->hdr.seq = 0;
1704 ceph_msg_remove(msg);
1705
1706 if (con->out_msg == msg) {
1707 WARN_ON(con->state != CEPH_CON_S_OPEN);
1708 dout("%s con %p msg %p was sending\n", __func__, con, msg);
1638
1639 /* generate a random non-zero nonce */
1640 do {
1641 get_random_bytes(&msgr->inst.addr.nonce,
1642 sizeof(msgr->inst.addr.nonce));
1643 } while (!msgr->inst.addr.nonce);
1644 ceph_encode_my_addr(msgr);
1645

--- 93 unchanged lines hidden (view full) ---

1739
1740 dout("%s con %p msg %p was linked\n", __func__, con, msg);
1741 msg->hdr.seq = 0;
1742 ceph_msg_remove(msg);
1743
1744 if (con->out_msg == msg) {
1745 WARN_ON(con->state != CEPH_CON_S_OPEN);
1746 dout("%s con %p msg %p was sending\n", __func__, con, msg);
1709 ceph_con_v1_revoke(con);
1747 if (ceph_msgr2(from_msgr(con->msgr)))
1748 ceph_con_v2_revoke(con);
1749 else
1750 ceph_con_v1_revoke(con);
1710 ceph_msg_put(con->out_msg);
1711 con->out_msg = NULL;
1712 } else {
1713 dout("%s con %p msg %p not current, out_msg %p\n", __func__,
1714 con, msg, con->out_msg);
1715 }
1716 mutex_unlock(&con->mutex);
1717}

--- 9 unchanged lines hidden (view full) ---

1727 dout("%s msg %p null con\n", __func__, msg);
1728 return; /* Message not in our possession */
1729 }
1730
1731 mutex_lock(&con->mutex);
1732 if (con->in_msg == msg) {
1733 WARN_ON(con->state != CEPH_CON_S_OPEN);
1734 dout("%s con %p msg %p was recving\n", __func__, con, msg);
1751 ceph_msg_put(con->out_msg);
1752 con->out_msg = NULL;
1753 } else {
1754 dout("%s con %p msg %p not current, out_msg %p\n", __func__,
1755 con, msg, con->out_msg);
1756 }
1757 mutex_unlock(&con->mutex);
1758}

--- 9 unchanged lines hidden (view full) ---

1768 dout("%s msg %p null con\n", __func__, msg);
1769 return; /* Message not in our possession */
1770 }
1771
1772 mutex_lock(&con->mutex);
1773 if (con->in_msg == msg) {
1774 WARN_ON(con->state != CEPH_CON_S_OPEN);
1775 dout("%s con %p msg %p was recving\n", __func__, con, msg);
1735 ceph_con_v1_revoke_incoming(con);
1776 if (ceph_msgr2(from_msgr(con->msgr)))
1777 ceph_con_v2_revoke_incoming(con);
1778 else
1779 ceph_con_v1_revoke_incoming(con);
1736 ceph_msg_put(con->in_msg);
1737 con->in_msg = NULL;
1738 } else {
1739 dout("%s con %p msg %p not current, in_msg %p\n", __func__,
1740 con, msg, con->in_msg);
1741 }
1742 mutex_unlock(&con->mutex);
1743}

--- 372 unchanged lines hidden ---
1780 ceph_msg_put(con->in_msg);
1781 con->in_msg = NULL;
1782 } else {
1783 dout("%s con %p msg %p not current, in_msg %p\n", __func__,
1784 con, msg, con->in_msg);
1785 }
1786 mutex_unlock(&con->mutex);
1787}

--- 372 unchanged lines hidden ---