pNFS: Enable layoutreturn operation for return-on-close

Amend the pNFS return-on-close helper functions to enable sending the
layoutreturn op in the same compound as CLOSE/DELEGRETURN. This closes a
potential race between CLOSE/DELEGRETURN and parallel OPEN calls to the
same file, and allows the client and the server to agree on whether or
not there is an outstanding layout.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index a93afdd..f61cb81 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -984,6 +984,20 @@ void pnfs_layoutreturn_free_lsegs(struct pnfs_layout_hdr *lo,
 
 }
 
+static void
+pnfs_set_plh_return_info(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode,
+			 u32 seq)
+{
+	if (lo->plh_return_iomode != 0 && lo->plh_return_iomode != iomode)
+		iomode = IOMODE_ANY;
+	lo->plh_return_iomode = iomode;
+	set_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
+	if (seq != 0) {
+		WARN_ON_ONCE(lo->plh_return_seq != 0 && lo->plh_return_seq != seq);
+		lo->plh_return_seq = seq;
+	}
+}
+
 static bool
 pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo,
 		nfs4_stateid *stateid,
@@ -1188,17 +1202,22 @@ pnfs_commit_and_return_layout(struct inode *inode)
 	return ret;
 }
 
-bool pnfs_roc(struct inode *ino)
+bool pnfs_roc(struct inode *ino,
+		struct nfs4_layoutreturn_args *args,
+		struct nfs4_layoutreturn_res *res,
+		const struct rpc_cred *cred)
 {
 	struct nfs_inode *nfsi = NFS_I(ino);
 	struct nfs_open_context *ctx;
 	struct nfs4_state *state;
 	struct pnfs_layout_hdr *lo;
-	struct pnfs_layout_segment *lseg, *tmp;
+	struct pnfs_layout_segment *lseg, *next;
 	nfs4_stateid stateid;
-	LIST_HEAD(tmp_list);
-	bool found = false, layoutreturn = false, roc = false;
+	enum pnfs_iomode iomode = 0;
+	bool layoutreturn = false, roc = false;
 
+	if (!nfs_have_layout(ino))
+		return false;
 	spin_lock(&ino->i_lock);
 	lo = nfsi->layout;
 	if (!lo || !pnfs_layout_is_valid(lo) ||
@@ -1217,83 +1236,63 @@ bool pnfs_roc(struct inode *ino)
 	}
 
 
-	list_for_each_entry_safe(lseg, tmp, &lo->plh_segs, pls_list) {
+	list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list) {
 		/* If we are sending layoutreturn, invalidate all valid lsegs */
-		if (test_bit(NFS_LSEG_ROC, &lseg->pls_flags)) {
-			mark_lseg_invalid(lseg, &tmp_list);
-			found = true;
-		}
+		if (!test_and_clear_bit(NFS_LSEG_ROC, &lseg->pls_flags))
+			continue;
+		/*
+		 * Note: mark lseg for return so pnfs_layout_remove_lseg
+		 * doesn't invalidate the layout for us.
+		 */
+		set_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags);
+		if (!mark_lseg_invalid(lseg, &lo->plh_return_segs))
+			continue;
+		pnfs_set_plh_return_info(lo, lseg->pls_range.iomode, 0);
 	}
 
-	/* always send layoutreturn if being marked so */
-	if (test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags)) {
-		layoutreturn = pnfs_prepare_layoutreturn(lo,
-				&stateid, NULL);
-		if (layoutreturn)
-			goto out_noroc;
-	}
+	if (!test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags))
+		goto out_noroc;
 
 	/* ROC in two conditions:
 	 * 1. there are ROC lsegs
 	 * 2. we don't send layoutreturn
 	 */
