nfs41: serialize first layoutget of a file

Per RFC 5661 Errata 3208:
| A client MAY always forget its layout state and associated
| layout stateid at any time (See also section 12.5.5.1).
| In such case, the client MUST use a non-layout stateid for the next
| LAYOUTGET operation. This will signal the server that the client has
| no more layouts on the file and its respective layout state can be
| released before issuing a new layout in response to LAYOUTGET.

To make such a signal unambiguous to the server, the client needs to
serialize all layoutgets that use a non-layout stateid. We implement this by
serializing layoutgets whenever the client has no layout segments at hand.

Signed-off-by: Peng Tao <tao.peng@primarydata.com>
Signed-off-by: Tom Haynes <Thomas.Haynes@primarydata.com>
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index fa00b56..7e1bac1 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -1288,6 +1288,7 @@
 	struct nfs_client *clp = server->nfs_client;
 	struct pnfs_layout_hdr *lo;
 	struct pnfs_layout_segment *lseg = NULL;
+	bool first;
 
 	if (!pnfs_enabled_sb(NFS_SERVER(ino)))
 		goto out;
@@ -1295,6 +1296,8 @@
 	if (pnfs_within_mdsthreshold(ctx, ino, iomode))
 		goto out;
 
+lookup_again:
+	first = false;
 	spin_lock(&ino->i_lock);
 	lo = pnfs_find_alloc_layout(ino, ctx, gfp_flags);
 	if (lo == NULL) {
@@ -1312,10 +1315,27 @@
 	if (pnfs_layout_io_test_failed(lo, iomode))
 		goto out_unlock;
 
-	/* Check to see if the layout for the given range already exists */
-	lseg = pnfs_find_lseg(lo, &arg);
-	if (lseg)
-		goto out_unlock;
+	first = list_empty(&lo->plh_segs);
+	if (first) {
+		/* The first layoutget for the file. Need to serialize per
+		 * RFC 5661 Errata 3208.
+		 */
+		if (test_and_set_bit(NFS_LAYOUT_FIRST_LAYOUTGET,
+				     &lo->plh_flags)) {
+			spin_unlock(&ino->i_lock);
+			wait_on_bit(&lo->plh_flags, NFS_LAYOUT_FIRST_LAYOUTGET,
+				    TASK_UNINTERRUPTIBLE);
+			pnfs_put_layout_hdr(lo);
+			goto lookup_again;
+		}
+	} else {
+		/* Check to see if the layout for the given range
+		 * already exists
+		 */
+		lseg = pnfs_find_lseg(lo, &arg);
+		if (lseg)
+			goto out_unlock;
+	}
 
 	if (pnfs_layoutgets_blocked(lo, 0))
 		goto out_unlock;
@@ -1343,6 +1363,13 @@
 	lseg = send_layoutget(lo, ctx, &arg, gfp_flags);
 	atomic_dec(&lo->plh_outstanding);
 out_put_layout_hdr:
+	if (first) {
+		unsigned long *bitlock = &lo->plh_flags;
+
+		clear_bit_unlock(NFS_LAYOUT_FIRST_LAYOUTGET, bitlock);
+		smp_mb__after_atomic();
+		wake_up_bit(bitlock, NFS_LAYOUT_FIRST_LAYOUTGET);
+	}
 	pnfs_put_layout_hdr(lo);
 out:
 	dprintk("%s: inode %s/%llu pNFS layout segment %s for "