ceph: handle ESTALE properly; on receipt, resend to the authority if the request wasn't sent there

Signed-off-by: Greg Farnum <gregf@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a546e0d..34d215f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1628,6 +1628,15 @@
 
 	req->r_mds = mds;
 	req->r_attempts++;
+	if (req->r_inode) {
+		struct ceph_cap *cap =
+			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
+
+		if (cap)
+			req->r_sent_on_mseq = cap->mseq;
+		else
+			req->r_sent_on_mseq = -1;
+	}
 	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
 	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
 
@@ -1962,21 +1971,39 @@
 	result = le32_to_cpu(head->result);
 
 	/*
-	 * Tolerate 2 consecutive ESTALEs from the same mds.
-	 * FIXME: we should be looking at the cap migrate_seq.
+	 * Handle an ESTALE reply:
+	 * - if we weren't talking to the authority, resend to it;
+	 * - if the authority changed while we weren't looking,
+	 *   resend to the new authority;
+	 * - otherwise we just have to return the ESTALE.
 	 */
 	if (result == -ESTALE) {
-		req->r_direct_mode = USE_AUTH_MDS;
-		req->r_num_stale++;
-		if (req->r_num_stale <= 2) {
+		dout("got ESTALE on request %llu", req->r_tid);
+		if (!req->r_inode) ; //do nothing; not an authority problem
+		else if (req->r_direct_mode != USE_AUTH_MDS) {
+			dout("not using auth, setting for that now");
+			req->r_direct_mode = USE_AUTH_MDS;
 			__do_request(mdsc, req);
 			mutex_unlock(&mdsc->mutex);
 			goto out;
+		} else  {
+			struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+			struct ceph_cap *cap =
+				ceph_get_cap_for_mds(ci, req->r_mds);;
+
+			dout("already using auth");
+			if ((!cap || cap != ci->i_auth_cap) ||
+			    (cap->mseq != req->r_sent_on_mseq)) {
+				dout("but cap changed, so resending");
+				__do_request(mdsc, req);
+				mutex_unlock(&mdsc->mutex);
+				goto out;
+			}
 		}
-	} else {
-		req->r_num_stale = 0;
+		dout("have to return ESTALE on request %llu", req->r_tid);
 	}
 
+
 	if (head->safe) {
 		req->r_got_safe = true;
 		__unregister_request(mdsc, req);