-	if (found) {
-		/* lo ref dropped in pnfs_roc_release() */
-		pnfs_get_layout_hdr(lo);
-		roc = true;
-	}
+	/* lo ref dropped in pnfs_roc_release() */
+	layoutreturn = pnfs_prepare_layoutreturn(lo, &stateid, &iomode);
+	/* If the creds don't match, we can't compound the layoutreturn */
+	if (!layoutreturn || cred != lo->plh_lc_cred)
+		goto out_noroc;
+
+	roc = layoutreturn;
+	pnfs_init_layoutreturn_args(args, lo, &stateid, iomode);
+	res->lrs_present = 0;
+	layoutreturn = false;
 
 out_noroc:
 	spin_unlock(&ino->i_lock);
-	pnfs_free_lseg_list(&tmp_list);
 	pnfs_layoutcommit_inode(ino, true);
 	if (layoutreturn)
-		pnfs_send_layoutreturn(lo, &stateid, IOMODE_ANY, true);
+		pnfs_send_layoutreturn(lo, &stateid, iomode, true);
 	return roc;
 }
 
-void pnfs_roc_release(struct inode *ino)
+void pnfs_roc_release(struct nfs4_layoutreturn_args *args,
+		struct nfs4_layoutreturn_res *res,
+		int ret)
 {
-	struct pnfs_layout_hdr *lo;
+	struct pnfs_layout_hdr *lo = args->layout;
+	const nfs4_stateid *arg_stateid = NULL;
+	const nfs4_stateid *res_stateid = NULL;
 
-	spin_lock(&ino->i_lock);
-	lo = NFS_I(ino)->layout;
-	pnfs_clear_layoutreturn_waitbit(lo);
-	if (atomic_dec_and_test(&lo->plh_refcount)) {
-		pnfs_detach_layout_hdr(lo);
-		spin_unlock(&ino->i_lock);
-		pnfs_free_layout_hdr(lo);
-	} else
-		spin_unlock(&ino->i_lock);
-}
-
-void pnfs_roc_set_barrier(struct inode *ino, u32 barrier)
-{
-	struct pnfs_layout_hdr *lo;
-
-	spin_lock(&ino->i_lock);
-	lo = NFS_I(ino)->layout;
-	if (pnfs_seqid_is_newer(barrier, lo->plh_barrier))
-		lo->plh_barrier = barrier;
-	spin_unlock(&ino->i_lock);
-	trace_nfs4_layoutreturn_on_close(ino, 0);
-}
-
-void pnfs_roc_get_barrier(struct inode *ino, u32 *barrier)
-{
-	struct nfs_inode *nfsi = NFS_I(ino);
-	struct pnfs_layout_hdr *lo;
-	u32 current_seqid;
-
-	spin_lock(&ino->i_lock);
-	lo = nfsi->layout;
-	current_seqid = be32_to_cpu(lo->plh_stateid.seqid);
-
-	/* Since close does not return a layout stateid for use as
-	 * a barrier, we choose the worst-case barrier.
-	 */
-	*barrier = current_seqid + atomic_read(&lo->plh_outstanding);
-	spin_unlock(&ino->i_lock);
+	if (ret == 0) {
+		arg_stateid = &args->stateid;
+		if (res->lrs_present)
+			res_stateid = &res->stateid;
+	}
+	pnfs_layoutreturn_free_lsegs(lo, arg_stateid, &args->range,
+			res_stateid);
+	pnfs_put_layout_hdr(lo);
+	trace_nfs4_layoutreturn_on_close(args->inode, 0);
 }
 
 bool pnfs_wait_on_layoutreturn(struct inode *ino, struct rpc_task *task)
@@ -1931,20 +1930,6 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 	return ERR_PTR(-EAGAIN);
 }
 
-static void
-pnfs_set_plh_return_info(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode,
-			 u32 seq)
-{
-	if (lo->plh_return_iomode != 0 && lo->plh_return_iomode != iomode)
-		iomode = IOMODE_ANY;
-	lo->plh_return_iomode = iomode;
-	set_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
-	if (seq != 0) {
-		WARN_ON_ONCE(lo->plh_return_seq != 0 && lo->plh_return_seq != seq);
-		lo->plh_return_seq = seq;
-	}
-}
-
 /**
  * pnfs_mark_matching_lsegs_return - Free or return matching layout segments
  * @lo: pointer to layout header