ceph: use ceph_pagelist for mds reconnect message; change encoding (protocol change)

Use the ceph_pagelist to encode the MDS reconnect message.  We change the
message encoding (protocol change!) at the same time to make our life
easier (we don't know how many snaprealms we have when we start encoding).

An empty message implies the session is closed/does not exist.

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index ec884e2..6e08f48 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -9,6 +9,7 @@
 #include "messenger.h"
 #include "decode.h"
 #include "auth.h"
+#include "pagelist.h"
 
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
@@ -1971,20 +1972,12 @@
 /*
  * Encode information about a cap for a reconnect with the MDS.
  */
-struct encode_caps_data {
-	void **pp;
-	void *end;
-	int *num_caps;
-};
-
 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 			  void *arg)
 {
-	struct ceph_mds_cap_reconnect *rec;
+	struct ceph_mds_cap_reconnect rec;
 	struct ceph_inode_info *ci;
-	struct encode_caps_data *data = (struct encode_caps_data *)arg;
-	void *p = *(data->pp);
-	void *end = data->end;
+	struct ceph_pagelist *pagelist = arg;
 	char *path;
 	int pathlen, err;
 	u64 pathbase;
@@ -1995,8 +1988,9 @@
 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
 	     inode, ceph_vinop(inode), cap, cap->cap_id,
 	     ceph_cap_string(cap->issued));
-	ceph_decode_need(&p, end, sizeof(u64), needmore);
-	ceph_encode_64(&p, ceph_ino(inode));
+	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+	if (err)
+		return err;
 
 	dentry = d_find_alias(inode);
 	if (dentry) {
@@ -2009,33 +2003,29 @@
 		path = NULL;
 		pathlen = 0;
 	}
-	ceph_decode_need(&p, end, pathlen+4, needmore);
-	ceph_encode_string(&p, end, path, pathlen);
+	err = ceph_pagelist_encode_string(pagelist, path, pathlen);
+	if (err)
+		goto out;
 
-	ceph_decode_need(&p, end, sizeof(*rec), needmore);
-	rec = p;
-	p += sizeof(*rec);
-	BUG_ON(p > end);
 	spin_lock(&inode->i_lock);
 	cap->seq = 0;        /* reset cap seq */
 	cap->issue_seq = 0;  /* and issue_seq */
-	rec->cap_id = cpu_to_le64(cap->cap_id);
-	rec->pathbase = cpu_to_le64(pathbase);
-	rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
-	rec->issued = cpu_to_le32(cap->issued);
-	rec->size = cpu_to_le64(inode->i_size);
-	ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
-	ceph_encode_timespec(&rec->atime, &inode->i_atime);
-	rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+	rec.cap_id = cpu_to_le64(cap->cap_id);
+	rec.pathbase = cpu_to_le64(pathbase);
+	rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+	rec.issued = cpu_to_le32(cap->issued);
+	rec.size = cpu_to_le64(inode->i_size);
+	ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
+	ceph_encode_timespec(&rec.atime, &inode->i_atime);
+	rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
 	spin_unlock(&inode->i_lock);
 
+	err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
+
+out:
 	kfree(path);
 	dput(dentry);
-	(*data->num_caps)++;
-	*(data->pp) = p;
-	return 0;
-needmore:
-	return -ENOSPC;
+	return err;
 }
 
 
@@ -2053,19 +2043,26 @@
  */
 static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
 {
-	struct ceph_mds_session *session;
+	struct ceph_mds_session *session = NULL;
 	struct ceph_msg *reply;
-	int newlen, len = 4 + 1;
-	void *p, *end;
 	int err;
-	int num_caps, num_realms = 0;
 	int got;
 	u64 next_snap_ino = 0;
-	__le32 *pnum_caps, *pnum_realms;
-	struct encode_caps_data iter_args;
+	struct ceph_pagelist *pagelist;
 
 	pr_info("reconnect to recovering mds%d\n", mds);
 
+	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+	if (!pagelist)
+		goto fail_nopagelist;
+	ceph_pagelist_init(pagelist);
+
+	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
+	if (IS_ERR(reply)) {
+		err = PTR_ERR(reply);
+		goto fail_nomsg;
+	}
+
 	/* find session */
 	session = __ceph_lookup_mds_session(mdsc, mds);
 	mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
@@ -2081,12 +2078,6 @@
 
 		/* replay unsafe requests */
 		replay_unsafe_requests(mdsc, session);
-
-		/* estimate needed space */
-		len += session->s_nr_caps *
-			(100+sizeof(struct ceph_mds_cap_reconnect));
-		pr_info("estimating i need %d bytes for %d caps\n",
-		     len, session->s_nr_caps);
 	} else {
 		dout("no session for mds%d, will send short reconnect\n",
 		     mds);
@@ -2094,41 +2085,18 @@
 
 	down_read(&mdsc->snap_rwsem);
 
-retry:
-	/* build reply */
-	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
-	if (IS_ERR(reply)) {
-		err = PTR_ERR(reply);
-		pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
-		       len, mds);
-		goto out;
-	}
-	p = reply->front.iov_base;
-	end = p + len;
-
-	if (!session) {
-		ceph_encode_8(&p, 1); /* session was closed */
-		ceph_encode_32(&p, 0);
+	if (!session)
 		goto send;
-	}
 	dout("session %p state %s\n", session,
 	     session_state_name(session->s_state));
 
 	/* traverse this session's caps */
