messenger.c (a56dd9bf47220c3206f27075af8bdfb219a2a3cf) | messenger.c (cd1a677cad994021b19665ed476aea63f5d54f31) |
---|---|
1// SPDX-License-Identifier: GPL-2.0 2#include <linux/ceph/ceph_debug.h> 3 4#include <linux/crc32c.h> 5#include <linux/ctype.h> 6#include <linux/highmem.h> 7#include <linux/inet.h> 8#include <linux/kthread.h> --- 181 unchanged lines hidden (view full) --- 190 } 191 192 return s; 193} 194EXPORT_SYMBOL(ceph_pr_addr); 195 196void ceph_encode_my_addr(struct ceph_messenger *msgr) 197{ | 1// SPDX-License-Identifier: GPL-2.0 2#include <linux/ceph/ceph_debug.h> 3 4#include <linux/crc32c.h> 5#include <linux/ctype.h> 6#include <linux/highmem.h> 7#include <linux/inet.h> 8#include <linux/kthread.h> --- 181 unchanged lines hidden (view full) --- 190 } 191 192 return s; 193} 194EXPORT_SYMBOL(ceph_pr_addr); 195 196void ceph_encode_my_addr(struct ceph_messenger *msgr) 197{ |
198 memcpy(&msgr->my_enc_addr, &msgr->inst.addr, sizeof(msgr->my_enc_addr)); 199 ceph_encode_banner_addr(&msgr->my_enc_addr); | 198 if (!ceph_msgr2(from_msgr(msgr))) { 199 memcpy(&msgr->my_enc_addr, &msgr->inst.addr, 200 sizeof(msgr->my_enc_addr)); 201 ceph_encode_banner_addr(&msgr->my_enc_addr); 202 } |
200} 201 202/* 203 * work queue for all reading and writing to/from the socket. 204 */ 205static struct workqueue_struct *ceph_msgr_wq; 206 207static int ceph_msgr_slab_init(void) --- 300 unchanged lines hidden (view full) --- 508 con->in_msg = NULL; 509 } 510 if (con->out_msg) { 511 WARN_ON(con->out_msg->con != con); 512 ceph_msg_put(con->out_msg); 513 con->out_msg = NULL; 514 } 515 | 203} 204 205/* 206 * work queue for all reading and writing to/from the socket. 207 */ 208static struct workqueue_struct *ceph_msgr_wq; 209 210static int ceph_msgr_slab_init(void) --- 300 unchanged lines hidden (view full) --- 511 con->in_msg = NULL; 512 } 513 if (con->out_msg) { 514 WARN_ON(con->out_msg->con != con); 515 ceph_msg_put(con->out_msg); 516 con->out_msg = NULL; 517 } 518 |
516 ceph_con_v1_reset_protocol(con); | 519 if (ceph_msgr2(from_msgr(con->msgr))) 520 ceph_con_v2_reset_protocol(con); 521 else 522 ceph_con_v1_reset_protocol(con); |
517} 518 519/* 520 * Reset a connection. Discard all incoming and outgoing messages 521 * and clear *_seq state. 522 */ 523static void ceph_msg_remove(struct ceph_msg *msg) 524{ 525 list_del_init(&msg->list_head); 526 527 ceph_msg_put(msg); 528} | 523} 524 525/* 526 * Reset a connection. Discard all incoming and outgoing messages 527 * and clear *_seq state. 528 */ 529static void ceph_msg_remove(struct ceph_msg *msg) 530{ 531 list_del_init(&msg->list_head); 532 533 ceph_msg_put(msg); 534} |
535 |
|
529static void ceph_msg_remove_list(struct list_head *head) 530{ 531 while (!list_empty(head)) { 532 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 533 list_head); 534 ceph_msg_remove(msg); 535 } 536} --- 5 unchanged lines hidden (view full) --- 542 WARN_ON(con->in_msg); 543 WARN_ON(con->out_msg); 544 ceph_msg_remove_list(&con->out_queue); 545 ceph_msg_remove_list(&con->out_sent); 546 con->out_seq = 0; 547 con->in_seq = 0; 548 con->in_seq_acked = 0; 549 | 536static void ceph_msg_remove_list(struct list_head *head) 537{ 538 while (!list_empty(head)) { 539 struct ceph_msg *msg = list_first_entry(head, struct ceph_msg, 540 list_head); 541 ceph_msg_remove(msg); 542 } 543} --- 5 unchanged lines hidden (view full) --- 549 WARN_ON(con->in_msg); 550 WARN_ON(con->out_msg); 551 ceph_msg_remove_list(&con->out_queue); 552 ceph_msg_remove_list(&con->out_sent); 553 con->out_seq = 0; 554 con->in_seq = 0; 555 con->in_seq_acked = 0; 556 |
550 ceph_con_v1_reset_session(con); | 557 if (ceph_msgr2(from_msgr(con->msgr))) 558 ceph_con_v2_reset_session(con); 559 else 560 ceph_con_v1_reset_session(con); |
551} 552 553/* 554 * mark a peer down. drop any open connections. 555 */ 556void ceph_con_close(struct ceph_connection *con) 557{ 558 mutex_lock(&con->mutex); --- 36 unchanged lines hidden (view full) --- 595} 596EXPORT_SYMBOL(ceph_con_open); 597 598/* 599 * return true if this connection ever successfully opened 600 */ 601bool ceph_con_opened(struct ceph_connection *con) 602{ | 561} 562 563/* 564 * mark a peer down. drop any open connections. 565 */ 566void ceph_con_close(struct ceph_connection *con) 567{ 568 mutex_lock(&con->mutex); --- 36 unchanged lines hidden (view full) --- 605} 606EXPORT_SYMBOL(ceph_con_open); 607 608/* 609 * return true if this connection ever successfully opened 610 */ 611bool ceph_con_opened(struct ceph_connection *con) 612{ |
613 if (ceph_msgr2(from_msgr(con->msgr))) 614 return ceph_con_v2_opened(con); 615 |
|
603 return ceph_con_v1_opened(con); 604} 605 606/* 607 * initialize a new connection. 608 */ 609void ceph_con_init(struct ceph_connection *con, void *private, 610 const struct ceph_connection_operations *ops, --- 686 unchanged lines hidden (view full) --- 1297 port = CEPH_MON_PORT; 1298 else if (port > 65535) 1299 goto bad; 1300 } else { 1301 port = CEPH_MON_PORT; 1302 } 1303 1304 ceph_addr_set_port(&addr[i], port); | 616 return ceph_con_v1_opened(con); 617} 618 619/* 620 * initialize a new connection. 621 */ 622void ceph_con_init(struct ceph_connection *con, void *private, 623 const struct ceph_connection_operations *ops, --- 686 unchanged lines hidden (view full) --- 1310 port = CEPH_MON_PORT; 1311 else if (port > 65535) 1312 goto bad; 1313 } else { 1314 port = CEPH_MON_PORT; 1315 } 1316 1317 ceph_addr_set_port(&addr[i], port); |
1318 /* 1319 * We want the type to be set according to ms_mode 1320 * option, but options are normally parsed after mon 1321 * addresses. Rather than complicating parsing, set 1322 * to LEGACY and override in build_initial_monmap() 1323 * for mon addresses and ceph_messenger_init() for 1324 * ip option. 1325 */ |
|
1305 addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY; | 1326 addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY; |
1327 addr[i].nonce = 0; |
|
1306 1307 dout("parse_ips got %s\n", ceph_pr_addr(&addr[i])); 1308 1309 if (p == end) 1310 break; 1311 if (*p != ',') 1312 goto bad; 1313 p++; --- 91 unchanged lines hidden (view full) --- 1405 con->error_msg = "socket closed (con state " #x ")"; \ 1406 break; 1407 1408 switch (con->state) { 1409 CASE(CLOSED); 1410 CASE(PREOPEN); 1411 CASE(V1_BANNER); 1412 CASE(V1_CONNECT_MSG); | 1328 1329 dout("parse_ips got %s\n", ceph_pr_addr(&addr[i])); 1330 1331 if (p == end) 1332 break; 1333 if (*p != ',') 1334 goto bad; 1335 p++; --- 91 unchanged lines hidden (view full) --- 1427 con->error_msg = "socket closed (con state " #x ")"; \ 1428 break; 1429 1430 switch (con->state) { 1431 CASE(CLOSED); 1432 CASE(PREOPEN); 1433 CASE(V1_BANNER); 1434 CASE(V1_CONNECT_MSG); |
1435 CASE(V2_BANNER_PREFIX); 1436 CASE(V2_BANNER_PAYLOAD); 1437 CASE(V2_HELLO); 1438 CASE(V2_AUTH); 1439 CASE(V2_AUTH_SIGNATURE); 1440 CASE(V2_SESSION_CONNECT); 1441 CASE(V2_SESSION_RECONNECT); |
|
1413 CASE(OPEN); 1414 CASE(STANDBY); 1415 default: 1416 BUG(); 1417 } 1418#undef CASE 1419 1420 return true; --- 68 unchanged lines hidden (view full) --- 1489 BUG_ON(con->sock); 1490 break; 1491 } 1492 if (con->state == CEPH_CON_S_PREOPEN) { 1493 dout("%s: con %p PREOPEN\n", __func__, con); 1494 BUG_ON(con->sock); 1495 } 1496 | 1442 CASE(OPEN); 1443 CASE(STANDBY); 1444 default: 1445 BUG(); 1446 } 1447#undef CASE 1448 1449 return true; --- 68 unchanged lines hidden (view full) --- 1518 BUG_ON(con->sock); 1519 break; 1520 } 1521 if (con->state == CEPH_CON_S_PREOPEN) { 1522 dout("%s: con %p PREOPEN\n", __func__, con); 1523 BUG_ON(con->sock); 1524 } 1525 |
1497 ret = ceph_con_v1_try_read(con); | 1526 if (ceph_msgr2(from_msgr(con->msgr))) 1527 ret = ceph_con_v2_try_read(con); 1528 else 1529 ret = ceph_con_v1_try_read(con); |
1498 if (ret < 0) { 1499 if (ret == -EAGAIN) 1500 continue; 1501 if (!con->error_msg) 1502 con->error_msg = "socket error on read"; 1503 fault = true; 1504 break; 1505 } 1506 | 1530 if (ret < 0) { 1531 if (ret == -EAGAIN) 1532 continue; 1533 if (!con->error_msg) 1534 con->error_msg = "socket error on read"; 1535 fault = true; 1536 break; 1537 } 1538 |
1507 ret = ceph_con_v1_try_write(con); | 1539 if (ceph_msgr2(from_msgr(con->msgr))) 1540 ret = ceph_con_v2_try_write(con); 1541 else 1542 ret = ceph_con_v1_try_write(con); |
1508 if (ret < 0) { 1509 if (ret == -EAGAIN) 1510 continue; 1511 if (!con->error_msg) 1512 con->error_msg = "socket error on write"; 1513 fault = true; 1514 } 1515 --- 17 unchanged lines hidden (view full) --- 1533{ 1534 dout("fault %p state %d to peer %s\n", 1535 con, con->state, ceph_pr_addr(&con->peer_addr)); 1536 1537 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 1538 ceph_pr_addr(&con->peer_addr), con->error_msg); 1539 con->error_msg = NULL; 1540 | 1543 if (ret < 0) { 1544 if (ret == -EAGAIN) 1545 continue; 1546 if (!con->error_msg) 1547 con->error_msg = "socket error on write"; 1548 fault = true; 1549 } 1550 --- 17 unchanged lines hidden (view full) --- 1568{ 1569 dout("fault %p state %d to peer %s\n", 1570 con, con->state, ceph_pr_addr(&con->peer_addr)); 1571 1572 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), 1573 ceph_pr_addr(&con->peer_addr), con->error_msg); 1574 con->error_msg = NULL; 1575 |
1541 WARN_ON(con->state != CEPH_CON_S_V1_BANNER && 1542 con->state != CEPH_CON_S_V1_CONNECT_MSG && 1543 con->state != CEPH_CON_S_OPEN); | 1576 WARN_ON(con->state == CEPH_CON_S_STANDBY || 1577 con->state == CEPH_CON_S_CLOSED); |
1544 1545 ceph_con_reset_protocol(con); 1546 1547 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) { 1548 dout("fault on LOSSYTX channel, marking CLOSED\n"); 1549 con->state = CEPH_CON_S_CLOSED; 1550 return; 1551 } --- 39 unchanged lines hidden (view full) --- 1591 spin_lock_init(&msgr->global_seq_lock); 1592 1593 if (myaddr) { 1594 memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr, 1595 sizeof(msgr->inst.addr.in_addr)); 1596 ceph_addr_set_port(&msgr->inst.addr, 0); 1597 } 1598 | 1578 1579 ceph_con_reset_protocol(con); 1580 1581 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) { 1582 dout("fault on LOSSYTX channel, marking CLOSED\n"); 1583 con->state = CEPH_CON_S_CLOSED; 1584 return; 1585 } --- 39 unchanged lines hidden (view full) --- 1625 spin_lock_init(&msgr->global_seq_lock); 1626 1627 if (myaddr) { 1628 memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr, 1629 sizeof(msgr->inst.addr.in_addr)); 1630 ceph_addr_set_port(&msgr->inst.addr, 0); 1631 } 1632 |
1599 msgr->inst.addr.type = 0; | 1633 /* 1634 * Since nautilus, clients are identified using type ANY. 1635 * For msgr1, ceph_encode_banner_addr() munges it to NONE. 1636 */ 1637 msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY; |
1600 1601 /* generate a random non-zero nonce */ 1602 do { 1603 get_random_bytes(&msgr->inst.addr.nonce, 1604 sizeof(msgr->inst.addr.nonce)); 1605 } while (!msgr->inst.addr.nonce); 1606 ceph_encode_my_addr(msgr); 1607 --- 93 unchanged lines hidden (view full) --- 1701 1702 dout("%s con %p msg %p was linked\n", __func__, con, msg); 1703 msg->hdr.seq = 0; 1704 ceph_msg_remove(msg); 1705 1706 if (con->out_msg == msg) { 1707 WARN_ON(con->state != CEPH_CON_S_OPEN); 1708 dout("%s con %p msg %p was sending\n", __func__, con, msg); | 1638 1639 /* generate a random non-zero nonce */ 1640 do { 1641 get_random_bytes(&msgr->inst.addr.nonce, 1642 sizeof(msgr->inst.addr.nonce)); 1643 } while (!msgr->inst.addr.nonce); 1644 ceph_encode_my_addr(msgr); 1645 --- 93 unchanged lines hidden (view full) --- 1739 1740 dout("%s con %p msg %p was linked\n", __func__, con, msg); 1741 msg->hdr.seq = 0; 1742 ceph_msg_remove(msg); 1743 1744 if (con->out_msg == msg) { 1745 WARN_ON(con->state != CEPH_CON_S_OPEN); 1746 dout("%s con %p msg %p was sending\n", __func__, con, msg); |
1709 ceph_con_v1_revoke(con); | 1747 if (ceph_msgr2(from_msgr(con->msgr))) 1748 ceph_con_v2_revoke(con); 1749 else 1750 ceph_con_v1_revoke(con); |
1710 ceph_msg_put(con->out_msg); 1711 con->out_msg = NULL; 1712 } else { 1713 dout("%s con %p msg %p not current, out_msg %p\n", __func__, 1714 con, msg, con->out_msg); 1715 } 1716 mutex_unlock(&con->mutex); 1717} --- 9 unchanged lines hidden (view full) --- 1727 dout("%s msg %p null con\n", __func__, msg); 1728 return; /* Message not in our possession */ 1729 } 1730 1731 mutex_lock(&con->mutex); 1732 if (con->in_msg == msg) { 1733 WARN_ON(con->state != CEPH_CON_S_OPEN); 1734 dout("%s con %p msg %p was recving\n", __func__, con, msg); | 1751 ceph_msg_put(con->out_msg); 1752 con->out_msg = NULL; 1753 } else { 1754 dout("%s con %p msg %p not current, out_msg %p\n", __func__, 1755 con, msg, con->out_msg); 1756 } 1757 mutex_unlock(&con->mutex); 1758} --- 9 unchanged lines hidden (view full) --- 1768 dout("%s msg %p null con\n", __func__, msg); 1769 return; /* Message not in our possession */ 1770 } 1771 1772 mutex_lock(&con->mutex); 1773 if (con->in_msg == msg) { 1774 WARN_ON(con->state != CEPH_CON_S_OPEN); 1775 dout("%s con %p msg %p was recving\n", __func__, con, msg); |
1735 ceph_con_v1_revoke_incoming(con); | 1776 if (ceph_msgr2(from_msgr(con->msgr))) 1777 ceph_con_v2_revoke_incoming(con); 1778 else 1779 ceph_con_v1_revoke_incoming(con); |
1736 ceph_msg_put(con->in_msg); 1737 con->in_msg = NULL; 1738 } else { 1739 dout("%s con %p msg %p not current, in_msg %p\n", __func__, 1740 con, msg, con->in_msg); 1741 } 1742 mutex_unlock(&con->mutex); 1743} --- 372 unchanged lines hidden --- | 1780 ceph_msg_put(con->in_msg); 1781 con->in_msg = NULL; 1782 } else { 1783 dout("%s con %p msg %p not current, in_msg %p\n", __func__, 1784 con, msg, con->in_msg); 1785 } 1786 mutex_unlock(&con->mutex); 1787} --- 372 unchanged lines hidden --- |