|
@@ -1174,8 +1174,8 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
|
|
|
* Returns true if the result moves the cursor on to the next piece
|
|
|
* of the data item.
|
|
|
*/
|
|
|
-static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
|
|
|
- size_t bytes)
|
|
|
+static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
|
|
|
+ size_t bytes)
|
|
|
{
|
|
|
bool new_piece;
|
|
|
|
|
@@ -1207,8 +1207,6 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
|
|
|
new_piece = true;
|
|
|
}
|
|
|
cursor->need_crc = new_piece;
|
|
|
-
|
|
|
- return new_piece;
|
|
|
}
|
|
|
|
|
|
static size_t sizeof_footer(struct ceph_connection *con)
|
|
@@ -1577,7 +1575,6 @@ static int write_partial_message_data(struct ceph_connection *con)
|
|
|
size_t page_offset;
|
|
|
size_t length;
|
|
|
bool last_piece;
|
|
|
- bool need_crc;
|
|
|
int ret;
|
|
|
|
|
|
page = ceph_msg_data_next(cursor, &page_offset, &length,
|
|
@@ -1592,7 +1589,7 @@ static int write_partial_message_data(struct ceph_connection *con)
|
|
|
}
|
|
|
if (do_datacrc && cursor->need_crc)
|
|
|
crc = ceph_crc32c_page(crc, page, page_offset, length);
|
|
|
- need_crc = ceph_msg_data_advance(cursor, (size_t)ret);
|
|
|
+ ceph_msg_data_advance(cursor, (size_t)ret);
|
|
|
}
|
|
|
|
|
|
dout("%s %p msg %p done\n", __func__, con, msg);
|
|
@@ -2231,10 +2228,18 @@ static void process_ack(struct ceph_connection *con)
|
|
|
struct ceph_msg *m;
|
|
|
u64 ack = le64_to_cpu(con->in_temp_ack);
|
|
|
u64 seq;
|
|
|
+ bool reconnect = (con->in_tag == CEPH_MSGR_TAG_SEQ);
|
|
|
+ struct list_head *list = reconnect ? &con->out_queue : &con->out_sent;
|
|
|
|
|
|
- while (!list_empty(&con->out_sent)) {
|
|
|
- m = list_first_entry(&con->out_sent, struct ceph_msg,
|
|
|
- list_head);
|
|
|
+ /*
|
|
|
+ * In the reconnect case, con_fault() has requeued messages
|
|
|
+ * in out_sent. We should cleanup old messages according to
|
|
|
+ * the reconnect seq.
|
|
|
+ */
|
|
|
+ while (!list_empty(list)) {
|
|
|
+ m = list_first_entry(list, struct ceph_msg, list_head);
|
|
|
+ if (reconnect && m->needs_out_seq)
|
|
|
+ break;
|
|
|
seq = le64_to_cpu(m->hdr.seq);
|
|
|
if (seq > ack)
|
|
|
break;
|
|
@@ -2243,6 +2248,7 @@ static void process_ack(struct ceph_connection *con)
|
|
|
m->ack_stamp = jiffies;
|
|
|
ceph_msg_remove(m);
|
|
|
}
|
|
|
+
|
|
|
prepare_read_tag(con);
|
|
|
}
|
|
|
|
|
@@ -2299,7 +2305,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
|
|
|
|
|
|
if (do_datacrc)
|
|
|
crc = ceph_crc32c_page(crc, page, page_offset, ret);
|
|
|
- (void) ceph_msg_data_advance(cursor, (size_t)ret);
|
|
|
+ ceph_msg_data_advance(cursor, (size_t)ret);
|
|
|
}
|
|
|
if (do_datacrc)
|
|
|
con->in_data_crc = crc;
|