|
@@ -509,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
|
|
- if (con->msgr->tcp_nodelay) {
|
|
|
|
|
|
+ if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) {
|
|
int optval = 1;
|
|
int optval = 1;
|
|
|
|
|
|
ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
|
|
ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
|
|
@@ -637,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con)
|
|
static void ceph_msg_remove(struct ceph_msg *msg)
|
|
static void ceph_msg_remove(struct ceph_msg *msg)
|
|
{
|
|
{
|
|
list_del_init(&msg->list_head);
|
|
list_del_init(&msg->list_head);
|
|
- BUG_ON(msg->con == NULL);
|
|
|
|
- msg->con->ops->put(msg->con);
|
|
|
|
- msg->con = NULL;
|
|
|
|
|
|
|
|
ceph_msg_put(msg);
|
|
ceph_msg_put(msg);
|
|
}
|
|
}
|
|
@@ -662,15 +659,14 @@ static void reset_connection(struct ceph_connection *con)
|
|
|
|
|
|
if (con->in_msg) {
|
|
if (con->in_msg) {
|
|
BUG_ON(con->in_msg->con != con);
|
|
BUG_ON(con->in_msg->con != con);
|
|
- con->in_msg->con = NULL;
|
|
|
|
ceph_msg_put(con->in_msg);
|
|
ceph_msg_put(con->in_msg);
|
|
con->in_msg = NULL;
|
|
con->in_msg = NULL;
|
|
- con->ops->put(con);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
con->connect_seq = 0;
|
|
con->connect_seq = 0;
|
|
con->out_seq = 0;
|
|
con->out_seq = 0;
|
|
if (con->out_msg) {
|
|
if (con->out_msg) {
|
|
|
|
+ BUG_ON(con->out_msg->con != con);
|
|
ceph_msg_put(con->out_msg);
|
|
ceph_msg_put(con->out_msg);
|
|
con->out_msg = NULL;
|
|
con->out_msg = NULL;
|
|
}
|
|
}
|
|
@@ -1205,7 +1201,7 @@ static void prepare_write_message_footer(struct ceph_connection *con)
|
|
con->out_kvec[v].iov_base = &m->footer;
|
|
con->out_kvec[v].iov_base = &m->footer;
|
|
if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
|
|
if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
|
|
if (con->ops->sign_message)
|
|
if (con->ops->sign_message)
|
|
- con->ops->sign_message(con, m);
|
|
|
|
|
|
+ con->ops->sign_message(m);
|
|
else
|
|
else
|
|
m->footer.sig = 0;
|
|
m->footer.sig = 0;
|
|
con->out_kvec[v].iov_len = sizeof(m->footer);
|
|
con->out_kvec[v].iov_len = sizeof(m->footer);
|
|
@@ -1432,7 +1428,8 @@ static int prepare_write_connect(struct ceph_connection *con)
|
|
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
|
|
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
|
|
con->connect_seq, global_seq, proto);
|
|
con->connect_seq, global_seq, proto);
|
|
|
|
|
|
- con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
|
|
|
|
|
|
+ con->out_connect.features =
|
|
|
|
+ cpu_to_le64(from_msgr(con->msgr)->supported_features);
|
|
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
|
|
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
|
|
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
|
|
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
|
|
con->out_connect.global_seq = cpu_to_le32(global_seq);
|
|
con->out_connect.global_seq = cpu_to_le32(global_seq);
|
|
@@ -1527,7 +1524,7 @@ static int write_partial_message_data(struct ceph_connection *con)
|
|
{
|
|
{
|
|
struct ceph_msg *msg = con->out_msg;
|
|
struct ceph_msg *msg = con->out_msg;
|
|
struct ceph_msg_data_cursor *cursor = &msg->cursor;
|
|
struct ceph_msg_data_cursor *cursor = &msg->cursor;
|
|
- bool do_datacrc = !con->msgr->nocrc;
|
|
|
|
|
|
+ bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
|
|
u32 crc;
|
|
u32 crc;
|
|
|
|
|
|
dout("%s %p msg %p\n", __func__, con, msg);
|
|
dout("%s %p msg %p\n", __func__, con, msg);
|
|
@@ -1552,8 +1549,8 @@ static int write_partial_message_data(struct ceph_connection *con)
|
|
bool need_crc;
|
|
bool need_crc;
|
|
int ret;
|
|
int ret;
|
|
|
|
|
|
- page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
|
|
|
|
- &last_piece);
|
|
|
|
|
|
+ page = ceph_msg_data_next(cursor, &page_offset, &length,
|
|
|
|
+ &last_piece);
|
|
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
|
|
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
|
|
length, !last_piece);
|
|
length, !last_piece);
|
|
if (ret <= 0) {
|
|
if (ret <= 0) {
|
|
@@ -1564,7 +1561,7 @@ static int write_partial_message_data(struct ceph_connection *con)
|
|
}
|
|
}
|
|
if (do_datacrc && cursor->need_crc)
|
|
if (do_datacrc && cursor->need_crc)
|
|
crc = ceph_crc32c_page(crc, page, page_offset, length);
|
|
crc = ceph_crc32c_page(crc, page, page_offset, length);
|
|
- need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
|
|
|
|
|
|
+ need_crc = ceph_msg_data_advance(cursor, (size_t)ret);
|
|
}
|
|
}
|
|
|
|
|
|
dout("%s %p msg %p done\n", __func__, con, msg);
|
|
dout("%s %p msg %p done\n", __func__, con, msg);
|
|
@@ -2005,8 +2002,8 @@ static int process_banner(struct ceph_connection *con)
|
|
|
|
|
|
static int process_connect(struct ceph_connection *con)
|
|
static int process_connect(struct ceph_connection *con)
|
|
{
|
|
{
|
|
- u64 sup_feat = con->msgr->supported_features;
|
|
|
|
- u64 req_feat = con->msgr->required_features;
|
|
|
|
|
|
+ u64 sup_feat = from_msgr(con->msgr)->supported_features;
|
|
|
|
+ u64 req_feat = from_msgr(con->msgr)->required_features;
|
|
u64 server_feat = ceph_sanitize_features(
|
|
u64 server_feat = ceph_sanitize_features(
|
|
le64_to_cpu(con->in_reply.features));
|
|
le64_to_cpu(con->in_reply.features));
|
|
int ret;
|
|
int ret;
|
|
@@ -2232,7 +2229,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
|
|
{
|
|
{
|
|
struct ceph_msg *msg = con->in_msg;
|
|
struct ceph_msg *msg = con->in_msg;
|
|
struct ceph_msg_data_cursor *cursor = &msg->cursor;
|
|
struct ceph_msg_data_cursor *cursor = &msg->cursor;
|
|
- const bool do_datacrc = !con->msgr->nocrc;
|
|
|
|
|
|
+ bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
|
|
struct page *page;
|
|
struct page *page;
|
|
size_t page_offset;
|
|
size_t page_offset;
|
|
size_t length;
|
|
size_t length;
|
|
@@ -2246,8 +2243,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
|
|
if (do_datacrc)
|
|
if (do_datacrc)
|
|
crc = con->in_data_crc;
|
|
crc = con->in_data_crc;
|
|
while (cursor->resid) {
|
|
while (cursor->resid) {
|
|
- page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
|
|
|
|
- NULL);
|
|
|
|
|
|
+ page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
|
|
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
|
|
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
|
|
if (ret <= 0) {
|
|
if (ret <= 0) {
|
|
if (do_datacrc)
|
|
if (do_datacrc)
|
|
@@ -2258,7 +2254,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
|
|
|
|
|
|
if (do_datacrc)
|
|
if (do_datacrc)
|
|
crc = ceph_crc32c_page(crc, page, page_offset, ret);
|
|
crc = ceph_crc32c_page(crc, page, page_offset, ret);
|
|
- (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
|
|
|
|
|
|
+ (void) ceph_msg_data_advance(cursor, (size_t)ret);
|
|
}
|
|
}
|
|
if (do_datacrc)
|
|
if (do_datacrc)
|
|
con->in_data_crc = crc;
|
|
con->in_data_crc = crc;
|
|
@@ -2278,7 +2274,7 @@ static int read_partial_message(struct ceph_connection *con)
|
|
int end;
|
|
int end;
|
|
int ret;
|
|
int ret;
|
|
unsigned int front_len, middle_len, data_len;
|
|
unsigned int front_len, middle_len, data_len;
|
|
- bool do_datacrc = !con->msgr->nocrc;
|
|
|
|
|
|
+ bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
|
|
bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
|
|
bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
|
|
u64 seq;
|
|
u64 seq;
|
|
u32 crc;
|
|
u32 crc;
|
|
@@ -2423,7 +2419,7 @@ static int read_partial_message(struct ceph_connection *con)
|
|
}
|
|
}
|
|
|
|
|
|
if (need_sign && con->ops->check_message_signature &&
|
|
if (need_sign && con->ops->check_message_signature &&
|
|
- con->ops->check_message_signature(con, m)) {
|
|
|
|
|
|
+ con->ops->check_message_signature(m)) {
|
|
pr_err("read_partial_message %p signature check failed\n", m);
|
|
pr_err("read_partial_message %p signature check failed\n", m);
|
|
return -EBADMSG;
|
|
return -EBADMSG;
|
|
}
|
|
}
|
|
@@ -2438,13 +2434,10 @@ static int read_partial_message(struct ceph_connection *con)
|
|
*/
|
|
*/
|
|
static void process_message(struct ceph_connection *con)
|
|
static void process_message(struct ceph_connection *con)
|
|
{
|
|
{
|
|
- struct ceph_msg *msg;
|
|
|
|
|
|
+ struct ceph_msg *msg = con->in_msg;
|
|
|
|
|
|
BUG_ON(con->in_msg->con != con);
|
|
BUG_ON(con->in_msg->con != con);
|
|
- con->in_msg->con = NULL;
|
|
|
|
- msg = con->in_msg;
|
|
|
|
con->in_msg = NULL;
|
|
con->in_msg = NULL;
|
|
- con->ops->put(con);
|
|
|
|
|
|
|
|
/* if first message, set peer_name */
|
|
/* if first message, set peer_name */
|
|
if (con->peer_name.type == 0)
|
|
if (con->peer_name.type == 0)
|
|
@@ -2677,7 +2670,7 @@ more:
|
|
if (ret <= 0) {
|
|
if (ret <= 0) {
|
|
switch (ret) {
|
|
switch (ret) {
|
|
case -EBADMSG:
|
|
case -EBADMSG:
|
|
- con->error_msg = "bad crc";
|
|
|
|
|
|
+ con->error_msg = "bad crc/signature";
|
|
/* fall through */
|
|
/* fall through */
|
|
case -EBADE:
|
|
case -EBADE:
|
|
ret = -EIO;
|
|
ret = -EIO;
|
|
@@ -2918,10 +2911,8 @@ static void con_fault(struct ceph_connection *con)
|
|
|
|
|
|
if (con->in_msg) {
|
|
if (con->in_msg) {
|
|
BUG_ON(con->in_msg->con != con);
|
|
BUG_ON(con->in_msg->con != con);
|
|
- con->in_msg->con = NULL;
|
|
|
|
ceph_msg_put(con->in_msg);
|
|
ceph_msg_put(con->in_msg);
|
|
con->in_msg = NULL;
|
|
con->in_msg = NULL;
|
|
- con->ops->put(con);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/* Requeue anything that hasn't been acked */
|
|
/* Requeue anything that hasn't been acked */
|
|
@@ -2952,15 +2943,8 @@ static void con_fault(struct ceph_connection *con)
|
|
* initialize a new messenger instance
|
|
* initialize a new messenger instance
|
|
*/
|
|
*/
|
|
void ceph_messenger_init(struct ceph_messenger *msgr,
|
|
void ceph_messenger_init(struct ceph_messenger *msgr,
|
|
- struct ceph_entity_addr *myaddr,
|
|
|
|
- u64 supported_features,
|
|
|
|
- u64 required_features,
|
|
|
|
- bool nocrc,
|
|
|
|
- bool tcp_nodelay)
|
|
|
|
|
|
+ struct ceph_entity_addr *myaddr)
|
|
{
|
|
{
|
|
- msgr->supported_features = supported_features;
|
|
|
|
- msgr->required_features = required_features;
|
|
|
|
-
|
|
|
|
spin_lock_init(&msgr->global_seq_lock);
|
|
spin_lock_init(&msgr->global_seq_lock);
|
|
|
|
|
|
if (myaddr)
|
|
if (myaddr)
|
|
@@ -2970,8 +2954,6 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
|
|
msgr->inst.addr.type = 0;
|
|
msgr->inst.addr.type = 0;
|
|
get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
|
|
get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
|
|
encode_my_addr(msgr);
|
|
encode_my_addr(msgr);
|
|
- msgr->nocrc = nocrc;
|
|
|
|
- msgr->tcp_nodelay = tcp_nodelay;
|
|
|
|
|
|
|
|
atomic_set(&msgr->stopping, 0);
|
|
atomic_set(&msgr->stopping, 0);
|
|
write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
|
|
write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
|
|
@@ -2986,6 +2968,15 @@ void ceph_messenger_fini(struct ceph_messenger *msgr)
|
|
}
|
|
}
|
|
EXPORT_SYMBOL(ceph_messenger_fini);
|
|
EXPORT_SYMBOL(ceph_messenger_fini);
|
|
|
|
|
|
|
|
+static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
|
|
|
|
+{
|
|
|
|
+ if (msg->con)
|
|
|
|
+ msg->con->ops->put(msg->con);
|
|
|
|
+
|
|
|
|
+ msg->con = con ? con->ops->get(con) : NULL;
|
|
|
|
+ BUG_ON(msg->con != con);
|
|
|
|
+}
|
|
|
|
+
|
|
static void clear_standby(struct ceph_connection *con)
|
|
static void clear_standby(struct ceph_connection *con)
|
|
{
|
|
{
|
|
/* come back from STANDBY? */
|
|
/* come back from STANDBY? */
|
|
@@ -3017,9 +3008,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- BUG_ON(msg->con != NULL);
|
|
|
|
- msg->con = con->ops->get(con);
|
|
|
|
- BUG_ON(msg->con == NULL);
|
|
|
|
|
|
+ msg_con_set(msg, con);
|
|
|
|
|
|
BUG_ON(!list_empty(&msg->list_head));
|
|
BUG_ON(!list_empty(&msg->list_head));
|
|
list_add_tail(&msg->list_head, &con->out_queue);
|
|
list_add_tail(&msg->list_head, &con->out_queue);
|
|
@@ -3047,16 +3036,15 @@ void ceph_msg_revoke(struct ceph_msg *msg)
|
|
{
|
|
{
|
|
struct ceph_connection *con = msg->con;
|
|
struct ceph_connection *con = msg->con;
|
|
|
|
|
|
- if (!con)
|
|
|
|
|
|
+ if (!con) {
|
|
|
|
+ dout("%s msg %p null con\n", __func__, msg);
|
|
return; /* Message not in our possession */
|
|
return; /* Message not in our possession */
|
|
|
|
+ }
|
|
|
|
|
|
mutex_lock(&con->mutex);
|
|
mutex_lock(&con->mutex);
|
|
if (!list_empty(&msg->list_head)) {
|
|
if (!list_empty(&msg->list_head)) {
|
|
dout("%s %p msg %p - was on queue\n", __func__, con, msg);
|
|
dout("%s %p msg %p - was on queue\n", __func__, con, msg);
|
|
list_del_init(&msg->list_head);
|
|
list_del_init(&msg->list_head);
|
|
- BUG_ON(msg->con == NULL);
|
|
|
|
- msg->con->ops->put(msg->con);
|
|
|
|
- msg->con = NULL;
|
|
|
|
msg->hdr.seq = 0;
|
|
msg->hdr.seq = 0;
|
|
|
|
|
|
ceph_msg_put(msg);
|
|
ceph_msg_put(msg);
|
|
@@ -3080,16 +3068,13 @@ void ceph_msg_revoke(struct ceph_msg *msg)
|
|
*/
|
|
*/
|
|
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
|
|
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
|
|
{
|
|
{
|
|
- struct ceph_connection *con;
|
|
|
|
|
|
+ struct ceph_connection *con = msg->con;
|
|
|
|
|
|
- BUG_ON(msg == NULL);
|
|
|
|
- if (!msg->con) {
|
|
|
|
|
|
+ if (!con) {
|
|
dout("%s msg %p null con\n", __func__, msg);
|
|
dout("%s msg %p null con\n", __func__, msg);
|
|
-
|
|
|
|
return; /* Message not in our possession */
|
|
return; /* Message not in our possession */
|
|
}
|
|
}
|
|
|
|
|
|
- con = msg->con;
|
|
|
|
mutex_lock(&con->mutex);
|
|
mutex_lock(&con->mutex);
|
|
if (con->in_msg == msg) {
|
|
if (con->in_msg == msg) {
|
|
unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
|
|
unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
|
|
@@ -3335,9 +3320,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
|
|
}
|
|
}
|
|
if (msg) {
|
|
if (msg) {
|
|
BUG_ON(*skip);
|
|
BUG_ON(*skip);
|
|
|
|
+ msg_con_set(msg, con);
|
|
con->in_msg = msg;
|
|
con->in_msg = msg;
|
|
- con->in_msg->con = con->ops->get(con);
|
|
|
|
- BUG_ON(con->in_msg->con == NULL);
|
|
|
|
} else {
|
|
} else {
|
|
/*
|
|
/*
|
|
* Null message pointer means either we should skip
|
|
* Null message pointer means either we should skip
|
|
@@ -3384,6 +3368,8 @@ static void ceph_msg_release(struct kref *kref)
|
|
dout("%s %p\n", __func__, m);
|
|
dout("%s %p\n", __func__, m);
|
|
WARN_ON(!list_empty(&m->list_head));
|
|
WARN_ON(!list_empty(&m->list_head));
|
|
|
|
|
|
|
|
+ msg_con_set(m, NULL);
|
|
|
|
+
|
|
/* drop middle, data, if any */
|
|
/* drop middle, data, if any */
|
|
if (m->middle) {
|
|
if (m->middle) {
|
|
ceph_buffer_put(m->middle);
|
|
ceph_buffer_put(m->middle);
|