ceph: clean up mds reply, error handling

We would occasionally BUG out in the reply handler because r_reply was
nonzero, due to a race with ceph_mdsc_do_request temporarily setting
r_reply to an ERR_PTR value.  This is unnecessary, messy, and also wrong
in the EIO case.

Clean up by consistently using r_err for errors and r_reply for messages.
Also fix the abort logic to trigger consistently for all errors that return
to the caller early (e.g., EIO from timeout case).  If an abort races with
a reply, use the result from the reply.

Also fix locking for r_err, r_reply update in the reply handler.

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 24561a5..b3b19f0 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1517,7 +1517,7 @@
 	}
 	msg = create_request_message(mdsc, req, mds);
 	if (IS_ERR(msg)) {
-		req->r_reply = ERR_PTR(PTR_ERR(msg));
+		req->r_err = PTR_ERR(msg);
 		complete_request(mdsc, req);
 		return -PTR_ERR(msg);
 	}
@@ -1552,7 +1552,7 @@
 	int mds = -1;
 	int err = -EAGAIN;
 
-	if (req->r_reply)
+	if (req->r_err || req->r_got_result)
 		goto out;
 
 	if (req->r_timeout &&
@@ -1609,7 +1609,7 @@
 	return err;
 
 finish:
-	req->r_reply = ERR_PTR(err);
+	req->r_err = err;
 	complete_request(mdsc, req);
 	goto out;
 }
@@ -1689,59 +1689,53 @@
 	__register_request(mdsc, req, dir);
 	__do_request(mdsc, req);
 
-	/* wait */
-	if (!req->r_reply) {
-		mutex_unlock(&mdsc->mutex);
-		if (req->r_timeout) {
-			err = (long)wait_for_completion_interruptible_timeout(
-				&req->r_completion, req->r_timeout);
-			if (err == 0)
-				req->r_reply = ERR_PTR(-EIO);
-			else if (err < 0)
-				req->r_reply = ERR_PTR(err);
-		} else {
-                        err = wait_for_completion_interruptible(
-                                &req->r_completion);
-                        if (err)
-                                req->r_reply = ERR_PTR(err);
-		}
-		mutex_lock(&mdsc->mutex);
-	}
-
-	if (IS_ERR(req->r_reply)) {
-		err = PTR_ERR(req->r_reply);
-		req->r_reply = NULL;
-
-		if (err == -ERESTARTSYS) {
-			/* aborted */
-			req->r_aborted = true;
-
-			if (req->r_locked_dir &&
-			    (req->r_op & CEPH_MDS_OP_WRITE)) {
-				struct ceph_inode_info *ci =
-					ceph_inode(req->r_locked_dir);
-
-				dout("aborted, clearing I_COMPLETE on %p\n", 
-				     req->r_locked_dir);
-				spin_lock(&req->r_locked_dir->i_lock);
-				ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
-				ci->i_release_count++;
-				spin_unlock(&req->r_locked_dir->i_lock);
-			}
-		} else {
-			/* clean up this request */
-			__unregister_request(mdsc, req);
-			if (!list_empty(&req->r_unsafe_item))
-				list_del_init(&req->r_unsafe_item);
-			complete(&req->r_safe_completion);
-		}
-	} else if (req->r_err) {
+	if (req->r_err) {
 		err = req->r_err;
-	} else {
-		err = le32_to_cpu(req->r_reply_info.head->result);
+		__unregister_request(mdsc, req);
+		dout("do_request early error %d\n", err);
+		goto out;
 	}
-	mutex_unlock(&mdsc->mutex);
 
+	/* wait */
+	mutex_unlock(&mdsc->mutex);
+	dout("do_request waiting\n");
+	if (req->r_timeout) {
+		err = (long)wait_for_completion_interruptible_timeout(
+			&req->r_completion, req->r_timeout);
+		if (err == 0)
+			err = -EIO;
+	} else {
+		err = wait_for_completion_interruptible(&req->r_completion);
+	}
+	dout("do_request waited, got %d\n", err);
+	mutex_lock(&mdsc->mutex);
+
+	/* only abort if we didn't race with a real reply */
+	if (req->r_got_result) {
+		err = le32_to_cpu(req->r_reply_info.head->result);
+	} else if (err < 0) {
+		dout("aborted request %lld with %d\n", req->r_tid, err);
+		req->r_err = err;
+		req->r_aborted = true;
+
+		if (req->r_locked_dir &&
+		    (req->r_op & CEPH_MDS_OP_WRITE)) {
+			struct ceph_inode_info *ci =
+				ceph_inode(req->r_locked_dir);
+
+			dout("aborted, clearing I_COMPLETE on %p\n",
+			     req->r_locked_dir);
+			spin_lock(&req->r_locked_dir->i_lock);
+			ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+			ci->i_release_count++;
+			spin_unlock(&req->r_locked_dir->i_lock);
+		}
+	} else {
+		err = req->r_err;
+	}
+
+out:
+	mutex_unlock(&mdsc->mutex);
 	dout("do_request %p done, result %d\n", req, err);
 	return err;
 }
@@ -1838,11 +1832,7 @@
 			mutex_unlock(&mdsc->mutex);
 			goto out;
 		}
-	}
-
-	BUG_ON(req->r_reply);
-
-	if (!head->safe) {
+	} else {
 		req->r_got_unsafe = true;
 		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
 	}
@@ -1880,12 +1870,19 @@
 
 	up_read(&mdsc->snap_rwsem);
 out_err:
-	if (err) {
-		req->r_err = err;
+	mutex_lock(&mdsc->mutex);
+	if (!req->r_aborted) {
+		if (err) {
+			req->r_err = err;
+		} else {
+			req->r_reply = msg;
+			ceph_msg_get(msg);
+			req->r_got_result = true;
+		}
 	} else {
-		req->r_reply = msg;
-		ceph_msg_get(msg);
+		dout("reply arrived after request %lld was aborted\n", tid);
 	}
+	mutex_unlock(&mdsc->mutex);
 
 	add_cap_releases(mdsc, req->r_session, -1);
 	mutex_unlock(&session->s_mutex);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 961cc6f..0b1dd10b 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -213,7 +213,7 @@
 	struct completion r_safe_completion;
 	ceph_mds_request_callback_t r_callback;
 	struct list_head  r_unsafe_item;  /* per-session unsafe list item */
-	bool		  r_got_unsafe, r_got_safe;
+	bool		  r_got_unsafe, r_got_safe, r_got_result;
 
 	bool              r_did_prepopulate;
 	u32               r_readdir_offset;