libceph: use data cursor for message pagelist
Switch to using the message cursor for the (non-trail) outgoing
pagelist data item in a message if present.
Notes on the logic changes in out_msg_pos_next():
- only the mds client uses a ceph pagelist for message data;
- if the mds client ever uses a pagelist, it never uses a page
array (or anything else, for that matter) for data in the same
message;
- only the osd client uses the trail portion of a message data,
and when it does, it never uses any other data fields for
outgoing data in the same message; and finally
- only the rbd client uses bio message data (never pagelist).
Therefore out_msg_pos_next() can assume:
- if we're in the trail portion of a message, the message data
pagelist, data, and bio can be ignored; and
- if there is a page list, there will never be any a bio or page
array data, and vice-versa.
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 4cc27a1..30c8792 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -931,8 +931,10 @@
#endif
msg_pos->data_pos = 0;
- /* If there's a trail, initialize its cursor */
+ /* Initialize data cursors */
+ if (ceph_msg_has_pagelist(msg))
+ ceph_msg_data_cursor_init(&msg->l);
if (ceph_msg_has_trail(msg))
ceph_msg_data_cursor_init(&msg->t);
@@ -1220,18 +1222,19 @@
{
struct ceph_msg *msg = con->out_msg;
struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
+ bool need_crc = false;
BUG_ON(!msg);
BUG_ON(!sent);
msg_pos->data_pos += sent;
msg_pos->page_pos += sent;
- if (in_trail) {
- bool need_crc;
-
+ if (in_trail)
need_crc = ceph_msg_data_advance(&msg->t, sent);
- BUG_ON(need_crc && sent != len);
- }
+ else if (ceph_msg_has_pagelist(msg))
+ need_crc = ceph_msg_data_advance(&msg->l, sent);
+ BUG_ON(need_crc && sent != len);
+
if (sent < len)
return;
@@ -1239,13 +1242,10 @@
msg_pos->page_pos = 0;
msg_pos->page++;
msg_pos->did_page_crc = false;
- if (ceph_msg_has_pagelist(msg)) {
- list_rotate_left(&msg->l.pagelist->head);
#ifdef CONFIG_BLOCK
- } else if (ceph_msg_has_bio(msg)) {
+ if (ceph_msg_has_bio(msg))
iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg);
#endif
- }
}
static void in_msg_pos_next(struct ceph_connection *con, size_t len,
@@ -1340,8 +1340,9 @@
} else if (ceph_msg_has_pages(msg)) {
page = msg->p.pages[msg_pos->page];
} else if (ceph_msg_has_pagelist(msg)) {
- page = list_first_entry(&msg->l.pagelist->head,
- struct page, lru);
+ use_cursor = true;
+ page = ceph_msg_data_next(&msg->l, &page_offset,
+ &length, &last_piece);
#ifdef CONFIG_BLOCK
} else if (ceph_msg_has_bio(msg)) {
struct bio_vec *bv;