-	ceph_encode_8(&p, 0);
-	pnum_caps = p;
-	ceph_encode_32(&p, session->s_nr_caps);
-	num_caps = 0;
-
-	iter_args.pp = &p;
-	iter_args.end = end;
-	iter_args.num_caps = &num_caps;
-	err = iterate_session_caps(session, encode_caps_cb, &iter_args);
-	if (err == -ENOSPC)
-		goto needmore;
+	err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
+	if (err)
+		goto fail;
+	err = iterate_session_caps(session, encode_caps_cb, pagelist);
 	if (err < 0)
 		goto out;
-	*pnum_caps = cpu_to_le32(num_caps);
 
 	/*
 	 * snaprealms.  we provide mds with the ino, seq (version), and
@@ -2136,14 +2104,9 @@
 	 * it will tell us.
 	 */
 	next_snap_ino = 0;
-	/* save some space for the snaprealm count */
-	pnum_realms = p;
-	ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
-	p += sizeof(*pnum_realms);
-	num_realms = 0;
 	while (1) {
 		struct ceph_snap_realm *realm;
-		struct ceph_mds_snaprealm_reconnect *sr_rec;
+		struct ceph_mds_snaprealm_reconnect sr_rec;
 		got = radix_tree_gang_lookup(&mdsc->snap_realms,
 					     (void **)&realm, next_snap_ino, 1);
 		if (!got)
@@ -2151,22 +2114,19 @@
 
 		dout(" adding snap realm %llx seq %lld parent %llx\n",
 		     realm->ino, realm->seq, realm->parent_ino);
-		ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
-		sr_rec = p;
-		sr_rec->ino = cpu_to_le64(realm->ino);
-		sr_rec->seq = cpu_to_le64(realm->seq);
-		sr_rec->parent = cpu_to_le64(realm->parent_ino);
-		p += sizeof(*sr_rec);
-		num_realms++;
+		sr_rec.ino = cpu_to_le64(realm->ino);
+		sr_rec.seq = cpu_to_le64(realm->seq);
+		sr_rec.parent = cpu_to_le64(realm->parent_ino);
+		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+		if (err)
+			goto fail;
 		next_snap_ino = realm->ino + 1;
 	}
-	*pnum_realms = cpu_to_le32(num_realms);
 
 send:
-	reply->front.iov_len = p - reply->front.iov_base;
-	reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
-	dout("final len was %u (guessed %d)\n",
-	     (unsigned)reply->front.iov_len, len);
+	reply->pagelist = pagelist;
+	reply->hdr.data_len = cpu_to_le32(pagelist->length);
+	reply->nr_pages = calc_pages_for(0, pagelist->length);
 	ceph_con_send(&session->s_con, reply);
 
 	if (session) {
@@ -2183,18 +2143,14 @@
 	mutex_lock(&mdsc->mutex);
 	return;
 
-needmore:
-	/*
-	 * we need a larger buffer.  this doesn't very accurately
-	 * factor in snap realms, but it's safe.
-	 */
-	num_caps += num_realms;
-	newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
-	pr_info("i guessed %d, and did %d of %d caps, retrying with %d\n",
-	     len, num_caps, session->s_nr_caps, newlen);
-	len = newlen;
+fail:
 	ceph_msg_put(reply);
-	goto retry;
+fail_nomsg:
+	ceph_pagelist_release(pagelist);
+	kfree(pagelist);
+fail_nopagelist:
+	pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
+	goto out;
 }