Merge branch 'nfs-rdma'
diff --git a/fs/nfs/Makefile b/fs/nfs/Makefile
index 8664417..6abdda2 100644
--- a/fs/nfs/Makefile
+++ b/fs/nfs/Makefile
@@ -6,7 +6,7 @@
 
 CFLAGS_nfstrace.o += -I$(src)
 nfs-y 			:= client.o dir.o file.o getroot.o inode.o super.o \
-			   direct.o pagelist.o read.o symlink.o unlink.o \
+			   io.o direct.o pagelist.o read.o symlink.o unlink.o \
 			   write.o namespace.o mount_clnt.o nfstrace.o
 nfs-$(CONFIG_ROOT_NFS)	+= nfsroot.o
 nfs-$(CONFIG_SYSCTL)	+= sysctl.o
diff --git a/fs/nfs/blocklayout/dev.c b/fs/nfs/blocklayout/dev.c
index e5b8967..a69ef4e 100644
--- a/fs/nfs/blocklayout/dev.c
+++ b/fs/nfs/blocklayout/dev.c
@@ -65,8 +65,8 @@
 		if (!p)
 			return -EIO;
 		b->simple.nr_sigs = be32_to_cpup(p++);
-		if (!b->simple.nr_sigs) {
-			dprintk("no signature\n");
+		if (!b->simple.nr_sigs || b->simple.nr_sigs > PNFS_BLOCK_MAX_UUIDS) {
+			dprintk("Bad signature count: %d\n", b->simple.nr_sigs);
 			return -EIO;
 		}
 
@@ -89,7 +89,8 @@
 			memcpy(&b->simple.sigs[i].sig, p,
 				b->simple.sigs[i].sig_len);
 
-			b->simple.len += 8 + 4 + b->simple.sigs[i].sig_len;
+			b->simple.len += 8 + 4 + \
+				(XDR_QUADLEN(b->simple.sigs[i].sig_len) << 2);
 		}
 		break;
 	case PNFS_BLOCK_VOLUME_SLICE:
@@ -104,7 +105,12 @@
 		p = xdr_inline_decode(xdr, 4);
 		if (!p)
 			return -EIO;
+
 		b->concat.volumes_count = be32_to_cpup(p++);
+		if (b->concat.volumes_count > PNFS_BLOCK_MAX_DEVICES) {
+			dprintk("Too many volumes: %d\n", b->concat.volumes_count);
+			return -EIO;
+		}
 
 		p = xdr_inline_decode(xdr, b->concat.volumes_count * 4);
 		if (!p)
@@ -116,8 +122,13 @@
 		p = xdr_inline_decode(xdr, 8 + 4);
 		if (!p)
 			return -EIO;
+
 		p = xdr_decode_hyper(p, &b->stripe.chunk_size);
 		b->stripe.volumes_count = be32_to_cpup(p++);
+		if (b->stripe.volumes_count > PNFS_BLOCK_MAX_DEVICES) {
+			dprintk("Too many volumes: %d\n", b->stripe.volumes_count);
+			return -EIO;
+		}
 
 		p = xdr_inline_decode(xdr, b->stripe.volumes_count * 4);
 		if (!p)
@@ -224,18 +235,20 @@
 		struct pnfs_block_volume *volumes, int idx, gfp_t gfp_mask)
 {
 	struct pnfs_block_volume *v = &volumes[idx];
+	struct block_device *bdev;
 	dev_t dev;
 
 	dev = bl_resolve_deviceid(server, v, gfp_mask);
 	if (!dev)
 		return -EIO;
 
-	d->bdev = blkdev_get_by_dev(dev, FMODE_READ | FMODE_WRITE, NULL);
-	if (IS_ERR(d->bdev)) {
+	bdev = blkdev_get_by_dev(dev, FMODE_READ | FMODE_WRITE, NULL);
+	if (IS_ERR(bdev)) {
 		printk(KERN_WARNING "pNFS: failed to open device %d:%d (%ld)\n",
-			MAJOR(dev), MINOR(dev), PTR_ERR(d->bdev));
-		return PTR_ERR(d->bdev);
+			MAJOR(dev), MINOR(dev), PTR_ERR(bdev));
+		return PTR_ERR(bdev);
 	}
+	d->bdev = bdev;
 
 
 	d->len = i_size_read(d->bdev->bd_inode);
@@ -287,44 +300,71 @@
 	}
 }
 
+/*
+ * Try to open the udev path for the WWN.  At least on Debian the udev
+ * by-id path will always point to the dm-multipath device if one exists.
+ */
+static struct block_device *
+bl_open_udev_path(struct pnfs_block_volume *v)
+{
+	struct block_device *bdev;
+	const char *devname;
+
+	devname = kasprintf(GFP_KERNEL, "/dev/disk/by-id/wwn-0x%*phN",
+				v->scsi.designator_len, v->scsi.designator);
+	if (!devname)
+		return ERR_PTR(-ENOMEM);
+
+	bdev = blkdev_get_by_path(devname, FMODE_READ | FMODE_WRITE, NULL);
+	if (IS_ERR(bdev)) {
+		pr_warn("pNFS: failed to open device %s (%ld)\n",
+			devname, PTR_ERR(bdev));
+	}
+
+	kfree(devname);
+	return bdev;
+}
+
+/*
+ * Try to open the RH/Fedora specific dm-mpath udev path for this WWN, as the
+ * wwn- links will only point to the first discovered SCSI device there.
+ */
+static struct block_device *
+bl_open_dm_mpath_udev_path(struct pnfs_block_volume *v)
+{
+	struct block_device *bdev;
+	const char *devname;
+
+	devname = kasprintf(GFP_KERNEL,
+			"/dev/disk/by-id/dm-uuid-mpath-%d%*phN",
+			v->scsi.designator_type,
+			v->scsi.designator_len, v->scsi.designator);
+	if (!devname)
+		return ERR_PTR(-ENOMEM);
+
+	bdev = blkdev_get_by_path(devname, FMODE_READ | FMODE_WRITE, NULL);
+	kfree(devname);
+	return bdev;
+}
+
 static int
 bl_parse_scsi(struct nfs_server *server, struct pnfs_block_dev *d,
 		struct pnfs_block_volume *volumes, int idx, gfp_t gfp_mask)
 {
 	struct pnfs_block_volume *v = &volumes[idx];
+	struct block_device *bdev;
 	const struct pr_ops *ops;
-	const char *devname;
 	int error;
 
 	if (!bl_validate_designator(v))
 		return -EINVAL;
 
-	switch (v->scsi.designator_len) {
-	case 8:
-		devname = kasprintf(GFP_KERNEL, "/dev/disk/by-id/wwn-0x%8phN",
-				v->scsi.designator);
-		break;
-	case 12:
-		devname = kasprintf(GFP_KERNEL, "/dev/disk/by-id/wwn-0x%12phN",
-				v->scsi.designator);
-		break;
-	case 16:
-		devname = kasprintf(GFP_KERNEL, "/dev/disk/by-id/wwn-0x%16phN",
-				v->scsi.designator);
-		break;
-	default:
-		return -EINVAL;
-	}
-
-	d->bdev = blkdev_get_by_path(devname, FMODE_READ, NULL);
-	if (IS_ERR(d->bdev)) {
-		pr_warn("pNFS: failed to open device %s (%ld)\n",
-			devname, PTR_ERR(d->bdev));
-		kfree(devname);
-		return PTR_ERR(d->bdev);
-	}
-
-	kfree(devname);
+	bdev = bl_open_dm_mpath_udev_path(v);
+	if (IS_ERR(bdev))
+		bdev = bl_open_udev_path(v);
+	if (IS_ERR(bdev))
+		return PTR_ERR(bdev);
+	d->bdev = bdev;
 
 	d->len = i_size_read(d->bdev->bd_inode);
 	d->map = bl_map_simple;
@@ -352,7 +392,7 @@
 	return 0;
 
 out_blkdev_put:
-	blkdev_put(d->bdev, FMODE_READ);
+	blkdev_put(d->bdev, FMODE_READ | FMODE_WRITE);
 	return error;
 }
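
[Editor's note, not part of the patch] Two readability points on the dev.c hunks above. First, the simple-volume length now advances by XDR_QUADLEN(sig_len) << 2 rather than by sig_len, because XDR pads opaque data to a 4-byte boundary. A minimal sketch of the arithmetic, with XDR_QUADLEN as defined in include/linux/sunrpc/xdr.h:

	#define XDR_QUADLEN(l)		(((l) + 3) >> 2)

	/*
	 * A 13-byte signature occupies XDR_QUADLEN(13) = 4 XDR words, i.e.
	 * 4 << 2 = 16 bytes on the wire, so counting only the 13 payload
	 * bytes would leave b->simple.len short of the encoded length.
	 */

Second, the new bl_open_udev_path()/bl_open_dm_mpath_udev_path() helpers use the kernel's "%*phN" format extension, which prints designator_len bytes of the designator as unseparated hex, replacing the old 8/12/16-byte kasprintf() switch.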
 
diff --git a/fs/nfs/blocklayout/extent_tree.c b/fs/nfs/blocklayout/extent_tree.c
index 720b3ff..992bcb1 100644
--- a/fs/nfs/blocklayout/extent_tree.c
+++ b/fs/nfs/blocklayout/extent_tree.c
@@ -121,6 +121,16 @@
 	return be;
 }
 
+static void __ext_put_deviceids(struct list_head *head)
+{
+	struct pnfs_block_extent *be, *tmp;
+
+	list_for_each_entry_safe(be, tmp, head, be_list) {
+		nfs4_put_deviceid_node(be->be_device);
+		kfree(be);
+	}
+}
+
 static void
 __ext_tree_insert(struct rb_root *root,
 		struct pnfs_block_extent *new, bool merge_ok)
@@ -163,7 +173,8 @@
 }
 
 static int
-__ext_tree_remove(struct rb_root *root, sector_t start, sector_t end)
+__ext_tree_remove(struct rb_root *root,
+		sector_t start, sector_t end, struct list_head *tmp)
 {
 	struct pnfs_block_extent *be;
 	sector_t len1 = 0, len2 = 0;
@@ -223,8 +234,7 @@
 			struct pnfs_block_extent *next = ext_tree_next(be);
 
 			rb_erase(&be->be_node, root);
-			nfs4_put_deviceid_node(be->be_device);
-			kfree(be);
+			list_add_tail(&be->be_list, tmp);
 			be = next;
 		}
 
@@ -350,16 +360,18 @@
 		sector_t start, sector_t end)
 {
 	int err, err2;
+	LIST_HEAD(tmp);
 
 	spin_lock(&bl->bl_ext_lock);
-	err = __ext_tree_remove(&bl->bl_ext_ro, start, end);
+	err = __ext_tree_remove(&bl->bl_ext_ro, start, end, &tmp);
 	if (rw) {
-		err2 = __ext_tree_remove(&bl->bl_ext_rw, start, end);
+		err2 = __ext_tree_remove(&bl->bl_ext_rw, start, end, &tmp);
 		if (!err)
 			err = err2;
 	}
 	spin_unlock(&bl->bl_ext_lock);
 
+	__ext_put_deviceids(&tmp);
 	return err;
 }
 
@@ -396,12 +408,13 @@
 	sector_t end = start + len;
 	struct pnfs_block_extent *be;
 	int err = 0;
+	LIST_HEAD(tmp);
 
 	spin_lock(&bl->bl_ext_lock);
 	/*
 	 * First remove all COW extents or holes from written to range.
 	 */
-	err = __ext_tree_remove(&bl->bl_ext_ro, start, end);
+	err = __ext_tree_remove(&bl->bl_ext_ro, start, end, &tmp);
 	if (err)
 		goto out;
 
@@ -459,6 +472,8 @@
 	}
 out:
 	spin_unlock(&bl->bl_ext_lock);
+
+	__ext_put_deviceids(&tmp);
 	return err;
 }
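
[Editor's note, not part of the patch] The extent_tree.c rework follows the common "defer the release until the spinlock is dropped" pattern: extents are unhooked from the rb-tree under bl_ext_lock but only queued on a local list, and the deviceid references are put after the unlock, presumably because dropping the last reference can reach a sleeping release path. A compact sketch of the new shape:

	LIST_HEAD(tmp);

	spin_lock(&bl->bl_ext_lock);
	/* detach from the tree, but defer the heavy work ... */
	rb_erase(&be->be_node, root);
	list_add_tail(&be->be_list, &tmp);
	spin_unlock(&bl->bl_ext_lock);

	/* ... and release outside the lock (this is __ext_put_deviceids) */
	list_for_each_entry_safe(be, next, &tmp, be_list) {
		nfs4_put_deviceid_node(be->be_device);
		kfree(be);
	}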
 
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index aaa2e8d..c92a75e 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -119,27 +119,30 @@
  * hashed by filehandle.
  */
 static struct pnfs_layout_hdr * get_layout_by_fh_locked(struct nfs_client *clp,
-		struct nfs_fh *fh, nfs4_stateid *stateid)
+		struct nfs_fh *fh)
 {
 	struct nfs_server *server;
+	struct nfs_inode *nfsi;
 	struct inode *ino;
 	struct pnfs_layout_hdr *lo;
 
+restart:
 	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
 		list_for_each_entry(lo, &server->layouts, plh_layouts) {
-			if (!nfs4_stateid_match_other(&lo->plh_stateid, stateid))
+			nfsi = NFS_I(lo->plh_inode);
+			if (nfs_compare_fh(fh, &nfsi->fh))
 				continue;
-			if (nfs_compare_fh(fh, &NFS_I(lo->plh_inode)->fh))
+			if (nfsi->layout != lo)
 				continue;
 			ino = igrab(lo->plh_inode);
 			if (!ino)
 				break;
 			spin_lock(&ino->i_lock);
 			/* Is this layout in the process of being freed? */
-			if (NFS_I(ino)->layout != lo) {
+			if (nfsi->layout != lo) {
 				spin_unlock(&ino->i_lock);
 				iput(ino);
-				break;
+				goto restart;
 			}
 			pnfs_get_layout_hdr(lo);
 			spin_unlock(&ino->i_lock);
@@ -151,13 +154,13 @@
 }
 
 static struct pnfs_layout_hdr * get_layout_by_fh(struct nfs_client *clp,
-		struct nfs_fh *fh, nfs4_stateid *stateid)
+		struct nfs_fh *fh)
 {
 	struct pnfs_layout_hdr *lo;
 
 	spin_lock(&clp->cl_lock);
 	rcu_read_lock();
-	lo = get_layout_by_fh_locked(clp, fh, stateid);
+	lo = get_layout_by_fh_locked(clp, fh);
 	rcu_read_unlock();
 	spin_unlock(&clp->cl_lock);
 
@@ -167,17 +170,39 @@
 /*
  * Enforce RFC5661 section 12.5.5.2.1. (Layout Recall and Return Sequencing)
  */
-static bool pnfs_check_stateid_sequence(struct pnfs_layout_hdr *lo,
+static u32 pnfs_check_callback_stateid(struct pnfs_layout_hdr *lo,
 					const nfs4_stateid *new)
 {
 	u32 oldseq, newseq;
 
-	oldseq = be32_to_cpu(lo->plh_stateid.seqid);
-	newseq = be32_to_cpu(new->seqid);
+	/* Is the stateid still not initialised? */
+	if (!pnfs_layout_is_valid(lo))
+		return NFS4ERR_DELAY;
 
+	/* Mismatched stateid? */
+	if (!nfs4_stateid_match_other(&lo->plh_stateid, new))
+		return NFS4ERR_BAD_STATEID;
+
+	newseq = be32_to_cpu(new->seqid);
+	/* Are we already in a layout recall situation? */
+	if (test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags) &&
+	    lo->plh_return_seq != 0) {
+		if (newseq < lo->plh_return_seq)
+			return NFS4ERR_OLD_STATEID;
+		if (newseq > lo->plh_return_seq)
+			return NFS4ERR_DELAY;
+		goto out;
+	}
+
+	/* Check that the stateid matches what we think it should be. */
+	oldseq = be32_to_cpu(lo->plh_stateid.seqid);
 	if (newseq > oldseq + 1)
-		return false;
-	return true;
+		return NFS4ERR_DELAY;
+	/* Crazy server! */
+	if (newseq <= oldseq)
+		return NFS4ERR_OLD_STATEID;
+out:
+	return NFS_OK;
 }
 
 static u32 initiate_file_draining(struct nfs_client *clp,
@@ -188,7 +213,7 @@
 	u32 rv = NFS4ERR_NOMATCHING_LAYOUT;
 	LIST_HEAD(free_me_list);
 
-	lo = get_layout_by_fh(clp, &args->cbl_fh, &args->cbl_stateid);
+	lo = get_layout_by_fh(clp, &args->cbl_fh);
 	if (!lo) {
 		trace_nfs4_cb_layoutrecall_file(clp, &args->cbl_fh, NULL,
 				&args->cbl_stateid, -rv);
@@ -196,18 +221,15 @@
 	}
 
 	ino = lo->plh_inode;
-
-	spin_lock(&ino->i_lock);
-	if (!pnfs_check_stateid_sequence(lo, &args->cbl_stateid)) {
-		rv = NFS4ERR_DELAY;
-		goto unlock;
-	}
-	pnfs_set_layout_stateid(lo, &args->cbl_stateid, true);
-	spin_unlock(&ino->i_lock);
-
 	pnfs_layoutcommit_inode(ino, false);
 
+
 	spin_lock(&ino->i_lock);
+	rv = pnfs_check_callback_stateid(lo, &args->cbl_stateid);
+	if (rv != NFS_OK)
+		goto unlock;
+	pnfs_set_layout_stateid(lo, &args->cbl_stateid, true);
+
 	/*
 	 * Enforce RFC5661 Section 12.5.5.2.1.5 (Bulk Recall and Return)
 	 */
@@ -223,11 +245,13 @@
 		goto unlock;
 	}
 
+	/* Embrace your forgetfulness! */
+	rv = NFS4ERR_NOMATCHING_LAYOUT;
+
 	if (NFS_SERVER(ino)->pnfs_curr_ld->return_range) {
 		NFS_SERVER(ino)->pnfs_curr_ld->return_range(lo,
 			&args->cbl_range);
 	}
-	pnfs_mark_layout_returned_if_empty(lo);
 unlock:
 	spin_unlock(&ino->i_lock);
 	pnfs_free_lseg_list(&free_me_list);
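
[Editor's note, not part of the patch] A worked example of the new pnfs_check_callback_stateid() sequencing, for a layout whose current seqid is 4 and which has no layout return in progress:

	/*
	 * CB_LAYOUTRECALL seqid <= 4  ->  NFS4ERR_OLD_STATEID (stale/replayed)
	 * CB_LAYOUTRECALL seqid == 5  ->  NFS_OK              (expected next seqid)
	 * CB_LAYOUTRECALL seqid >= 6  ->  NFS4ERR_DELAY       (client missed updates)
	 */

When a return is already pending (NFS_LAYOUT_RETURN_REQUESTED with a non-zero plh_return_seq), the recall seqid is compared against plh_return_seq instead: below it is treated as old, above it as delayed, and an exact match is accepted.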
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 0c96528..4849d0f 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -367,8 +367,6 @@
  */
 struct nfs_client *
 nfs_get_client(const struct nfs_client_initdata *cl_init,
-	       const struct rpc_timeout *timeparms,
-	       const char *ip_addr,
 	       rpc_authflavor_t authflavour)
 {
 	struct nfs_client *clp, *new = NULL;
@@ -399,7 +397,7 @@
 					&nn->nfs_client_list);
 			spin_unlock(&nn->nfs_client_lock);
 			new->cl_flags = cl_init->init_flags;
-			return rpc_ops->init_client(new, timeparms, ip_addr);
+			return rpc_ops->init_client(new, cl_init);
 		}
 
 		spin_unlock(&nn->nfs_client_lock);
@@ -470,7 +468,7 @@
  * Create an RPC client handle
  */
 int nfs_create_rpc_client(struct nfs_client *clp,
-			  const struct rpc_timeout *timeparms,
+			  const struct nfs_client_initdata *cl_init,
 			  rpc_authflavor_t flavor)
 {
 	struct rpc_clnt		*clnt = NULL;
@@ -479,8 +477,9 @@
 		.protocol	= clp->cl_proto,
 		.address	= (struct sockaddr *)&clp->cl_addr,
 		.addrsize	= clp->cl_addrlen,
-		.timeout	= timeparms,
+		.timeout	= cl_init->timeparms,
 		.servername	= clp->cl_hostname,
+		.nodename	= cl_init->nodename,
 		.program	= &nfs_program,
 		.version	= clp->rpc_ops->version,
 		.authflavor	= flavor,
@@ -591,14 +590,12 @@
  * nfs_init_client - Initialise an NFS2 or NFS3 client
  *
  * @clp: nfs_client to initialise
- * @timeparms: timeout parameters for underlying RPC transport
- * @ip_addr: IP presentation address (not used)
+ * @cl_init: Initialisation parameters
  *
  * Returns pointer to an NFS client, or an ERR_PTR value.
  */
 struct nfs_client *nfs_init_client(struct nfs_client *clp,
-		    const struct rpc_timeout *timeparms,
-		    const char *ip_addr)
+				   const struct nfs_client_initdata *cl_init)
 {
 	int error;
 
@@ -612,7 +609,7 @@
 	 * Create a client RPC handle for doing FSSTAT with UNIX auth only
 	 * - RFC 2623, sec 2.3.2
 	 */
-	error = nfs_create_rpc_client(clp, timeparms, RPC_AUTH_UNIX);
+	error = nfs_create_rpc_client(clp, cl_init, RPC_AUTH_UNIX);
 	if (error < 0)
 		goto error;
 	nfs_mark_client_ready(clp, NFS_CS_READY);
@@ -633,6 +630,7 @@
 			   const struct nfs_parsed_mount_data *data,
 			   struct nfs_subversion *nfs_mod)
 {
+	struct rpc_timeout timeparms;
 	struct nfs_client_initdata cl_init = {
 		.hostname = data->nfs_server.hostname,
 		.addr = (const struct sockaddr *)&data->nfs_server.address,
@@ -640,8 +638,8 @@
 		.nfs_mod = nfs_mod,
 		.proto = data->nfs_server.protocol,
 		.net = data->net,
+		.timeparms = &timeparms,
 	};
-	struct rpc_timeout timeparms;
 	struct nfs_client *clp;
 	int error;
 
@@ -653,7 +651,7 @@
 		set_bit(NFS_CS_NORESVPORT, &cl_init.init_flags);
 
 	/* Allocate or find a client reference we can use */
-	clp = nfs_get_client(&cl_init, &timeparms, NULL, RPC_AUTH_UNIX);
+	clp = nfs_get_client(&cl_init, RPC_AUTH_UNIX);
 	if (IS_ERR(clp)) {
 		dprintk("<-- nfs_init_server() = error %ld\n", PTR_ERR(clp));
 		return PTR_ERR(clp);
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index d8015a0..e2d606a 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -2231,21 +2231,37 @@
 	return NULL;
 }
 
-static int nfs_access_get_cached(struct inode *inode, struct rpc_cred *cred, struct nfs_access_entry *res)
+static int nfs_access_get_cached(struct inode *inode, struct rpc_cred *cred, struct nfs_access_entry *res, bool may_block)
 {
 	struct nfs_inode *nfsi = NFS_I(inode);
 	struct nfs_access_entry *cache;
-	int err = -ENOENT;
+	bool retry = true;
+	int err;
 
 	spin_lock(&inode->i_lock);
-	if (nfsi->cache_validity & NFS_INO_INVALID_ACCESS)
-		goto out_zap;
-	cache = nfs_access_search_rbtree(inode, cred);
-	if (cache == NULL)
-		goto out;
-	if (!nfs_have_delegated_attributes(inode) &&
-	    !time_in_range_open(jiffies, cache->jiffies, cache->jiffies + nfsi->attrtimeo))
-		goto out_stale;
+	for(;;) {
+		if (nfsi->cache_validity & NFS_INO_INVALID_ACCESS)
+			goto out_zap;
+		cache = nfs_access_search_rbtree(inode, cred);
+		err = -ENOENT;
+		if (cache == NULL)
+			goto out;
+		/* Found an entry, is our attribute cache valid? */
+		if (!nfs_attribute_cache_expired(inode) &&
+		    !(nfsi->cache_validity & NFS_INO_INVALID_ATTR))
+			break;
+		err = -ECHILD;
+		if (!may_block)
+			goto out;
+		if (!retry)
+			goto out_zap;
+		spin_unlock(&inode->i_lock);
+		err = __nfs_revalidate_inode(NFS_SERVER(inode), inode);
+		if (err)
+			return err;
+		spin_lock(&inode->i_lock);
+		retry = false;
+	}
 	res->jiffies = cache->jiffies;
 	res->cred = cache->cred;
 	res->mask = cache->mask;
@@ -2254,12 +2270,6 @@
 out:
 	spin_unlock(&inode->i_lock);
 	return err;
-out_stale:
-	rb_erase(&cache->rb_node, &nfsi->access_cache);
-	list_del(&cache->lru);
-	spin_unlock(&inode->i_lock);
-	nfs_access_free_entry(cache);
-	return -ENOENT;
 out_zap:
 	spin_unlock(&inode->i_lock);
 	nfs_access_zap_cache(inode);
@@ -2286,13 +2296,12 @@
 		cache = NULL;
 	if (cache == NULL)
 		goto out;
-	if (!nfs_have_delegated_attributes(inode) &&
-	    !time_in_range_open(jiffies, cache->jiffies, cache->jiffies + nfsi->attrtimeo))
+	err = nfs_revalidate_inode_rcu(NFS_SERVER(inode), inode);
+	if (err)
 		goto out;
 	res->jiffies = cache->jiffies;
 	res->cred = cache->cred;
 	res->mask = cache->mask;
-	err = 0;
 out:
 	rcu_read_unlock();
 	return err;
@@ -2381,18 +2390,19 @@
 static int nfs_do_access(struct inode *inode, struct rpc_cred *cred, int mask)
 {
 	struct nfs_access_entry cache;
+	bool may_block = (mask & MAY_NOT_BLOCK) == 0;
 	int status;
 
 	trace_nfs_access_enter(inode);
 
 	status = nfs_access_get_cached_rcu(inode, cred, &cache);
 	if (status != 0)
-		status = nfs_access_get_cached(inode, cred, &cache);
+		status = nfs_access_get_cached(inode, cred, &cache, may_block);
 	if (status == 0)
 		goto out_cached;
 
 	status = -ECHILD;
-	if (mask & MAY_NOT_BLOCK)
+	if (!may_block)
 		goto out;
 
 	/* Be clever: ask server to check for all possible rights */
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index c7326c2..f0f4b8d 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -196,6 +196,12 @@
 	WARN_ON_ONCE(verfp->committed < 0);
 }
 
+static int nfs_direct_cmp_verf(const struct nfs_writeverf *v1,
+		const struct nfs_writeverf *v2)
+{
+	return nfs_write_verifier_cmp(&v1->verifier, &v2->verifier);
+}
+
 /*
  * nfs_direct_cmp_hdr_verf - compare verifier for pgio header
  * @dreq - direct request possibly spanning multiple servers
@@ -215,7 +221,7 @@
 		nfs_direct_set_hdr_verf(dreq, hdr);
 		return 0;
 	}
-	return memcmp(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
+	return nfs_direct_cmp_verf(verfp, &hdr->verf);
 }
 
 /*
@@ -238,7 +244,7 @@
 	if (verfp->committed < 0)
 		return 1;
 
-	return memcmp(verfp, &data->verf, sizeof(struct nfs_writeverf));
+	return nfs_direct_cmp_verf(verfp, &data->verf);
 }
 
 /**
@@ -368,22 +374,10 @@
  * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
  * the iocb is still valid here if this is a synchronous request.
  */
-static void nfs_direct_complete(struct nfs_direct_req *dreq, bool write)
+static void nfs_direct_complete(struct nfs_direct_req *dreq)
 {
 	struct inode *inode = dreq->inode;
 
-	if (dreq->iocb && write) {
-		loff_t pos = dreq->iocb->ki_pos + dreq->count;
-
-		spin_lock(&inode->i_lock);
-		if (i_size_read(inode) < pos)
-			i_size_write(inode, pos);
-		spin_unlock(&inode->i_lock);
-	}
-
-	if (write)
-		nfs_zap_mapping(inode, inode->i_mapping);
-
 	inode_dio_end(inode);
 
 	if (dreq->iocb) {
@@ -438,7 +432,7 @@
 	}
 out_put:
 	if (put_dreq(dreq))
-		nfs_direct_complete(dreq, false);
+		nfs_direct_complete(dreq);
 	hdr->release(hdr);
 }
 
@@ -544,7 +538,7 @@
 	}
 
 	if (put_dreq(dreq))
-		nfs_direct_complete(dreq, false);
+		nfs_direct_complete(dreq);
 	return 0;
 }
 
@@ -585,17 +579,12 @@
 	if (!count)
 		goto out;
 
-	inode_lock(inode);
-	result = nfs_sync_mapping(mapping);
-	if (result)
-		goto out_unlock;
-
 	task_io_account_read(count);
 
 	result = -ENOMEM;
 	dreq = nfs_direct_req_alloc();
 	if (dreq == NULL)
-		goto out_unlock;
+		goto out;
 
 	dreq->inode = inode;
 	dreq->bytes_left = dreq->max_count = count;
@@ -610,10 +599,12 @@
 	if (!is_sync_kiocb(iocb))
 		dreq->iocb = iocb;
 
+	nfs_start_io_direct(inode);
+
 	NFS_I(inode)->read_io += count;
 	result = nfs_direct_read_schedule_iovec(dreq, iter, iocb->ki_pos);
 
-	inode_unlock(inode);
+	nfs_end_io_direct(inode);
 
 	if (!result) {
 		result = nfs_direct_wait(dreq);
@@ -621,13 +612,8 @@
 			iocb->ki_pos += result;
 	}
 
-	nfs_direct_req_release(dreq);
-	return result;
-
 out_release:
 	nfs_direct_req_release(dreq);
-out_unlock:
-	inode_unlock(inode);
 out:
 	return result;
 }
@@ -659,6 +645,8 @@
 	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
 
 	dreq->count = 0;
+	dreq->verf.committed = NFS_INVALID_STABLE_HOW;
+	nfs_clear_pnfs_ds_commit_verifiers(&dreq->ds_cinfo);
 	for (i = 0; i < dreq->mirror_count; i++)
 		dreq->mirrors[i].count = 0;
 	get_dreq(dreq);
@@ -777,7 +765,8 @@
 			nfs_direct_write_reschedule(dreq);
 			break;
 		default:
-			nfs_direct_complete(dreq, true);
+			nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
+			nfs_direct_complete(dreq);
 	}
 }
 
@@ -993,6 +982,7 @@
 ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter)
 {
 	ssize_t result = -EINVAL;
+	size_t count;
 	struct file *file = iocb->ki_filp;
 	struct address_space *mapping = file->f_mapping;
 	struct inode *inode = mapping->host;
@@ -1003,34 +993,24 @@
 	dfprintk(FILE, "NFS: direct write(%pD2, %zd@%Ld)\n",
 		file, iov_iter_count(iter), (long long) iocb->ki_pos);
 
-	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES,
-		      iov_iter_count(iter));
+	result = generic_write_checks(iocb, iter);
+	if (result <= 0)
+		return result;
+	count = result;
+	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);
 
 	pos = iocb->ki_pos;
 	end = (pos + iov_iter_count(iter) - 1) >> PAGE_SHIFT;
 
-	inode_lock(inode);
-
-	result = nfs_sync_mapping(mapping);
-	if (result)
-		goto out_unlock;
-
-	if (mapping->nrpages) {
-		result = invalidate_inode_pages2_range(mapping,
-					pos >> PAGE_SHIFT, end);
-		if (result)
-			goto out_unlock;
-	}
-
-	task_io_account_write(iov_iter_count(iter));
+	task_io_account_write(count);
 
 	result = -ENOMEM;
 	dreq = nfs_direct_req_alloc();
 	if (!dreq)
-		goto out_unlock;
+		goto out;
 
 	dreq->inode = inode;
-	dreq->bytes_left = dreq->max_count = iov_iter_count(iter);
+	dreq->bytes_left = dreq->max_count = count;
 	dreq->io_start = pos;
 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 	l_ctx = nfs_get_lock_context(dreq->ctx);
@@ -1042,6 +1022,8 @@
 	if (!is_sync_kiocb(iocb))
 		dreq->iocb = iocb;
 
+	nfs_start_io_direct(inode);
+
 	result = nfs_direct_write_schedule_iovec(dreq, iter, pos);
 
 	if (mapping->nrpages) {
@@ -1049,30 +1031,19 @@
 					      pos >> PAGE_SHIFT, end);
 	}
 
-	inode_unlock(inode);
+	nfs_end_io_direct(inode);
 
 	if (!result) {
 		result = nfs_direct_wait(dreq);
 		if (result > 0) {
-			struct inode *inode = mapping->host;
-
 			iocb->ki_pos = pos + result;
-			spin_lock(&inode->i_lock);
-			if (i_size_read(inode) < iocb->ki_pos)
-				i_size_write(inode, iocb->ki_pos);
-			spin_unlock(&inode->i_lock);
-
 			/* XXX: should check the generic_write_sync retval */
 			generic_write_sync(iocb, result);
 		}
 	}
-	nfs_direct_req_release(dreq);
-	return result;
-
 out_release:
 	nfs_direct_req_release(dreq);
-out_unlock:
-	inode_unlock(inode);
+out:
 	return result;
 }
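
[Editor's note, not part of the patch, and an inference about intent rather than a quote from the changelog] The switch from memcmp() over whole struct nfs_writeverf objects to nfs_direct_cmp_verf()/nfs_write_verifier_cmp() matters because the structure carries more than the opaque verifier:

	struct nfs_writeverf {
		struct nfs_write_verifier verifier;	/* the 8 opaque bytes */
		enum nfs3_stable_how	  committed;
	};

A whole-struct memcmp() also compares 'committed' (and any padding), so two replies carrying identical write verifiers could still miscompare; the new helpers compare only verifier.data.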
 
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 717a8d6..7d62097 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -170,12 +170,14 @@
 		iocb->ki_filp,
 		iov_iter_count(to), (unsigned long) iocb->ki_pos);
 
-	result = nfs_revalidate_mapping_protected(inode, iocb->ki_filp->f_mapping);
+	nfs_start_io_read(inode);
+	result = nfs_revalidate_mapping(inode, iocb->ki_filp->f_mapping);
 	if (!result) {
 		result = generic_file_read_iter(iocb, to);
 		if (result > 0)
 			nfs_add_stats(inode, NFSIOS_NORMALREADBYTES, result);
 	}
+	nfs_end_io_read(inode);
 	return result;
 }
 EXPORT_SYMBOL_GPL(nfs_file_read);
@@ -191,12 +193,14 @@
 	dprintk("NFS: splice_read(%pD2, %lu@%Lu)\n",
 		filp, (unsigned long) count, (unsigned long long) *ppos);
 
-	res = nfs_revalidate_mapping_protected(inode, filp->f_mapping);
+	nfs_start_io_read(inode);
+	res = nfs_revalidate_mapping(inode, filp->f_mapping);
 	if (!res) {
 		res = generic_file_splice_read(filp, ppos, pipe, count, flags);
 		if (res > 0)
 			nfs_add_stats(inode, NFSIOS_NORMALREADBYTES, res);
 	}
+	nfs_end_io_read(inode);
 	return res;
 }
 EXPORT_SYMBOL_GPL(nfs_file_splice_read);
@@ -272,16 +276,13 @@
 
 	trace_nfs_fsync_enter(inode);
 
-	inode_dio_wait(inode);
 	do {
 		ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
 		if (ret != 0)
 			break;
-		inode_lock(inode);
 		ret = nfs_file_fsync_commit(file, start, end, datasync);
 		if (!ret)
 			ret = pnfs_sync_inode(inode, !!datasync);
-		inode_unlock(inode);
 		/*
 		 * If nfs_file_fsync_commit detected a server reboot, then
 		 * resend all dirty pages that might have been covered by
@@ -359,19 +360,6 @@
 		file, mapping->host->i_ino, len, (long long) pos);
 
 start:
-	/*
-	 * Prevent starvation issues if someone is doing a consistency
-	 * sync-to-disk
-	 */
-	ret = wait_on_bit_action(&NFS_I(mapping->host)->flags, NFS_INO_FLUSHING,
-				 nfs_wait_bit_killable, TASK_KILLABLE);
-	if (ret)
-		return ret;
-	/*
-	 * Wait for O_DIRECT to complete
-	 */
-	inode_dio_wait(mapping->host);
-
 	page = grab_cache_page_write_begin(mapping, index, flags);
 	if (!page)
 		return -ENOMEM;
@@ -432,7 +420,7 @@
 		return status;
 	NFS_I(mapping->host)->write_io += copied;
 
-	if (nfs_ctx_key_to_expire(ctx)) {
+	if (nfs_ctx_key_to_expire(ctx, mapping->host)) {
 		status = nfs_wb_all(mapping->host);
 		if (status < 0)
 			return status;
@@ -470,31 +458,8 @@
  */
 static int nfs_release_page(struct page *page, gfp_t gfp)
 {
-	struct address_space *mapping = page->mapping;
-
 	dfprintk(PAGECACHE, "NFS: release_page(%p)\n", page);
 
-	/* Always try to initiate a 'commit' if relevant, but only
-	 * wait for it if the caller allows blocking.  Even then,
-	 * only wait 1 second and only if the 'bdi' is not congested.
-	 * Waiting indefinitely can cause deadlocks when the NFS
-	 * server is on this machine, when a new TCP connection is
-	 * needed and in other rare cases.  There is no particular
-	 * need to wait extensively here.  A short wait has the
-	 * benefit that someone else can worry about the freezer.
-	 */
-	if (mapping) {
-		struct nfs_server *nfss = NFS_SERVER(mapping->host);
-		nfs_commit_inode(mapping->host, 0);
-		if (gfpflags_allow_blocking(gfp) &&
-		    !bdi_write_congested(&nfss->backing_dev_info)) {
-			wait_on_page_bit_killable_timeout(page, PG_private,
-							  HZ);
-			if (PagePrivate(page))
-				set_bdi_congested(&nfss->backing_dev_info,
-						  BLK_RW_ASYNC);
-		}
-	}
 	/* If PagePrivate() is set, then the page is not freeable */
 	if (PagePrivate(page))
 		return 0;
@@ -604,6 +569,8 @@
 		filp, filp->f_mapping->host->i_ino,
 		(long long)page_offset(page));
 
+	sb_start_pagefault(inode->i_sb);
+
 	/* make sure the cache has finished storing the page */
 	nfs_fscache_wait_on_page_write(NFS_I(inode), page);
 
@@ -630,6 +597,7 @@
 out_unlock:
 	unlock_page(page);
 out:
+	sb_end_pagefault(inode->i_sb);
 	return ret;
 }
 
@@ -645,7 +613,7 @@
 
 	ctx = nfs_file_open_context(filp);
 	if (test_bit(NFS_CONTEXT_ERROR_WRITE, &ctx->flags) ||
-	    nfs_ctx_key_to_expire(ctx))
+	    nfs_ctx_key_to_expire(ctx, inode))
 		return 1;
 	return 0;
 }
@@ -656,23 +624,17 @@
 	struct inode *inode = file_inode(file);
 	unsigned long written = 0;
 	ssize_t result;
-	size_t count = iov_iter_count(from);
 
 	result = nfs_key_timeout_notify(file, inode);
 	if (result)
 		return result;
 
-	if (iocb->ki_flags & IOCB_DIRECT) {
-		result = generic_write_checks(iocb, from);
-		if (result <= 0)
-			return result;
+	if (iocb->ki_flags & IOCB_DIRECT)
 		return nfs_file_direct_write(iocb, from);
-	}
 
 	dprintk("NFS: write(%pD2, %zu@%Ld)\n",
-		file, count, (long long) iocb->ki_pos);
+		file, iov_iter_count(from), (long long) iocb->ki_pos);
 
-	result = -EBUSY;
 	if (IS_SWAPFILE(inode))
 		goto out_swapfile;
 	/*
@@ -684,28 +646,33 @@
 			goto out;
 	}
 
-	result = count;
-	if (!count)
+	nfs_start_io_write(inode);
+	result = generic_write_checks(iocb, from);
+	if (result > 0) {
+		current->backing_dev_info = inode_to_bdi(inode);
+		result = generic_perform_write(file, from, iocb->ki_pos);
+		current->backing_dev_info = NULL;
+	}
+	nfs_end_io_write(inode);
+	if (result <= 0)
 		goto out;
 
-	result = generic_file_write_iter(iocb, from);
-	if (result > 0)
-		written = result;
+	written = generic_write_sync(iocb, result);
+	iocb->ki_pos += written;
 
 	/* Return error values */
-	if (result >= 0 && nfs_need_check_write(file, inode)) {
+	if (nfs_need_check_write(file, inode)) {
 		int err = vfs_fsync(file, 0);
 		if (err < 0)
 			result = err;
 	}
-	if (result > 0)
-		nfs_add_stats(inode, NFSIOS_NORMALWRITTENBYTES, written);
+	nfs_add_stats(inode, NFSIOS_NORMALWRITTENBYTES, written);
 out:
 	return result;
 
 out_swapfile:
 	printk(KERN_INFO "NFS: attempt to write to active swap file!\n");
-	goto out;
+	return -EBUSY;
 }
 EXPORT_SYMBOL_GPL(nfs_file_write);
 
@@ -780,11 +747,6 @@
 }
 
 static int
-is_time_granular(struct timespec *ts) {
-	return ((ts->tv_sec == 0) && (ts->tv_nsec <= 1000));
-}
-
-static int
 do_setlk(struct file *filp, int cmd, struct file_lock *fl, int is_local)
 {
 	struct inode *inode = filp->f_mapping->host;
@@ -817,12 +779,8 @@
 	 * This makes locking act as a cache coherency point.
 	 */
 	nfs_sync_mapping(filp->f_mapping);
-	if (!NFS_PROTO(inode)->have_delegation(inode, FMODE_READ)) {
-		if (is_time_granular(&NFS_SERVER(inode)->time_delta))
-			__nfs_revalidate_inode(NFS_SERVER(inode), inode);
-		else
-			nfs_zap_caches(inode);
-	}
+	if (!NFS_PROTO(inode)->have_delegation(inode, FMODE_READ))
+		nfs_zap_mapping(inode, filp->f_mapping);
 out:
 	return status;
 }
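
[Editor's note, not part of the patch; rationale inferred by the editor] The nfs_file_write() hunk stops calling generic_file_write_iter() and open-codes its pieces instead. That looks necessary once nfs_start_io_write() (added in fs/nfs/io.c below) takes inode->i_rwsem exclusively, because generic_file_write_iter() itself calls inode_lock(), i.e. down_write() on the same rwsem, which would self-deadlock. The resulting shape is:

	nfs_start_io_write(inode);		/* exclusive i_rwsem, blocks O_DIRECT */
	result = generic_write_checks(iocb, from);
	if (result > 0)
		result = generic_perform_write(file, from, iocb->ki_pos);
	nfs_end_io_write(inode);
	/* generic_write_sync() and the stats update then run outside the bracket */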
diff --git a/fs/nfs/filelayout/filelayout.c b/fs/nfs/filelayout/filelayout.c
index aa59757..a3fc48b 100644
--- a/fs/nfs/filelayout/filelayout.c
+++ b/fs/nfs/filelayout/filelayout.c
@@ -255,13 +255,16 @@
 static void
 filelayout_set_layoutcommit(struct nfs_pgio_header *hdr)
 {
+	loff_t end_offs = 0;
 
 	if (FILELAYOUT_LSEG(hdr->lseg)->commit_through_mds ||
-	    hdr->res.verf->committed != NFS_DATA_SYNC)
+	    hdr->res.verf->committed == NFS_FILE_SYNC)
 		return;
+	if (hdr->res.verf->committed == NFS_DATA_SYNC)
+		end_offs = hdr->mds_offset + (loff_t)hdr->res.count;
 
-	pnfs_set_layoutcommit(hdr->inode, hdr->lseg,
-			hdr->mds_offset + hdr->res.count);
+	/* Note: if the write is unstable, don't set end_offs until commit */
+	pnfs_set_layoutcommit(hdr->inode, hdr->lseg, end_offs);
 	dprintk("%s inode %lu pls_end_pos %lu\n", __func__, hdr->inode->i_ino,
 		(unsigned long) NFS_I(hdr->inode)->layout->plh_lwb);
 }
@@ -354,6 +357,12 @@
 	}
 
 	filelayout_set_layoutcommit(hdr);
+
+	/* zero out the fattr */
+	hdr->fattr.valid = 0;
+	if (task->tk_status >= 0)
+		nfs_writeback_update_inode(hdr);
+
 	return 0;
 }
 
@@ -375,8 +384,7 @@
 		return -EAGAIN;
 	}
 
-	if (data->verf.committed == NFS_UNSTABLE)
-		pnfs_set_layoutcommit(data->inode, data->lseg, data->lwb);
+	pnfs_set_layoutcommit(data->inode, data->lseg, data->lwb);
 
 	return 0;
 }
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index 0e8018b..e6206ea 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -1325,15 +1325,16 @@
  * we always send layoutcommit after DS writes.
  */
 static void
-ff_layout_set_layoutcommit(struct nfs_pgio_header *hdr)
+ff_layout_set_layoutcommit(struct inode *inode,
+		struct pnfs_layout_segment *lseg,
+		loff_t end_offset)
 {
-	if (!ff_layout_need_layoutcommit(hdr->lseg))
+	if (!ff_layout_need_layoutcommit(lseg))
 		return;
 
-	pnfs_set_layoutcommit(hdr->inode, hdr->lseg,
-			hdr->mds_offset + hdr->res.count);
-	dprintk("%s inode %lu pls_end_pos %lu\n", __func__, hdr->inode->i_ino,
-		(unsigned long) NFS_I(hdr->inode)->layout->plh_lwb);
+	pnfs_set_layoutcommit(inode, lseg, end_offset);
+	dprintk("%s inode %lu pls_end_pos %llu\n", __func__, inode->i_ino,
+		(unsigned long long) NFS_I(inode)->layout->plh_lwb);
 }
 
 static bool
@@ -1469,6 +1470,7 @@
 static int ff_layout_write_done_cb(struct rpc_task *task,
 				struct nfs_pgio_header *hdr)
 {
+	loff_t end_offs = 0;
 	int err;
 
 	trace_nfs4_pnfs_write(hdr, task->tk_status);
@@ -1494,7 +1496,10 @@
 
 	if (hdr->res.verf->committed == NFS_FILE_SYNC ||
 	    hdr->res.verf->committed == NFS_DATA_SYNC)
-		ff_layout_set_layoutcommit(hdr);
+		end_offs = hdr->mds_offset + (loff_t)hdr->res.count;
+
+	/* Note: if the write is unstable, don't set end_offs until commit */
+	ff_layout_set_layoutcommit(hdr->inode, hdr->lseg, end_offs);
 
 	/* zero out fattr since we don't care DS attr at all */
 	hdr->fattr.valid = 0;
@@ -1530,9 +1535,7 @@
 		return -EAGAIN;
 	}
 
-	if (data->verf.committed == NFS_UNSTABLE
-	    && ff_layout_need_layoutcommit(data->lseg))
-		pnfs_set_layoutcommit(data->inode, data->lseg, data->lwb);
+	ff_layout_set_layoutcommit(data->inode, data->lseg, data->lwb);
 
 	return 0;
 }
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index dda689d..f108d58 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -662,9 +662,7 @@
 	trace_nfs_getattr_enter(inode);
 	/* Flush out writes to the server in order to update c/mtime.  */
 	if (S_ISREG(inode->i_mode)) {
-		inode_lock(inode);
-		err = nfs_sync_inode(inode);
-		inode_unlock(inode);
+		err = filemap_write_and_wait(inode->i_mapping);
 		if (err)
 			goto out;
 	}
@@ -879,7 +877,10 @@
 	struct nfs_inode *nfsi = NFS_I(inode);
 
 	spin_lock(&inode->i_lock);
-	list_add(&ctx->list, &nfsi->open_files);
+	if (ctx->mode & FMODE_WRITE)
+		list_add(&ctx->list, &nfsi->open_files);
+	else
+		list_add_tail(&ctx->list, &nfsi->open_files);
 	spin_unlock(&inode->i_lock);
 }
 EXPORT_SYMBOL_GPL(nfs_inode_attach_open_context);
@@ -972,6 +973,13 @@
 	if (NFS_STALE(inode))
 		goto out;
 
+	/* pNFS: Attributes aren't updated until we layoutcommit */
+	if (S_ISREG(inode->i_mode)) {
+		status = pnfs_sync_inode(inode, false);
+		if (status)
+			goto out;
+	}
+
 	status = -ENOMEM;
 	fattr = nfs_alloc_fattr();
 	if (fattr == NULL)
@@ -1122,14 +1130,12 @@
 }
 
 /**
- * __nfs_revalidate_mapping - Revalidate the pagecache
+ * nfs_revalidate_mapping - Revalidate the pagecache
  * @inode - pointer to host inode
  * @mapping - pointer to mapping
- * @may_lock - take inode->i_mutex?
  */
-static int __nfs_revalidate_mapping(struct inode *inode,
-		struct address_space *mapping,
-		bool may_lock)
+int nfs_revalidate_mapping(struct inode *inode,
+		struct address_space *mapping)
 {
 	struct nfs_inode *nfsi = NFS_I(inode);
 	unsigned long *bitlock = &nfsi->flags;
@@ -1178,12 +1184,7 @@
 	nfsi->cache_validity &= ~NFS_INO_INVALID_DATA;
 	spin_unlock(&inode->i_lock);
 	trace_nfs_invalidate_mapping_enter(inode);
-	if (may_lock) {
-		inode_lock(inode);
-		ret = nfs_invalidate_mapping(inode, mapping);
-		inode_unlock(inode);
-	} else
-		ret = nfs_invalidate_mapping(inode, mapping);
+	ret = nfs_invalidate_mapping(inode, mapping);
 	trace_nfs_invalidate_mapping_exit(inode, ret);
 
 	clear_bit_unlock(NFS_INO_INVALIDATING, bitlock);
@@ -1193,27 +1194,28 @@
 	return ret;
 }
 
-/**
- * nfs_revalidate_mapping - Revalidate the pagecache
- * @inode - pointer to host inode
- * @mapping - pointer to mapping
- */
-int nfs_revalidate_mapping(struct inode *inode, struct address_space *mapping)
+static bool nfs_file_has_writers(struct nfs_inode *nfsi)
 {
-	return __nfs_revalidate_mapping(inode, mapping, false);
+	struct inode *inode = &nfsi->vfs_inode;
+
+	assert_spin_locked(&inode->i_lock);
+
+	if (!S_ISREG(inode->i_mode))
+		return false;
+	if (list_empty(&nfsi->open_files))
+		return false;
+	/* Note: This relies on nfsi->open_files being ordered with writers
+	 *       being placed at the head of the list.
+	 *       See nfs_inode_attach_open_context()
+	 */
+	return (list_first_entry(&nfsi->open_files,
+			struct nfs_open_context,
+			list)->mode & FMODE_WRITE) == FMODE_WRITE;
 }
 
-/**
- * nfs_revalidate_mapping_protected - Revalidate the pagecache
- * @inode - pointer to host inode
- * @mapping - pointer to mapping
- *
- * Differs from nfs_revalidate_mapping() in that it grabs the inode->i_mutex
- * while invalidating the mapping.
- */
-int nfs_revalidate_mapping_protected(struct inode *inode, struct address_space *mapping)
+static bool nfs_file_has_buffered_writers(struct nfs_inode *nfsi)
 {
-	return __nfs_revalidate_mapping(inode, mapping, true);
+	return nfs_file_has_writers(nfsi) && nfs_file_io_is_buffered(nfsi);
 }
 
 static unsigned long nfs_wcc_update_inode(struct inode *inode, struct nfs_fattr *fattr)
@@ -1280,22 +1282,24 @@
 	if ((fattr->valid & NFS_ATTR_FATTR_TYPE) && (inode->i_mode & S_IFMT) != (fattr->mode & S_IFMT))
 		return -EIO;
 
-	if ((fattr->valid & NFS_ATTR_FATTR_CHANGE) != 0 &&
-			inode->i_version != fattr->change_attr)
-		invalid |= NFS_INO_INVALID_ATTR|NFS_INO_REVAL_PAGECACHE;
+	if (!nfs_file_has_buffered_writers(nfsi)) {
+		/* Verify a few of the more important attributes */
+		if ((fattr->valid & NFS_ATTR_FATTR_CHANGE) != 0 && inode->i_version != fattr->change_attr)
+			invalid |= NFS_INO_INVALID_ATTR | NFS_INO_REVAL_PAGECACHE;
 
-	/* Verify a few of the more important attributes */
-	if ((fattr->valid & NFS_ATTR_FATTR_MTIME) && !timespec_equal(&inode->i_mtime, &fattr->mtime))
-		invalid |= NFS_INO_INVALID_ATTR;
+		if ((fattr->valid & NFS_ATTR_FATTR_MTIME) && !timespec_equal(&inode->i_mtime, &fattr->mtime))
+			invalid |= NFS_INO_INVALID_ATTR;
 
-	if (fattr->valid & NFS_ATTR_FATTR_SIZE) {
-		cur_size = i_size_read(inode);
-		new_isize = nfs_size_to_loff_t(fattr->size);
-		if (cur_size != new_isize)
-			invalid |= NFS_INO_INVALID_ATTR|NFS_INO_REVAL_PAGECACHE;
+		if ((fattr->valid & NFS_ATTR_FATTR_CTIME) && !timespec_equal(&inode->i_ctime, &fattr->ctime))
+			invalid |= NFS_INO_INVALID_ATTR;
+
+		if (fattr->valid & NFS_ATTR_FATTR_SIZE) {
+			cur_size = i_size_read(inode);
+			new_isize = nfs_size_to_loff_t(fattr->size);
+			if (cur_size != new_isize)
+				invalid |= NFS_INO_INVALID_ATTR|NFS_INO_REVAL_PAGECACHE;
+		}
 	}
-	if (nfsi->nrequests != 0)
-		invalid &= ~NFS_INO_REVAL_PAGECACHE;
 
 	/* Have any file permissions changed? */
 	if ((fattr->valid & NFS_ATTR_FATTR_MODE) && (inode->i_mode & S_IALLUGO) != (fattr->mode & S_IALLUGO))
@@ -1470,28 +1474,12 @@
 		((long)nfsi->attr_gencount - (long)nfs_read_attr_generation_counter() > 0);
 }
 
-/*
- * Don't trust the change_attribute, mtime, ctime or size if
- * a pnfs LAYOUTCOMMIT is outstanding
- */
-static void nfs_inode_attrs_handle_layoutcommit(struct inode *inode,
-		struct nfs_fattr *fattr)
-{
-	if (pnfs_layoutcommit_outstanding(inode))
-		fattr->valid &= ~(NFS_ATTR_FATTR_CHANGE |
-				NFS_ATTR_FATTR_MTIME |
-				NFS_ATTR_FATTR_CTIME |
-				NFS_ATTR_FATTR_SIZE);
-}
-
 static int nfs_refresh_inode_locked(struct inode *inode, struct nfs_fattr *fattr)
 {
 	int ret;
 
 	trace_nfs_refresh_inode_enter(inode);
 
-	nfs_inode_attrs_handle_layoutcommit(inode, fattr);
-
 	if (nfs_inode_attrs_need_update(inode, fattr))
 		ret = nfs_update_inode(inode, fattr);
 	else
@@ -1527,7 +1515,7 @@
 
 static int nfs_post_op_update_inode_locked(struct inode *inode, struct nfs_fattr *fattr)
 {
-	unsigned long invalid = NFS_INO_INVALID_ATTR|NFS_INO_REVAL_PAGECACHE;
+	unsigned long invalid = NFS_INO_INVALID_ATTR;
 
 	/*
 	 * Don't revalidate the pagecache if we hold a delegation, but do
@@ -1676,7 +1664,8 @@
 	unsigned long invalid = 0;
 	unsigned long now = jiffies;
 	unsigned long save_cache_validity;
-	bool cache_revalidated = true;
+	bool have_writers = nfs_file_has_buffered_writers(nfsi);
+	bool cache_revalidated;
 
 	dfprintk(VFS, "NFS: %s(%s/%lu fh_crc=0x%08x ct=%d info=0x%x)\n",
 			__func__, inode->i_sb->s_id, inode->i_ino,
@@ -1725,17 +1714,23 @@
 	/* Do atomic weak cache consistency updates */
 	invalid |= nfs_wcc_update_inode(inode, fattr);
 
+
+	cache_revalidated = !pnfs_layoutcommit_outstanding(inode);
+
 	/* More cache consistency checks */
 	if (fattr->valid & NFS_ATTR_FATTR_CHANGE) {
 		if (inode->i_version != fattr->change_attr) {
 			dprintk("NFS: change_attr change on server for file %s/%ld\n",
 					inode->i_sb->s_id, inode->i_ino);
-			invalid |= NFS_INO_INVALID_ATTR
-				| NFS_INO_INVALID_DATA
-				| NFS_INO_INVALID_ACCESS
-				| NFS_INO_INVALID_ACL;
-			if (S_ISDIR(inode->i_mode))
-				nfs_force_lookup_revalidate(inode);
+			/* Could it be a race with writeback? */
+			if (!have_writers) {
+				invalid |= NFS_INO_INVALID_ATTR
+					| NFS_INO_INVALID_DATA
+					| NFS_INO_INVALID_ACCESS
+					| NFS_INO_INVALID_ACL;
+				if (S_ISDIR(inode->i_mode))
+					nfs_force_lookup_revalidate(inode);
+			}
 			inode->i_version = fattr->change_attr;
 		}
 	} else {
@@ -1768,9 +1763,10 @@
 		if (new_isize != cur_isize) {
 			/* Do we perhaps have any outstanding writes, or has
 			 * the file grown beyond our last write? */
-			if ((nfsi->nrequests == 0) || new_isize > cur_isize) {
+			if (nfsi->nrequests == 0 || new_isize > cur_isize) {
 				i_size_write(inode, new_isize);
-				invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA;
+				if (!have_writers)
+					invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA;
 			}
 			dprintk("NFS: isize change on server for file %s/%ld "
 					"(%Ld to %Ld)\n",
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index 5154fa6..8de509b 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -66,13 +66,16 @@
 
 struct nfs_client_initdata {
 	unsigned long init_flags;
-	const char *hostname;
-	const struct sockaddr *addr;
+	const char *hostname;			/* Hostname of the server */
+	const struct sockaddr *addr;		/* Address of the server */
+	const char *nodename;			/* Hostname of the client */
+	const char *ip_addr;			/* IP address of the client */
 	size_t addrlen;
 	struct nfs_subversion *nfs_mod;
 	int proto;
 	u32 minorversion;
 	struct net *net;
+	const struct rpc_timeout *timeparms;
 };
 
 /*
@@ -147,9 +150,8 @@
 extern const struct rpc_program nfs_program;
 extern void nfs_clients_init(struct net *net);
 extern struct nfs_client *nfs_alloc_client(const struct nfs_client_initdata *);
-int nfs_create_rpc_client(struct nfs_client *, const struct rpc_timeout *, rpc_authflavor_t);
+int nfs_create_rpc_client(struct nfs_client *, const struct nfs_client_initdata *, rpc_authflavor_t);
 struct nfs_client *nfs_get_client(const struct nfs_client_initdata *,
-				  const struct rpc_timeout *, const char *,
 				  rpc_authflavor_t);
 int nfs_probe_fsinfo(struct nfs_server *server, struct nfs_fh *, struct nfs_fattr *);
 void nfs_server_insert_lists(struct nfs_server *);
@@ -184,7 +186,7 @@
 					   rpc_authflavor_t);
 extern int nfs_wait_client_init_complete(const struct nfs_client *clp);
 extern void nfs_mark_client_ready(struct nfs_client *clp, int state);
-extern struct nfs_client *nfs4_set_ds_client(struct nfs_client* mds_clp,
+extern struct nfs_client *nfs4_set_ds_client(struct nfs_server *mds_srv,
 					     const struct sockaddr *ds_addr,
 					     int ds_addrlen, int ds_proto,
 					     unsigned int ds_timeo,
@@ -193,7 +195,7 @@
 					     rpc_authflavor_t au_flavor);
 extern struct rpc_clnt *nfs4_find_or_create_ds_client(struct nfs_client *,
 						struct inode *);
-extern struct nfs_client *nfs3_set_ds_client(struct nfs_client *mds_clp,
+extern struct nfs_client *nfs3_set_ds_client(struct nfs_server *mds_srv,
 			const struct sockaddr *ds_addr, int ds_addrlen,
 			int ds_proto, unsigned int ds_timeo,
 			unsigned int ds_retrans, rpc_authflavor_t au_flavor);
@@ -338,8 +340,7 @@
 /* proc.c */
 void nfs_close_context(struct nfs_open_context *ctx, int is_sync);
 extern struct nfs_client *nfs_init_client(struct nfs_client *clp,
-			   const struct rpc_timeout *timeparms,
-			   const char *ip_addr);
+			   const struct nfs_client_initdata *);
 
 /* dir.c */
 extern void nfs_force_use_readdirplus(struct inode *dir);
@@ -411,6 +412,19 @@
 extern bool nfs_sb_active(struct super_block *sb);
 extern void nfs_sb_deactive(struct super_block *sb);
 
+/* io.c */
+extern void nfs_start_io_read(struct inode *inode);
+extern void nfs_end_io_read(struct inode *inode);
+extern void nfs_start_io_write(struct inode *inode);
+extern void nfs_end_io_write(struct inode *inode);
+extern void nfs_start_io_direct(struct inode *inode);
+extern void nfs_end_io_direct(struct inode *inode);
+
+static inline bool nfs_file_io_is_buffered(struct nfs_inode *nfsi)
+{
+	return test_bit(NFS_INO_ODIRECT, &nfsi->flags) == 0;
+}
+
 /* namespace.c */
 #define NFS_PATH_CANONICAL 1
 extern char *nfs_path(char **p, struct dentry *dentry,
@@ -496,9 +510,29 @@
 		    struct inode *inode,
 		    struct nfs_direct_req *dreq);
 int nfs_key_timeout_notify(struct file *filp, struct inode *inode);
-bool nfs_ctx_key_to_expire(struct nfs_open_context *ctx);
+bool nfs_ctx_key_to_expire(struct nfs_open_context *ctx, struct inode *inode);
 void nfs_pageio_stop_mirroring(struct nfs_pageio_descriptor *pgio);
 
+int nfs_filemap_write_and_wait_range(struct address_space *mapping,
+		loff_t lstart, loff_t lend);
+
+#ifdef CONFIG_NFS_V4_1
+static inline
+void nfs_clear_pnfs_ds_commit_verifiers(struct pnfs_ds_commit_info *cinfo)
+{
+	int i;
+
+	for (i = 0; i < cinfo->nbuckets; i++)
+		cinfo->buckets[i].direct_verf.committed = NFS_INVALID_STABLE_HOW;
+}
+#else
+static inline
+void nfs_clear_pnfs_ds_commit_verifiers(struct pnfs_ds_commit_info *cinfo)
+{
+}
+#endif
+
+
 #ifdef CONFIG_MIGRATION
 extern int nfs_migrate_page(struct address_space *,
 		struct page *, struct page *, enum migrate_mode);
@@ -506,6 +540,13 @@
 #define nfs_migrate_page NULL
 #endif
 
+static inline int
+nfs_write_verifier_cmp(const struct nfs_write_verifier *v1,
+		const struct nfs_write_verifier *v2)
+{
+	return memcmp(v1->data, v2->data, sizeof(v1->data));
+}
+
 /* unlink.c */
 extern struct rpc_task *
 nfs_async_rename(struct inode *old_dir, struct inode *new_dir,
@@ -521,8 +562,7 @@
 /* nfs4proc.c */
 extern void __nfs4_read_done_cb(struct nfs_pgio_header *);
 extern struct nfs_client *nfs4_init_client(struct nfs_client *clp,
-			    const struct rpc_timeout *timeparms,
-			    const char *ip_addr);
+			    const struct nfs_client_initdata *);
 extern int nfs40_walk_client_list(struct nfs_client *clp,
 				struct nfs_client **result,
 				struct rpc_cred *cred);
diff --git a/fs/nfs/io.c b/fs/nfs/io.c
new file mode 100644
index 0000000..1fc5d1c
--- /dev/null
+++ b/fs/nfs/io.c
@@ -0,0 +1,147 @@
+/*
+ * Copyright (c) 2016 Trond Myklebust
+ *
+ * I/O and data path helper functionality.
+ */
+
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/bitops.h>
+#include <linux/rwsem.h>
+#include <linux/fs.h>
+#include <linux/nfs_fs.h>
+
+#include "internal.h"
+
+/* Call with exclusively locked inode->i_rwsem */
+static void nfs_block_o_direct(struct nfs_inode *nfsi, struct inode *inode)
+{
+	if (test_bit(NFS_INO_ODIRECT, &nfsi->flags)) {
+		clear_bit(NFS_INO_ODIRECT, &nfsi->flags);
+		inode_dio_wait(inode);
+	}
+}
+
+/**
+ * nfs_start_io_read - declare the file is being used for buffered reads
+ * @inode - file inode
+ *
+ * Declare that a buffered read operation is about to start, and ensure
+ * that we block all direct I/O.
+ * On exit, the function ensures that the NFS_INO_ODIRECT flag is unset,
+ * and holds a shared lock on inode->i_rwsem to ensure that the flag
+ * cannot be changed.
+ * In practice, this means that buffered read operations are allowed to
+ * execute in parallel, thanks to the shared lock, whereas direct I/O
+ * operations need to wait to grab an exclusive lock in order to set
+ * NFS_INO_ODIRECT.
+ * Note that buffered writes and truncates both take a write lock on
+ * inode->i_rwsem, meaning that those are serialised w.r.t. the reads.
+ */
+void
+nfs_start_io_read(struct inode *inode)
+{
+	struct nfs_inode *nfsi = NFS_I(inode);
+	/* Be an optimist! */
+	down_read(&inode->i_rwsem);
+	if (test_bit(NFS_INO_ODIRECT, &nfsi->flags) == 0)
+		return;
+	up_read(&inode->i_rwsem);
+	/* Slow path.... */
+	down_write(&inode->i_rwsem);
+	nfs_block_o_direct(nfsi, inode);
+	downgrade_write(&inode->i_rwsem);
+}
+
+/**
+ * nfs_end_io_read - declare that the buffered read operation is done
+ * @inode - file inode
+ *
+ * Declare that a buffered read operation is done, and release the shared
+ * lock on inode->i_rwsem.
+ */
+void
+nfs_end_io_read(struct inode *inode)
+{
+	up_read(&inode->i_rwsem);
+}
+
+/**
+ * nfs_start_io_write - declare the file is being used for buffered writes
+ * @inode - file inode
+ *
+ * Declare that a buffered write operation is about to start, and ensure
+ * that we block all direct I/O.
+ */
+void
+nfs_start_io_write(struct inode *inode)
+{
+	down_write(&inode->i_rwsem);
+	nfs_block_o_direct(NFS_I(inode), inode);
+}
+
+/**
+ * nfs_end_io_write - declare that the buffered write operation is done
+ * @inode - file inode
+ *
+ * Declare that a buffered write operation is done, and release the
+ * lock on inode->i_rwsem.
+ */
+void
+nfs_end_io_write(struct inode *inode)
+{
+	up_write(&inode->i_rwsem);
+}
+
+/* Call with exclusively locked inode->i_rwsem */
+static void nfs_block_buffered(struct nfs_inode *nfsi, struct inode *inode)
+{
+	if (!test_bit(NFS_INO_ODIRECT, &nfsi->flags)) {
+		set_bit(NFS_INO_ODIRECT, &nfsi->flags);
+		nfs_wb_all(inode);
+	}
+}
+
+/**
+ * nfs_start_io_direct - declare the file is being used for direct i/o
+ * @inode - file inode
+ *
+ * Declare that a direct I/O operation is about to start, and ensure
+ * that we block all buffered I/O.
+ * On exit, the function ensures that the NFS_INO_ODIRECT flag is set,
+ * and holds a shared lock on inode->i_rwsem to ensure that the flag
+ * cannot be changed.
+ * In practice, this means that direct I/O operations are allowed to
+ * execute in parallel, thanks to the shared lock, whereas buffered I/O
+ * operations need to wait to grab an exclusive lock in order to clear
+ * NFS_INO_ODIRECT.
+ * Note that buffered writes and truncates both take a write lock on
+ * inode->i_rwsem, meaning that those are serialised w.r.t. O_DIRECT.
+ */
+void
+nfs_start_io_direct(struct inode *inode)
+{
+	struct nfs_inode *nfsi = NFS_I(inode);
+	/* Be an optimist! */
+	down_read(&inode->i_rwsem);
+	if (test_bit(NFS_INO_ODIRECT, &nfsi->flags) != 0)
+		return;
+	up_read(&inode->i_rwsem);
+	/* Slow path.... */
+	down_write(&inode->i_rwsem);
+	nfs_block_buffered(nfsi, inode);
+	downgrade_write(&inode->i_rwsem);
+}
+
+/**
+ * nfs_end_io_direct - declare that the direct i/o operation is done
+ * @inode - file inode
+ *
+ * Declare that a direct I/O operation is done, and release the shared
+ * lock on inode->i_rwsem.
+ */
+void
+nfs_end_io_direct(struct inode *inode)
+{
+	up_read(&inode->i_rwsem);
+}
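
[Editor's note, not part of the patch] A usage sketch for the new helpers, mirroring what nfs_file_read() in fs/nfs/file.c now does; the sketch_ function name is hypothetical:

	static ssize_t sketch_buffered_read(struct kiocb *iocb, struct iov_iter *to)
	{
		struct inode *inode = file_inode(iocb->ki_filp);
		ssize_t ret;

		nfs_start_io_read(inode);	/* shared i_rwsem; waits out any O_DIRECT */
		ret = generic_file_read_iter(iocb, to);
		nfs_end_io_read(inode);		/* drop the shared lock */
		return ret;
	}

The read and direct helpers take i_rwsem shared and only upgrade to the exclusive lock (then downgrade) when NFS_INO_ODIRECT has to be flipped, so buffered readers run in parallel with each other and direct I/O requests run in parallel with each other, while the two modes exclude one another.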
diff --git a/fs/nfs/nfs3client.c b/fs/nfs/nfs3client.c
index 9e9fa34..ee75354 100644
--- a/fs/nfs/nfs3client.c
+++ b/fs/nfs/nfs3client.c
@@ -76,19 +76,23 @@
  * low timeout interval so that if a connection is lost, we retry through
  * the MDS.
  */
-struct nfs_client *nfs3_set_ds_client(struct nfs_client *mds_clp,
+struct nfs_client *nfs3_set_ds_client(struct nfs_server *mds_srv,
 		const struct sockaddr *ds_addr, int ds_addrlen,
 		int ds_proto, unsigned int ds_timeo, unsigned int ds_retrans,
 		rpc_authflavor_t au_flavor)
 {
+	struct rpc_timeout ds_timeout;
+	struct nfs_client *mds_clp = mds_srv->nfs_client;
 	struct nfs_client_initdata cl_init = {
 		.addr = ds_addr,
 		.addrlen = ds_addrlen,
+		.nodename = mds_clp->cl_rpcclient->cl_nodename,
+		.ip_addr = mds_clp->cl_ipaddr,
 		.nfs_mod = &nfs_v3,
 		.proto = ds_proto,
 		.net = mds_clp->cl_net,
+		.timeparms = &ds_timeout,
 	};
-	struct rpc_timeout ds_timeout;
 	struct nfs_client *clp;
 	char buf[INET6_ADDRSTRLEN + 1];
 
@@ -97,10 +101,12 @@
 		return ERR_PTR(-EINVAL);
 	cl_init.hostname = buf;
 
+	if (mds_srv->flags & NFS_MOUNT_NORESVPORT)
+		set_bit(NFS_CS_NORESVPORT, &cl_init.init_flags);
+
 	/* Use the MDS nfs_client cl_ipaddr. */
 	nfs_init_timeout_values(&ds_timeout, ds_proto, ds_timeo, ds_retrans);
-	clp = nfs_get_client(&cl_init, &ds_timeout, mds_clp->cl_ipaddr,
-			     au_flavor);
+	clp = nfs_get_client(&cl_init, au_flavor);
 
 	return clp;
 }
diff --git a/fs/nfs/nfs42proc.c b/fs/nfs/nfs42proc.c
index aa03ed0..33da841 100644
--- a/fs/nfs/nfs42proc.c
+++ b/fs/nfs/nfs42proc.c
@@ -113,15 +113,17 @@
 	if (!nfs_server_capable(inode, NFS_CAP_DEALLOCATE))
 		return -EOPNOTSUPP;
 
-	nfs_wb_all(inode);
 	inode_lock(inode);
+	err = nfs_sync_inode(inode);
+	if (err)
+		goto out_unlock;
 
 	err = nfs42_proc_fallocate(&msg, filep, offset, len);
 	if (err == 0)
 		truncate_pagecache_range(inode, offset, (offset + len) -1);
 	if (err == -EOPNOTSUPP)
 		NFS_SERVER(inode)->caps &= ~NFS_CAP_DEALLOCATE;
-
+out_unlock:
 	inode_unlock(inode);
 	return err;
 }
@@ -154,11 +156,20 @@
 	if (status)
 		return status;
 
+	status = nfs_filemap_write_and_wait_range(file_inode(src)->i_mapping,
+			pos_src, pos_src + (loff_t)count - 1);
+	if (status)
+		return status;
+
 	status = nfs4_set_rw_stateid(&args.dst_stateid, dst_lock->open_context,
 				     dst_lock, FMODE_WRITE);
 	if (status)
 		return status;
 
+	status = nfs_sync_inode(dst_inode);
+	if (status)
+		return status;
+
 	status = nfs4_call_sync(server->client, server, &msg,
 				&args.seq_args, &res.seq_res, 0);
 	if (status == -ENOTSUPP)
@@ -258,7 +269,11 @@
 	if (status)
 		return status;
 
-	nfs_wb_all(inode);
+	status = nfs_filemap_write_and_wait_range(inode->i_mapping,
+			offset, LLONG_MAX);
+	if (status)
+		return status;
+
 	status = nfs4_call_sync(server->client, server, &msg,
 				&args.seq_args, &res.seq_res, 0);
 	if (status == -ENOTSUPP)
@@ -336,8 +351,7 @@
 			 * Mark the bad layout state as invalid, then retry
 			 * with the current stateid.
 			 */
-			set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-			pnfs_mark_matching_lsegs_invalid(lo, &head, NULL, 0);
+			pnfs_mark_layout_stateid_invalid(lo, &head);
 			spin_unlock(&inode->i_lock);
 			pnfs_free_lseg_list(&head);
 		} else
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 768456f..4be567a 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -185,6 +185,7 @@
 struct nfs4_exception {
 	struct nfs4_state *state;
 	struct inode *inode;
+	nfs4_stateid *stateid;
 	long timeout;
 	unsigned char delay : 1,
 		      recovering : 1,
diff --git a/fs/nfs/nfs4client.c b/fs/nfs/nfs4client.c
index 10410e8..8d7d08d 100644
--- a/fs/nfs/nfs4client.c
+++ b/fs/nfs/nfs4client.c
@@ -349,10 +349,10 @@
  * Returns pointer to an NFS client, or an ERR_PTR value.
  */
 struct nfs_client *nfs4_init_client(struct nfs_client *clp,
-				    const struct rpc_timeout *timeparms,
-				    const char *ip_addr)
+				    const struct nfs_client_initdata *cl_init)
 {
 	char buf[INET6_ADDRSTRLEN + 1];
+	const char *ip_addr = cl_init->ip_addr;
 	struct nfs_client *old;
 	int error;
 
@@ -370,9 +370,9 @@
 	__set_bit(NFS_CS_DISCRTRY, &clp->cl_flags);
 	__set_bit(NFS_CS_NO_RETRANS_TIMEOUT, &clp->cl_flags);
 
-	error = nfs_create_rpc_client(clp, timeparms, RPC_AUTH_GSS_KRB5I);
+	error = nfs_create_rpc_client(clp, cl_init, RPC_AUTH_GSS_KRB5I);
 	if (error == -EINVAL)
-		error = nfs_create_rpc_client(clp, timeparms, RPC_AUTH_UNIX);
+		error = nfs_create_rpc_client(clp, cl_init, RPC_AUTH_UNIX);
 	if (error < 0)
 		goto error;
 
@@ -793,10 +793,12 @@
 		.hostname = hostname,
 		.addr = addr,
 		.addrlen = addrlen,
+		.ip_addr = ip_addr,
 		.nfs_mod = &nfs_v4,
 		.proto = proto,
 		.minorversion = minorversion,
 		.net = net,
+		.timeparms = timeparms,
 	};
 	struct nfs_client *clp;
 	int error;
@@ -809,7 +811,7 @@
 		set_bit(NFS_CS_MIGRATION, &cl_init.init_flags);
 
 	/* Allocate or find a client reference we can use */
-	clp = nfs_get_client(&cl_init, timeparms, ip_addr, authflavour);
+	clp = nfs_get_client(&cl_init, authflavour);
 	if (IS_ERR(clp)) {
 		error = PTR_ERR(clp);
 		goto error;
@@ -842,20 +844,24 @@
  * low timeout interval so that if a connection is lost, we retry through
  * the MDS.
  */
-struct nfs_client *nfs4_set_ds_client(struct nfs_client* mds_clp,
+struct nfs_client *nfs4_set_ds_client(struct nfs_server *mds_srv,
 		const struct sockaddr *ds_addr, int ds_addrlen,
 		int ds_proto, unsigned int ds_timeo, unsigned int ds_retrans,
 		u32 minor_version, rpc_authflavor_t au_flavor)
 {
+	struct rpc_timeout ds_timeout;
+	struct nfs_client *mds_clp = mds_srv->nfs_client;
 	struct nfs_client_initdata cl_init = {
 		.addr = ds_addr,
 		.addrlen = ds_addrlen,
+		.nodename = mds_clp->cl_rpcclient->cl_nodename,
+		.ip_addr = mds_clp->cl_ipaddr,
 		.nfs_mod = &nfs_v4,
 		.proto = ds_proto,
 		.minorversion = minor_version,
 		.net = mds_clp->cl_net,
+		.timeparms = &ds_timeout,
 	};
-	struct rpc_timeout ds_timeout;
 	struct nfs_client *clp;
 	char buf[INET6_ADDRSTRLEN + 1];
 
@@ -863,14 +869,16 @@
 		return ERR_PTR(-EINVAL);
 	cl_init.hostname = buf;
 
+	if (mds_srv->flags & NFS_MOUNT_NORESVPORT)
+		__set_bit(NFS_CS_NORESVPORT, &cl_init.init_flags);
+
 	/*
 	 * Set an authflavor equal to the MDS value. Use the MDS nfs_client
 	 * cl_ipaddr so as to use the same EXCHANGE_ID co_ownerid as the MDS
 	 * (section 13.1 RFC 5661).
 	 */
 	nfs_init_timeout_values(&ds_timeout, ds_proto, ds_timeo, ds_retrans);
-	clp = nfs_get_client(&cl_init, &ds_timeout, mds_clp->cl_ipaddr,
-			     au_flavor);
+	clp = nfs_get_client(&cl_init, au_flavor);
 
 	dprintk("<-- %s %p\n", __func__, clp);
 	return clp;
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index 014b0e4..d085ad7 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -66,7 +66,7 @@
 	if (openflags & O_TRUNC) {
 		attr.ia_valid |= ATTR_SIZE;
 		attr.ia_size = 0;
-		nfs_sync_inode(inode);
+		filemap_write_and_wait(inode->i_mapping);
 	}
 
 	inode = NFS_PROTO(dir)->open_context(dir, ctx, openflags, &attr, NULL);
@@ -133,21 +133,9 @@
 				    struct file *file_out, loff_t pos_out,
 				    size_t count, unsigned int flags)
 {
-	struct inode *in_inode = file_inode(file_in);
-	struct inode *out_inode = file_inode(file_out);
-	int ret;
-
-	if (in_inode == out_inode)
+	if (file_inode(file_in) == file_inode(file_out))
 		return -EINVAL;
 
-	/* flush any pending writes */
-	ret = nfs_sync_inode(in_inode);
-	if (ret)
-		return ret;
-	ret = nfs_sync_inode(out_inode);
-	if (ret)
-		return ret;
-
 	return nfs42_proc_copy(file_in, pos_in, file_out, pos_out, count);
 }
 
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index ff416d0..d30f88c 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -363,6 +363,7 @@
 {
 	struct nfs_client *clp = server->nfs_client;
 	struct nfs4_state *state = exception->state;
+	const nfs4_stateid *stateid = exception->stateid;
 	struct inode *inode = exception->inode;
 	int ret = errorcode;
 
@@ -376,9 +377,18 @@
 		case -NFS4ERR_DELEG_REVOKED:
 		case -NFS4ERR_ADMIN_REVOKED:
 		case -NFS4ERR_BAD_STATEID:
-			if (inode && nfs_async_inode_return_delegation(inode,
-						NULL) == 0)
-				goto wait_on_recovery;
+			if (inode) {
+				int err;
+
+				err = nfs_async_inode_return_delegation(inode,
+						stateid);
+				if (err == 0)
+					goto wait_on_recovery;
+				if (stateid != NULL && stateid->type == NFS4_DELEGATION_STATEID_TYPE) {
+					exception->retry = 1;
+					break;
+				}
+			}
 			if (state == NULL)
 				break;
 			ret = nfs4_schedule_stateid_recovery(server, state);
@@ -427,6 +437,7 @@
 		case -NFS4ERR_DELAY:
 			nfs_inc_server_stats(server, NFSIOS_DELAY);
 		case -NFS4ERR_GRACE:
+		case -NFS4ERR_LAYOUTTRYLATER:
 		case -NFS4ERR_RECALLCONFLICT:
 			exception->delay = 1;
 			return 0;
@@ -2669,10 +2680,61 @@
 	return res;
 }
 
-static int _nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred,
-			    struct nfs_fattr *fattr, struct iattr *sattr,
-			    struct nfs4_state *state, struct nfs4_label *ilabel,
-			    struct nfs4_label *olabel)
+static int _nfs4_do_setattr(struct inode *inode,
+			    struct nfs_setattrargs *arg,
+			    struct nfs_setattrres *res,
+			    struct rpc_cred *cred,
+			    struct nfs4_state *state)
+{
+	struct nfs_server *server = NFS_SERVER(inode);
+        struct rpc_message msg = {
+		.rpc_proc	= &nfs4_procedures[NFSPROC4_CLNT_SETATTR],
+		.rpc_argp	= arg,
+		.rpc_resp	= res,
+		.rpc_cred	= cred,
+        };
+	struct rpc_cred *delegation_cred = NULL;
+	unsigned long timestamp = jiffies;
+	fmode_t fmode;
+	bool truncate;
+	int status;
+
+	nfs_fattr_init(res->fattr);
+
+	/* Servers should only apply open mode checks for file size changes */
+	truncate = (arg->iap->ia_valid & ATTR_SIZE) ? true : false;
+	fmode = truncate ? FMODE_WRITE : FMODE_READ;
+
+	if (nfs4_copy_delegation_stateid(inode, fmode, &arg->stateid, &delegation_cred)) {
+		/* Use that stateid */
+	} else if (truncate && state != NULL) {
+		struct nfs_lockowner lockowner = {
+			.l_owner = current->files,
+			.l_pid = current->tgid,
+		};
+		if (!nfs4_valid_open_stateid(state))
+			return -EBADF;
+		if (nfs4_select_rw_stateid(state, FMODE_WRITE, &lockowner,
+				&arg->stateid, &delegation_cred) == -EIO)
+			return -EBADF;
+	} else
+		nfs4_stateid_copy(&arg->stateid, &zero_stateid);
+	if (delegation_cred)
+		msg.rpc_cred = delegation_cred;
+
+	status = nfs4_call_sync(server->client, server, &msg, &arg->seq_args, &res->seq_res, 1);
+
+	put_rpccred(delegation_cred);
+	if (status == 0 && state != NULL)
+		renew_lease(server, timestamp);
+	trace_nfs4_setattr(inode, &arg->stateid, status);
+	return status;
+}
+
+static int nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred,
+			   struct nfs_fattr *fattr, struct iattr *sattr,
+			   struct nfs4_state *state, struct nfs4_label *ilabel,
+			   struct nfs4_label *olabel)
 {
 	struct nfs_server *server = NFS_SERVER(inode);
         struct nfs_setattrargs  arg = {
@@ -2687,67 +2749,19 @@
 		.label		= olabel,
 		.server		= server,
         };
-        struct rpc_message msg = {
-		.rpc_proc	= &nfs4_procedures[NFSPROC4_CLNT_SETATTR],
-		.rpc_argp	= &arg,
-		.rpc_resp	= &res,
-		.rpc_cred	= cred,
-        };
-	struct rpc_cred *delegation_cred = NULL;
-	unsigned long timestamp = jiffies;
-	fmode_t fmode;
-	bool truncate;
-	int status;
+	struct nfs4_exception exception = {
+		.state = state,
+		.inode = inode,
+		.stateid = &arg.stateid,
+	};
+	int err;
 
 	arg.bitmask = nfs4_bitmask(server, ilabel);
 	if (ilabel)
 		arg.bitmask = nfs4_bitmask(server, olabel);
 
-	nfs_fattr_init(fattr);
-
-	/* Servers should only apply open mode checks for file size changes */
-	truncate = (sattr->ia_valid & ATTR_SIZE) ? true : false;
-	fmode = truncate ? FMODE_WRITE : FMODE_READ;
-
-	if (nfs4_copy_delegation_stateid(inode, fmode, &arg.stateid, &delegation_cred)) {
-		/* Use that stateid */
-	} else if (truncate && state != NULL) {
-		struct nfs_lockowner lockowner = {
-			.l_owner = current->files,
-			.l_pid = current->tgid,
-		};
-		if (!nfs4_valid_open_stateid(state))
-			return -EBADF;
-		if (nfs4_select_rw_stateid(state, FMODE_WRITE, &lockowner,
-				&arg.stateid, &delegation_cred) == -EIO)
-			return -EBADF;
-	} else
-		nfs4_stateid_copy(&arg.stateid, &zero_stateid);
-	if (delegation_cred)
-		msg.rpc_cred = delegation_cred;
-
-	status = nfs4_call_sync(server->client, server, &msg, &arg.seq_args, &res.seq_res, 1);
-
-	put_rpccred(delegation_cred);
-	if (status == 0 && state != NULL)
-		renew_lease(server, timestamp);
-	trace_nfs4_setattr(inode, &arg.stateid, status);
-	return status;
-}
-
-static int nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred,
-			   struct nfs_fattr *fattr, struct iattr *sattr,
-			   struct nfs4_state *state, struct nfs4_label *ilabel,
-			   struct nfs4_label *olabel)
-{
-	struct nfs_server *server = NFS_SERVER(inode);
-	struct nfs4_exception exception = {
-		.state = state,
-		.inode = inode,
-	};
-	int err;
 	do {
-		err = _nfs4_do_setattr(inode, cred, fattr, sattr, state, ilabel, olabel);
+		err = _nfs4_do_setattr(inode, &arg, &res, cred, state);
 		switch (err) {
 		case -NFS4ERR_OPENMODE:
 			if (!(sattr->ia_valid & ATTR_SIZE)) {
@@ -4392,7 +4406,8 @@
 				 struct rpc_message *msg)
 {
 	hdr->timestamp   = jiffies;
-	hdr->pgio_done_cb = nfs4_read_done_cb;
+	if (!hdr->pgio_done_cb)
+		hdr->pgio_done_cb = nfs4_read_done_cb;
 	msg->rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_READ];
 	nfs4_init_sequence(&hdr->args.seq_args, &hdr->res.seq_res, 0);
 }
@@ -7869,11 +7884,13 @@
 	struct inode *inode = lgp->args.inode;
 	struct nfs_server *server = NFS_SERVER(inode);
 	struct pnfs_layout_hdr *lo;
-	int status = task->tk_status;
+	int nfs4err = task->tk_status;
+	int err, status = 0;
+	LIST_HEAD(head);
 
 	dprintk("--> %s tk_status => %d\n", __func__, -task->tk_status);
 
-	switch (status) {
+	switch (nfs4err) {
 	case 0:
 		goto out;
 
@@ -7905,45 +7922,42 @@
 			status = -EOVERFLOW;
 			goto out;
 		}
-		/* Fallthrough */
+		status = -EBUSY;
+		break;
 	case -NFS4ERR_RECALLCONFLICT:
-		nfs4_handle_exception(server, -NFS4ERR_RECALLCONFLICT,
-					exception);
 		status = -ERECALLCONFLICT;
-		goto out;
+		break;
 	case -NFS4ERR_EXPIRED:
 	case -NFS4ERR_BAD_STATEID:
 		exception->timeout = 0;
 		spin_lock(&inode->i_lock);
-		if (nfs4_stateid_match(&lgp->args.stateid,
+		lo = NFS_I(inode)->layout;
+		/* If the open stateid was bad, then recover it. */
+		if (!lo || test_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags) ||
+		    nfs4_stateid_match_other(&lgp->args.stateid,
 					&lgp->args.ctx->state->stateid)) {
 			spin_unlock(&inode->i_lock);
-			/* If the open stateid was bad, then recover it. */
 			exception->state = lgp->args.ctx->state;
 			break;
 		}
-		lo = NFS_I(inode)->layout;
-		if (lo && !test_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags) &&
-		    nfs4_stateid_match_other(&lgp->args.stateid, &lo->plh_stateid)) {
-			LIST_HEAD(head);
 
-			/*
-			 * Mark the bad layout state as invalid, then retry
-			 * with the current stateid.
-			 */
-			set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-			pnfs_mark_matching_lsegs_invalid(lo, &head, NULL, 0);
-			spin_unlock(&inode->i_lock);
-			pnfs_free_lseg_list(&head);
-			status = -EAGAIN;
-			goto out;
-		} else
-			spin_unlock(&inode->i_lock);
+		/*
+		 * Mark the bad layout state as invalid, then retry
+		 */
+		pnfs_mark_layout_stateid_invalid(lo, &head);
+		spin_unlock(&inode->i_lock);
+		pnfs_free_lseg_list(&head);
+		status = -EAGAIN;
+		goto out;
 	}
 
-	status = nfs4_handle_exception(server, status, exception);
-	if (exception->retry)
-		status = -EAGAIN;
+	err = nfs4_handle_exception(server, nfs4err, exception);
+	if (!status) {
+		if (exception->retry)
+			status = -EAGAIN;
+		else
+			status = err;
+	}
 out:
 	dprintk("<-- %s\n", __func__);
 	return status;
@@ -8129,8 +8143,7 @@
 	spin_lock(&lo->plh_inode->i_lock);
 	pnfs_mark_matching_lsegs_invalid(lo, &freeme, &lrp->args.range,
 			be32_to_cpu(lrp->args.stateid.seqid));
-	pnfs_mark_layout_returned_if_empty(lo);
-	if (lrp->res.lrs_present)
+	if (lrp->res.lrs_present && pnfs_layout_is_valid(lo))
 		pnfs_set_layout_stateid(lo, &lrp->res.stateid, true);
 	pnfs_clear_layoutreturn_waitbit(lo);
 	spin_unlock(&lo->plh_inode->i_lock);
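
Note (not part of the patch): the setattr rework above moves the single SETATTR
RPC into _nfs4_do_setattr() and leaves the retry policy to nfs4_do_setattr(),
which now wires the stateid it sends into the nfs4_exception. A minimal sketch
of that loop shape, using the names from the hunks above; the -NFS4ERR_OPENMODE
special case and the rest of the real error handling are elided.

/* Sketch only: exception.stateid aliases the stateid placed in the SETATTR
 * arguments, so nfs4_handle_exception() can return the delegation that a
 * revoked stateid refers to and set exception.retry to force another pass
 * with a freshly selected stateid.
 */
static int nfs4_do_setattr_sketch(struct inode *inode, struct nfs_setattrargs *arg,
				  struct nfs_setattrres *res, struct rpc_cred *cred,
				  struct nfs4_state *state)
{
	struct nfs4_exception exception = {
		.state		= state,
		.inode		= inode,
		.stateid	= &arg->stateid,
	};
	int err;

	do {
		err = _nfs4_do_setattr(inode, arg, res, cred, state);
		if (err == 0)
			break;
		err = nfs4_handle_exception(NFS_SERVER(inode), err, &exception);
	} while (exception.retry);
	return err;
}
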
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 661e753..7bd3a5c 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -1985,9 +1985,14 @@
 	p = xdr_encode_hyper(p, args->lastbytewritten + 1);	/* length */
 	*p = cpu_to_be32(0); /* reclaim */
 	encode_nfs4_stateid(xdr, &args->stateid);
-	p = reserve_space(xdr, 20);
-	*p++ = cpu_to_be32(1); /* newoffset = TRUE */
-	p = xdr_encode_hyper(p, args->lastbytewritten);
+	if (args->lastbytewritten != U64_MAX) {
+		p = reserve_space(xdr, 20);
+		*p++ = cpu_to_be32(1); /* newoffset = TRUE */
+		p = xdr_encode_hyper(p, args->lastbytewritten);
+	} else {
+		p = reserve_space(xdr, 12);
+		*p++ = cpu_to_be32(0); /* newoffset = FALSE */
+	}
 	*p++ = cpu_to_be32(0); /* Never send time_modify_changed */
 	*p++ = cpu_to_be32(NFS_SERVER(args->inode)->pnfs_curr_ld->id);/* type */
 
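A sizing note on the LAYOUTCOMMIT hunk above, taken straight from the
reserve_space() arithmetic: the two trailing words (time_modify_changed and the
layout type) are shared context, so the branches only differ in whether the
8-byte last-write offset is emitted.

	newoffset = TRUE  (lastbytewritten is a real offset):  4 + 8 + 4 + 4 = 20 bytes
	newoffset = FALSE (lastbytewritten == U64_MAX):        4     + 4 + 4 = 12 bytes

U64_MAX is the value the pnfs.c hunk further down assigns to lastbytewritten
when end_pos is 0.
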
diff --git a/fs/nfs/nfstrace.h b/fs/nfs/nfstrace.h
index 0b9e5cc..fe80a1c 100644
--- a/fs/nfs/nfstrace.h
+++ b/fs/nfs/nfstrace.h
@@ -37,7 +37,6 @@
 			{ 1 << NFS_INO_ADVISE_RDPLUS, "ADVISE_RDPLUS" }, \
 			{ 1 << NFS_INO_STALE, "STALE" }, \
 			{ 1 << NFS_INO_INVALIDATING, "INVALIDATING" }, \
-			{ 1 << NFS_INO_FLUSHING, "FLUSHING" }, \
 			{ 1 << NFS_INO_FSCACHE, "FSCACHE" }, \
 			{ 1 << NFS_INO_LAYOUTCOMMIT, "NEED_LAYOUTCOMMIT" }, \
 			{ 1 << NFS_INO_LAYOUTCOMMITTING, "LAYOUTCOMMIT" })
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 0fbe734..70806ca 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -259,7 +259,7 @@
  * is required.
  * Note that caller must hold inode->i_lock.
  */
-static int
+int
 pnfs_mark_layout_stateid_invalid(struct pnfs_layout_hdr *lo,
 		struct list_head *lseg_list)
 {
@@ -334,14 +334,17 @@
 }
 
 static void
-init_lseg(struct pnfs_layout_hdr *lo, struct pnfs_layout_segment *lseg)
+pnfs_init_lseg(struct pnfs_layout_hdr *lo, struct pnfs_layout_segment *lseg,
+		const struct pnfs_layout_range *range,
+		const nfs4_stateid *stateid)
 {
 	INIT_LIST_HEAD(&lseg->pls_list);
 	INIT_LIST_HEAD(&lseg->pls_lc_list);
 	atomic_set(&lseg->pls_refcount, 1);
-	smp_mb();
 	set_bit(NFS_LSEG_VALID, &lseg->pls_flags);
 	lseg->pls_layout = lo;
+	lseg->pls_range = *range;
+	lseg->pls_seq = be32_to_cpu(stateid->seqid);
 }
 
 static void pnfs_free_lseg(struct pnfs_layout_segment *lseg)
@@ -486,15 +489,6 @@
 	       (end2 == NFS4_MAX_UINT64 || end2 > start1);
 }
 
-static bool
-should_free_lseg(const struct pnfs_layout_range *lseg_range,
-		 const struct pnfs_layout_range *recall_range)
-{
-	return (recall_range->iomode == IOMODE_ANY ||
-		lseg_range->iomode == recall_range->iomode) &&
-	       pnfs_lseg_range_intersecting(lseg_range, recall_range);
-}
-
 static bool pnfs_lseg_dec_and_remove_zero(struct pnfs_layout_segment *lseg,
 		struct list_head *tmp_list)
 {
@@ -533,6 +527,27 @@
 	return (s32)(s1 - s2) > 0;
 }
 
+static bool
+pnfs_should_free_range(const struct pnfs_layout_range *lseg_range,
+		 const struct pnfs_layout_range *recall_range)
+{
+	return (recall_range->iomode == IOMODE_ANY ||
+		lseg_range->iomode == recall_range->iomode) &&
+	       pnfs_lseg_range_intersecting(lseg_range, recall_range);
+}
+
+static bool
+pnfs_match_lseg_recall(const struct pnfs_layout_segment *lseg,
+		const struct pnfs_layout_range *recall_range,
+		u32 seq)
+{
+	if (seq != 0 && pnfs_seqid_is_newer(lseg->pls_seq, seq))
+		return false;
+	if (recall_range == NULL)
+		return true;
+	return pnfs_should_free_range(&lseg->pls_range, recall_range);
+}
+
 /**
  * pnfs_mark_matching_lsegs_invalid - tear down lsegs or mark them for later
  * @lo: layout header containing the lsegs
@@ -562,10 +577,7 @@
 	if (list_empty(&lo->plh_segs))
 		return 0;
 	list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list)
-		if (!recall_range ||
-		    should_free_lseg(&lseg->pls_range, recall_range)) {
-			if (seq && pnfs_seqid_is_newer(lseg->pls_seq, seq))
-				continue;
+		if (pnfs_match_lseg_recall(lseg, recall_range, seq)) {
 			dprintk("%s: freeing lseg %p iomode %d seq %u "
 				"offset %llu length %llu\n", __func__,
 				lseg, lseg->pls_range.iomode, lseg->pls_seq,
@@ -761,24 +773,25 @@
 pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new,
 			bool update_barrier)
 {
-	u32 oldseq, newseq, new_barrier;
-	int empty = list_empty(&lo->plh_segs);
+	u32 oldseq, newseq, new_barrier = 0;
+	bool invalid = !pnfs_layout_is_valid(lo);
 
 	oldseq = be32_to_cpu(lo->plh_stateid.seqid);
 	newseq = be32_to_cpu(new->seqid);
-	if (empty || pnfs_seqid_is_newer(newseq, oldseq)) {
+	if (invalid || pnfs_seqid_is_newer(newseq, oldseq)) {
 		nfs4_stateid_copy(&lo->plh_stateid, new);
-		if (update_barrier) {
-			new_barrier = be32_to_cpu(new->seqid);
-		} else {
-			/* Because of wraparound, we want to keep the barrier
-			 * "close" to the current seqids.
-			 */
-			new_barrier = newseq - atomic_read(&lo->plh_outstanding);
-		}
-		if (empty || pnfs_seqid_is_newer(new_barrier, lo->plh_barrier))
-			lo->plh_barrier = new_barrier;
+		/*
+		 * Because of wraparound, we want to keep the barrier
+		 * "close" to the current seqids.
+		 */
+		new_barrier = newseq - atomic_read(&lo->plh_outstanding);
 	}
+	if (update_barrier)
+		new_barrier = be32_to_cpu(new->seqid);
+	else if (new_barrier == 0)
+		return;
+	if (invalid || pnfs_seqid_is_newer(new_barrier, lo->plh_barrier))
+		lo->plh_barrier = new_barrier;
 }
 
 static bool
@@ -873,15 +886,37 @@
 	rpc_wake_up(&NFS_SERVER(lo->plh_inode)->roc_rpcwaitq);
 }
 
+static void
+pnfs_clear_layoutreturn_info(struct pnfs_layout_hdr *lo)
+{
+	lo->plh_return_iomode = 0;
+	lo->plh_return_seq = 0;
+	clear_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
+}
+
 static bool
-pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo)
+pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo,
+		nfs4_stateid *stateid,
+		enum pnfs_iomode *iomode)
 {
 	if (test_and_set_bit(NFS_LAYOUT_RETURN, &lo->plh_flags))
 		return false;
-	lo->plh_return_iomode = 0;
-	lo->plh_return_seq = 0;
 	pnfs_get_layout_hdr(lo);
-	clear_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
+	if (test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags)) {
+		if (stateid != NULL) {
+			nfs4_stateid_copy(stateid, &lo->plh_stateid);
+			if (lo->plh_return_seq != 0)
+				stateid->seqid = cpu_to_be32(lo->plh_return_seq);
+		}
+		if (iomode != NULL)
+			*iomode = lo->plh_return_iomode;
+		pnfs_clear_layoutreturn_info(lo);
+		return true;
+	}
+	if (stateid != NULL)
+		nfs4_stateid_copy(stateid, &lo->plh_stateid);
+	if (iomode != NULL)
+		*iomode = IOMODE_ANY;
 	return true;
 }
 
@@ -949,10 +984,7 @@
 		enum pnfs_iomode iomode;
 		bool send;
 
-		nfs4_stateid_copy(&stateid, &lo->plh_stateid);
-		stateid.seqid = cpu_to_be32(lo->plh_return_seq);
-		iomode = lo->plh_return_iomode;
-		send = pnfs_prepare_layoutreturn(lo);
+		send = pnfs_prepare_layoutreturn(lo, &stateid, &iomode);
 		spin_unlock(&inode->i_lock);
 		if (send) {
 			/* Send an async layoutreturn so we dont deadlock */
@@ -989,7 +1021,6 @@
 		dprintk("NFS: %s no layout to return\n", __func__);
 		goto out;
 	}
-	nfs4_stateid_copy(&stateid, &nfsi->layout->plh_stateid);
 	/* Reference matched in nfs4_layoutreturn_release */
 	pnfs_get_layout_hdr(lo);
 	empty = list_empty(&lo->plh_segs);
@@ -1012,8 +1043,7 @@
 		goto out_put_layout_hdr;
 	}
 
-	set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-	send = pnfs_prepare_layoutreturn(lo);
+	send = pnfs_prepare_layoutreturn(lo, &stateid, NULL);
 	spin_unlock(&ino->i_lock);
 	pnfs_free_lseg_list(&tmp_list);
 	if (send)
@@ -1080,11 +1110,10 @@
 			goto out_noroc;
 	}
 
-	nfs4_stateid_copy(&stateid, &lo->plh_stateid);
 	/* always send layoutreturn if being marked so */
-	if (test_and_clear_bit(NFS_LAYOUT_RETURN_REQUESTED,
-				   &lo->plh_flags))
-		layoutreturn = pnfs_prepare_layoutreturn(lo);
+	if (test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags))
+		layoutreturn = pnfs_prepare_layoutreturn(lo,
+				&stateid, NULL);
 
 	list_for_each_entry_safe(lseg, tmp, &lo->plh_segs, pls_list)
 		/* If we are sending layoutreturn, invalidate all valid lsegs */
@@ -1132,7 +1161,6 @@
 
 	spin_lock(&ino->i_lock);
 	lo = NFS_I(ino)->layout;
-	pnfs_mark_layout_returned_if_empty(lo);
 	if (pnfs_seqid_is_newer(barrier, lo->plh_barrier))
 		lo->plh_barrier = barrier;
 	spin_unlock(&ino->i_lock);
@@ -1505,7 +1533,7 @@
 	struct pnfs_layout_segment *lseg = NULL;
 	nfs4_stateid stateid;
 	long timeout = 0;
-	unsigned long giveup = jiffies + rpc_get_timeout(server->client);
+	unsigned long giveup = jiffies + (clp->cl_lease_time << 1);
 	bool first;
 
 	if (!pnfs_enabled_sb(NFS_SERVER(ino))) {
@@ -1645,33 +1673,44 @@
 	lseg = send_layoutget(lo, ctx, &stateid, &arg, &timeout, gfp_flags);
 	trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET);
+	atomic_dec(&lo->plh_outstanding);
 	if (IS_ERR(lseg)) {
 		switch(PTR_ERR(lseg)) {
-		case -ERECALLCONFLICT:
+		case -EBUSY:
 			if (time_after(jiffies, giveup))
 				lseg = NULL;
+			break;
+		case -ERECALLCONFLICT:
+			/* Huh? We hold no layouts, how is there a recall? */
+			if (first) {
+				lseg = NULL;
+				break;
+			}
+			/* Destroy the existing layout and start over */
+			if (time_after(jiffies, giveup))
+				pnfs_destroy_layout(NFS_I(ino));
 			/* Fallthrough */
 		case -EAGAIN:
-			pnfs_put_layout_hdr(lo);
-			if (first)
-				pnfs_clear_first_layoutget(lo);
-			if (lseg) {
-				trace_pnfs_update_layout(ino, pos, count,
-					iomode, lo, lseg, PNFS_UPDATE_LAYOUT_RETRY);
-				goto lookup_again;
-			}
-			/* Fallthrough */
+			break;
 		default:
 			if (!nfs_error_is_fatal(PTR_ERR(lseg))) {
 				pnfs_layout_clear_fail_bit(lo, pnfs_iomode_to_fail_bit(iomode));
 				lseg = NULL;
 			}
+			goto out_put_layout_hdr;
+		}
+		if (lseg) {
+			if (first)
+				pnfs_clear_first_layoutget(lo);
+			trace_pnfs_update_layout(ino, pos, count,
+				iomode, lo, lseg, PNFS_UPDATE_LAYOUT_RETRY);
+			pnfs_put_layout_hdr(lo);
+			goto lookup_again;
 		}
 	} else {
 		pnfs_layout_clear_fail_bit(lo, pnfs_iomode_to_fail_bit(iomode));
 	}
 
-	atomic_dec(&lo->plh_outstanding);
 out_put_layout_hdr:
 	if (first)
 		pnfs_clear_first_layoutget(lo);
@@ -1735,9 +1774,7 @@
 		return lseg;
 	}
 
-	init_lseg(lo, lseg);
-	lseg->pls_range = res->range;
-	lseg->pls_seq = be32_to_cpu(res->stateid.seqid);
+	pnfs_init_lseg(lo, lseg, &res->range, &res->stateid);
 
 	spin_lock(&ino->i_lock);
 	if (pnfs_layoutgets_blocked(lo)) {
@@ -1758,16 +1795,19 @@
 		 * inode invalid, and don't bother validating the stateid
 		 * sequence number.
 		 */
-		pnfs_mark_matching_lsegs_invalid(lo, &free_me, NULL, 0);
+		pnfs_mark_layout_stateid_invalid(lo, &free_me);
 
 		nfs4_stateid_copy(&lo->plh_stateid, &res->stateid);
 		lo->plh_barrier = be32_to_cpu(res->stateid.seqid);
 	}
 
-	clear_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-
 	pnfs_get_lseg(lseg);
 	pnfs_layout_insert_lseg(lo, lseg, &free_me);
+	if (!pnfs_layout_is_valid(lo)) {
+		pnfs_clear_layoutreturn_info(lo);
+		clear_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
+	}
+
 
 	if (res->return_on_close)
 		set_bit(NFS_LSEG_ROC, &lseg->pls_flags);
@@ -1787,14 +1827,14 @@
 pnfs_set_plh_return_info(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode,
 			 u32 seq)
 {
-	if (lo->plh_return_iomode == iomode)
-		return;
-	if (lo->plh_return_iomode != 0)
+	if (lo->plh_return_iomode != 0 && lo->plh_return_iomode != iomode)
 		iomode = IOMODE_ANY;
 	lo->plh_return_iomode = iomode;
 	set_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
-	if (!lo->plh_return_seq || pnfs_seqid_is_newer(seq, lo->plh_return_seq))
+	if (seq != 0) {
+		WARN_ON_ONCE(lo->plh_return_seq != 0 && lo->plh_return_seq != seq);
 		lo->plh_return_seq = seq;
+	}
 }
 
 /**
@@ -1824,7 +1864,7 @@
 	assert_spin_locked(&lo->plh_inode->i_lock);
 
 	list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list)
-		if (should_free_lseg(&lseg->pls_range, return_range)) {
+		if (pnfs_match_lseg_recall(lseg, return_range, seq)) {
 			dprintk("%s: marking lseg %p iomode %d "
 				"offset %llu length %llu\n", __func__,
 				lseg, lseg->pls_range.iomode,
@@ -1855,19 +1895,17 @@
 	bool return_now = false;
 
 	spin_lock(&inode->i_lock);
-	pnfs_set_plh_return_info(lo, range.iomode, lseg->pls_seq);
+	pnfs_set_plh_return_info(lo, range.iomode, 0);
 	/*
 	 * mark all matching lsegs so that we are sure to have no live
 	 * segments at hand when sending layoutreturn. See pnfs_put_lseg()
 	 * for how it works.
 	 */
-	if (!pnfs_mark_matching_lsegs_return(lo, &free_me,
-						&range, lseg->pls_seq)) {
+	if (!pnfs_mark_matching_lsegs_return(lo, &free_me, &range, 0)) {
 		nfs4_stateid stateid;
-		enum pnfs_iomode iomode = lo->plh_return_iomode;
+		enum pnfs_iomode iomode;
 
-		nfs4_stateid_copy(&stateid, &lo->plh_stateid);
-		return_now = pnfs_prepare_layoutreturn(lo);
+		return_now = pnfs_prepare_layoutreturn(lo, &stateid, &iomode);
 		spin_unlock(&inode->i_lock);
 		if (return_now)
 			pnfs_send_layoutreturn(lo, &stateid, iomode, false);
@@ -2382,7 +2420,10 @@
 	nfs_fattr_init(&data->fattr);
 	data->args.bitmask = NFS_SERVER(inode)->cache_consistency_bitmask;
 	data->res.fattr = &data->fattr;
-	data->args.lastbytewritten = end_pos - 1;
+	if (end_pos != 0)
+		data->args.lastbytewritten = end_pos - 1;
+	else
+		data->args.lastbytewritten = U64_MAX;
 	data->res.server = NFS_SERVER(inode);
 
 	if (ld->prepare_layoutcommit) {
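
Note (not part of the patch): the barrier handling in pnfs_set_layout_stateid()
relies on the serial-number comparison shown in context above,
pnfs_seqid_is_newer(), i.e. (s32)(s1 - s2) > 0, which is also why the comment
wants the barrier kept "close" to the live seqids. A throw-away userspace
check of how that comparison behaves across 32-bit wraparound:

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Same test as pnfs_seqid_is_newer(); assumes the usual two's-complement
 * conversion from uint32_t to int32_t.
 */
static bool seqid_is_newer(uint32_t s1, uint32_t s2)
{
	return (int32_t)(s1 - s2) > 0;
}

int main(void)
{
	assert(seqid_is_newer(5, 4));			/* ordinary case */
	assert(seqid_is_newer(2, 0xfffffffeU));		/* 2 is "newer" across the wrap */
	assert(!seqid_is_newer(0xfffffffeU, 2));	/* and the reverse is not */
	return 0;
}
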
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index b21bd0b..31d99b2 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -268,6 +268,8 @@
 				struct list_head *tmp_list,
 				const struct pnfs_layout_range *recall_range,
 				u32 seq);
+int pnfs_mark_layout_stateid_invalid(struct pnfs_layout_hdr *lo,
+		struct list_head *lseg_list);
 bool pnfs_roc(struct inode *ino);
 void pnfs_roc_release(struct inode *ino);
 void pnfs_roc_set_barrier(struct inode *ino, u32 barrier);
@@ -375,6 +377,11 @@
 	return NFS_I(inode)->layout != NULL;
 }
 
+static inline bool pnfs_layout_is_valid(const struct pnfs_layout_hdr *lo)
+{
+	return test_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags) == 0;
+}
+
 static inline struct nfs4_deviceid_node *
 nfs4_get_deviceid(struct nfs4_deviceid_node *d)
 {
@@ -545,19 +552,6 @@
 	return 1 + end - offset;
 }
 
-/**
- * pnfs_mark_layout_returned_if_empty - marks the layout as returned
- * @lo: layout header
- *
- * Note: Caller must hold inode->i_lock
- */
-static inline void
-pnfs_mark_layout_returned_if_empty(struct pnfs_layout_hdr *lo)
-{
-	if (list_empty(&lo->plh_segs))
-		set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-}
-
 static inline void
 pnfs_copy_range(struct pnfs_layout_range *dst,
 		const struct pnfs_layout_range *src)
@@ -629,6 +623,13 @@
 }
 
 static inline bool
+pnfs_layoutcommit_outstanding(struct inode *inode)
+{
+	return false;
+}
+
+
+static inline bool
 pnfs_roc(struct inode *ino)
 {
 	return false;
@@ -716,13 +717,6 @@
 	return false;
 }
 
-static inline bool
-pnfs_layoutcommit_outstanding(struct inode *inode)
-{
-	return false;
-}
-
-
 static inline struct nfs4_threshold *pnfs_mdsthreshold_alloc(void)
 {
 	return NULL;
diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c
index b38e3c0..f3468b5 100644
--- a/fs/nfs/pnfs_nfs.c
+++ b/fs/nfs/pnfs_nfs.c
@@ -595,7 +595,7 @@
 }
 
 static struct nfs_client *(*get_v3_ds_connect)(
-			struct nfs_client *mds_clp,
+			struct nfs_server *mds_srv,
 			const struct sockaddr *ds_addr,
 			int ds_addrlen,
 			int ds_proto,
@@ -654,7 +654,7 @@
 			rpc_clnt_add_xprt(clp->cl_rpcclient, &xprt_args,
 					rpc_clnt_test_and_add_xprt, NULL);
 		} else
-			clp = get_v3_ds_connect(mds_srv->nfs_client,
+			clp = get_v3_ds_connect(mds_srv,
 					(struct sockaddr *)&da->da_addr,
 					da->da_addrlen, IPPROTO_TCP,
 					timeo, retrans, au_flavor);
@@ -690,7 +690,7 @@
 		dprintk("%s: DS %s: trying address %s\n",
 			__func__, ds->ds_remotestr, da->da_remotestr);
 
-		clp = nfs4_set_ds_client(mds_srv->nfs_client,
+		clp = nfs4_set_ds_client(mds_srv,
 					(struct sockaddr *)&da->da_addr,
 					da->da_addrlen, IPPROTO_TCP,
 					timeo, retrans, minor_version,
@@ -940,6 +940,13 @@
 int
 pnfs_nfs_generic_sync(struct inode *inode, bool datasync)
 {
+	int ret;
+
+	if (!pnfs_layoutcommit_outstanding(inode))
+		return 0;
+	ret = nfs_commit_inode(inode, FLUSH_SYNC);
+	if (ret < 0)
+		return ret;
 	if (datasync)
 		return 0;
 	return pnfs_layoutcommit_inode(inode, true);
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index 2137e02..18d446e 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -1684,6 +1684,7 @@
 {
 	rpc_authflavor_t flavor = RPC_AUTH_MAXFLAVOR;
 	unsigned int i;
+	int use_auth_null = false;
 
 	/*
 	 * If the sec= mount option is used, the specified flavor or AUTH_NULL
@@ -1691,14 +1692,21 @@
 	 *
 	 * AUTH_NULL has a special meaning when it's in the server list - it
 	 * means that the server will ignore the rpc creds, so any flavor
-	 * can be used.
+	 * can be used while still honoring the sec= option that was specified.
 	 */
 	for (i = 0; i < count; i++) {
 		flavor = server_authlist[i];
 
-		if (nfs_auth_info_match(&args->auth_info, flavor) ||
-		    flavor == RPC_AUTH_NULL)
+		if (nfs_auth_info_match(&args->auth_info, flavor))
 			goto out;
+
+		if (flavor == RPC_AUTH_NULL)
+			use_auth_null = true;
+	}
+
+	if (use_auth_null) {
+		flavor = RPC_AUTH_NULL;
+		goto out;
 	}
 
 	dfprintk(MOUNT,
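
Note (not part of the patch): the super.c hunk above changes the flavor
selection order. Previously the scan returned as soon as it hit either a
flavor matching the sec= option or AUTH_NULL, so an AUTH_NULL entry early in
the server's list could short-circuit an exact match further down; now
AUTH_NULL is only used as a fallback once the whole list has been scanned.
A compact, self-contained illustration with nfs_auth_info_match() simplified
to plain equality and the sunrpc constants stubbed out:

#include <stdbool.h>
#include <stdio.h>

typedef unsigned int rpc_authflavor_t;		/* stand-ins, not the sunrpc headers */
enum { RPC_AUTH_NULL = 0, RPC_AUTH_MAXFLAVOR = 8 };

static rpc_authflavor_t pick_flavor(const rpc_authflavor_t *server_list,
				    unsigned int count, rpc_authflavor_t wanted)
{
	bool use_auth_null = false;
	unsigned int i;

	for (i = 0; i < count; i++) {
		if (server_list[i] == wanted)		/* exact sec= match always wins */
			return wanted;
		if (server_list[i] == RPC_AUTH_NULL)	/* remember, but keep scanning */
			use_auth_null = true;
	}
	return use_auth_null ? RPC_AUTH_NULL : RPC_AUTH_MAXFLAVOR;
}

int main(void)
{
	const rpc_authflavor_t list[] = { RPC_AUTH_NULL, 6 /* e.g. AUTH_GSS */ };

	/* sec= asked for flavor 6: the exact match wins even though AUTH_NULL
	 * comes first in the server's list.
	 */
	printf("%u\n", pick_flavor(list, 2, 6));	/* prints 6 */
	return 0;
}
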
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index e1c74d3..510c5be 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -625,7 +625,7 @@
 	int err;
 
 	nfs_inc_stats(inode, NFSIOS_VFSWRITEPAGE);
-	nfs_pageio_init_write(&pgio, inode, wb_priority(wbc),
+	nfs_pageio_init_write(&pgio, inode, 0,
 				false, &nfs_async_write_completion_ops);
 	err = nfs_do_writepage(page, wbc, &pgio, launder);
 	nfs_pageio_complete(&pgio);
@@ -657,16 +657,9 @@
 int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc)
 {
 	struct inode *inode = mapping->host;
-	unsigned long *bitlock = &NFS_I(inode)->flags;
 	struct nfs_pageio_descriptor pgio;
 	int err;
 
-	/* Stop dirtying of new pages while we sync */
-	err = wait_on_bit_lock_action(bitlock, NFS_INO_FLUSHING,
-			nfs_wait_bit_killable, TASK_KILLABLE);
-	if (err)
-		goto out_err;
-
 	nfs_inc_stats(inode, NFSIOS_VFSWRITEPAGES);
 
 	nfs_pageio_init_write(&pgio, inode, wb_priority(wbc), false,
@@ -674,10 +667,6 @@
 	err = write_cache_pages(mapping, wbc, nfs_writepages_callback, &pgio);
 	nfs_pageio_complete(&pgio);
 
-	clear_bit_unlock(NFS_INO_FLUSHING, bitlock);
-	smp_mb__after_atomic();
-	wake_up_bit(bitlock, NFS_INO_FLUSHING);
-
 	if (err < 0)
 		goto out_err;
 	err = pgio.pg_error;
@@ -1195,9 +1184,11 @@
 /*
  * Test if the open context credential key is marked to expire soon.
  */
-bool nfs_ctx_key_to_expire(struct nfs_open_context *ctx)
+bool nfs_ctx_key_to_expire(struct nfs_open_context *ctx, struct inode *inode)
 {
-	return rpcauth_cred_key_to_expire(ctx->cred);
+	struct rpc_auth *auth = NFS_SERVER(inode)->client->cl_auth;
+
+	return rpcauth_cred_key_to_expire(auth, ctx->cred);
 }
 
 /*
@@ -1289,6 +1280,9 @@
 	dprintk("NFS:       nfs_updatepage(%pD2 %d@%lld)\n",
 		file, count, (long long)(page_file_offset(page) + offset));
 
+	if (!count)
+		goto out;
+
 	if (nfs_can_extend_write(file, page, inode)) {
 		count = max(count + offset, nfs_page_length(page));
 		offset = 0;
@@ -1299,7 +1293,7 @@
 		nfs_set_pageerror(page);
 	else
 		__set_page_dirty_nobuffers(page);
-
+out:
 	dprintk("NFS:       nfs_updatepage returns %d (isize %lld)\n",
 			status, (long long)i_size_read(inode));
 	return status;
@@ -1800,7 +1794,7 @@
 
 		/* Okay, COMMIT succeeded, apparently. Check the verifier
 		 * returned by the server against all stored verfs. */
-		if (!memcmp(&req->wb_verf, &data->verf.verifier, sizeof(req->wb_verf))) {
+		if (!nfs_write_verifier_cmp(&req->wb_verf, &data->verf.verifier)) {
 			/* We have a match */
 			nfs_inode_remove_request(req);
 			dprintk(" OK\n");
@@ -1924,6 +1918,24 @@
 EXPORT_SYMBOL_GPL(nfs_write_inode);
 
 /*
+ * Wrapper for filemap_write_and_wait_range()
+ *
+ * Needed for pNFS in order to ensure data becomes visible to the
+ * client.
+ */
+int nfs_filemap_write_and_wait_range(struct address_space *mapping,
+		loff_t lstart, loff_t lend)
+{
+	int ret;
+
+	ret = filemap_write_and_wait_range(mapping, lstart, lend);
+	if (ret == 0)
+		ret = pnfs_sync_inode(mapping->host, true);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(nfs_filemap_write_and_wait_range);
+
+/*
  * flush the inode to disk.
  */
 int nfs_wb_all(struct inode *inode)
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index d71278c..810124b 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -205,12 +205,12 @@
 #define NFS_INO_STALE		(1)		/* possible stale inode */
 #define NFS_INO_ACL_LRU_SET	(2)		/* Inode is on the LRU list */
 #define NFS_INO_INVALIDATING	(3)		/* inode is being invalidated */
-#define NFS_INO_FLUSHING	(4)		/* inode is flushing out data */
 #define NFS_INO_FSCACHE		(5)		/* inode can be cached by FS-Cache */
 #define NFS_INO_FSCACHE_LOCK	(6)		/* FS-Cache cookie management lock */
 #define NFS_INO_LAYOUTCOMMIT	(9)		/* layoutcommit required */
 #define NFS_INO_LAYOUTCOMMITTING (10)		/* layoutcommit inflight */
 #define NFS_INO_LAYOUTSTATS	(11)		/* layoutstats inflight */
+#define NFS_INO_ODIRECT		(12)		/* I/O setting is O_DIRECT */
 
 static inline struct nfs_inode *NFS_I(const struct inode *inode)
 {
@@ -351,7 +351,6 @@
 extern int __nfs_revalidate_inode(struct nfs_server *, struct inode *);
 extern int nfs_revalidate_mapping(struct inode *inode, struct address_space *mapping);
 extern int nfs_revalidate_mapping_rcu(struct inode *inode);
-extern int nfs_revalidate_mapping_protected(struct inode *inode, struct address_space *mapping);
 extern int nfs_setattr(struct dentry *, struct iattr *);
 extern void nfs_setattr_update_inode(struct inode *inode, struct iattr *attr, struct nfs_fattr *);
 extern void nfs_setsecurity(struct inode *inode, struct nfs_fattr *fattr,
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index c304a11..82b81a1 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -1596,9 +1596,8 @@
 	int (*have_delegation)(struct inode *, fmode_t);
 	int (*return_delegation)(struct inode *);
 	struct nfs_client *(*alloc_client) (const struct nfs_client_initdata *);
-	struct nfs_client *
-		(*init_client) (struct nfs_client *, const struct rpc_timeout *,
-				const char *);
+	struct nfs_client *(*init_client) (struct nfs_client *,
+				const struct nfs_client_initdata *);
 	void	(*free_client) (struct nfs_client *);
 	struct nfs_server *(*create_server)(struct nfs_mount_info *, struct nfs_subversion *);
 	struct nfs_server *(*clone_server)(struct nfs_server *, struct nfs_fh *,
diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index 3a40287..4ccf184 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -37,7 +37,6 @@
 
 /* auth_cred ac_flags bits */
 enum {
-	RPC_CRED_NO_CRKEY_TIMEOUT = 0, /* underlying cred has no key timeout */
 	RPC_CRED_KEY_EXPIRE_SOON = 1, /* underlying cred key will expire soon */
 	RPC_CRED_NOTIFY_TIMEOUT = 2,   /* notify generic cred when underlying
 					key will expire soon */
@@ -82,6 +81,9 @@
 
 #define RPCAUTH_CRED_MAGIC	0x0f4aa4f0
 
+/* rpc_auth au_flags */
+#define RPCAUTH_AUTH_NO_CRKEY_TIMEOUT	0x0001 /* underlying cred has no key timeout */
+
 /*
  * Client authentication handle
  */
@@ -199,7 +201,7 @@
 void			rpcauth_clear_credcache(struct rpc_cred_cache *);
 int			rpcauth_key_timeout_notify(struct rpc_auth *,
 						struct rpc_cred *);
-bool			rpcauth_cred_key_to_expire(struct rpc_cred *);
+bool			rpcauth_cred_key_to_expire(struct rpc_auth *, struct rpc_cred *);
 char *			rpcauth_stringify_acceptor(struct rpc_cred *);
 
 static inline
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 05a1809..817af0b 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -230,6 +230,10 @@
 					struct rpc_task *);
 void		rpc_wake_up(struct rpc_wait_queue *);
 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
+struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
+					struct rpc_wait_queue *,
+					bool (*)(struct rpc_task *, void *),
+					void *);
 struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
 					bool (*)(struct rpc_task *, void *),
 					void *);
@@ -247,6 +251,7 @@
 int		rpc_init_mempool(void);
 void		rpc_destroy_mempool(void);
 extern struct workqueue_struct *rpciod_workqueue;
+extern struct workqueue_struct *xprtiod_workqueue;
 void		rpc_prepare_task(struct rpc_task *task);
 
 static inline int rpc_wait_for_completion_task(struct rpc_task *task)
diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index 0ece4ba..bef3fb0 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -80,6 +80,7 @@
 #define TCP_RPC_REPLY		(1UL << 6)
 
 #define XPRT_SOCK_CONNECTING	1U
+#define XPRT_SOCK_DATA_READY	(2)
 
 #endif /* __KERNEL__ */
 
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 040ff62..a7e42f9 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -51,9 +51,7 @@
 	ret = kstrtoul(val, 0, &num);
 	if (ret == -EINVAL)
 		goto out_inval;
-	nbits = fls(num);
-	if (num > (1U << nbits))
-		nbits++;
+	nbits = fls(num - 1);
 	if (nbits > MAX_HASHTABLE_BITS || nbits < 2)
 		goto out_inval;
 	*(unsigned int *)kp->arg = nbits;
@@ -359,8 +357,10 @@
 EXPORT_SYMBOL_GPL(rpcauth_key_timeout_notify);
 
 bool
-rpcauth_cred_key_to_expire(struct rpc_cred *cred)
+rpcauth_cred_key_to_expire(struct rpc_auth *auth, struct rpc_cred *cred)
 {
+	if (auth->au_flags & RPCAUTH_AUTH_NO_CRKEY_TIMEOUT)
+		return false;
 	if (!cred->cr_ops->crkey_to_expire)
 		return false;
 	return cred->cr_ops->crkey_to_expire(cred);
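
Note (not part of the patch): the hashtable-size parameter change above makes
nbits = fls(num - 1), i.e. the smallest power of two that can hold the
requested table size. The old code used fls(num), which bumps exact powers of
two one level too high (fls(64) is 7, so a request for 64 buckets got 128).
A throw-away userspace check, with fls_demo() standing in for the kernel's
fls():

#include <assert.h>

/* Position of the highest set bit, counting from 1; fls_demo(0) == 0. */
static int fls_demo(unsigned int x)
{
	int bits = 0;

	while (x) {
		bits++;
		x >>= 1;
	}
	return bits;
}

int main(void)
{
	/* New rule: nbits = fls(num - 1). */
	assert(fls_demo(64 - 1) == 6);	/* request 64 -> 2^6 = 64 buckets  */
	assert(fls_demo(65 - 1) == 7);	/* request 65 -> 2^7 = 128 buckets */

	/* Old rule: fls(64) == 7 and 64 > (1 << 7) is false, so a request
	 * for exactly 64 buckets ended up with 128.
	 */
	assert(fls_demo(64) == 7);
	return 0;
}
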
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index 54dd3fd..1682195 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -224,7 +224,7 @@
 
 
 	/* Fast track for non crkey_timeout (no key) underlying credentials */
-	if (test_bit(RPC_CRED_NO_CRKEY_TIMEOUT, &acred->ac_flags))
+	if (auth->au_flags & RPCAUTH_AUTH_NO_CRKEY_TIMEOUT)
 		return 0;
 
 	/* Fast track for the normal case */
@@ -236,12 +236,6 @@
 	if (IS_ERR(tcred))
 		return -EACCES;
 
-	if (!tcred->cr_ops->crkey_timeout) {
-		set_bit(RPC_CRED_NO_CRKEY_TIMEOUT, &acred->ac_flags);
-		ret = 0;
-		goto out_put;
-	}
-
 	/* Test for the almost error case */
 	ret = tcred->cr_ops->crkey_timeout(tcred);
 	if (ret != 0) {
@@ -257,7 +251,6 @@
 		set_bit(RPC_CRED_NOTIFY_TIMEOUT, &acred->ac_flags);
 	}
 
-out_put:
 	put_rpccred(tcred);
 	return ret;
 }
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index bca3537..23c8e7c 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1015,6 +1015,7 @@
 	auth = &gss_auth->rpc_auth;
 	auth->au_cslack = GSS_CRED_SLACK >> 2;
 	auth->au_rslack = GSS_VERF_SLACK >> 2;
+	auth->au_flags = 0;
 	auth->au_ops = &authgss_ops;
 	auth->au_flavor = flavor;
 	if (gss_pseudoflavor_to_datatouch(gss_auth->mech, flavor))
diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c
index 8d9eb4d..4d17376 100644
--- a/net/sunrpc/auth_null.c
+++ b/net/sunrpc/auth_null.c
@@ -115,6 +115,7 @@
 struct rpc_auth null_auth = {
 	.au_cslack	= NUL_CALLSLACK,
 	.au_rslack	= NUL_REPLYSLACK,
+	.au_flags	= RPCAUTH_AUTH_NO_CRKEY_TIMEOUT,
 	.au_ops		= &authnull_ops,
 	.au_flavor	= RPC_AUTH_NULL,
 	.au_count	= ATOMIC_INIT(0),
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index 9f65452..a99278c 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -228,6 +228,7 @@
 struct rpc_auth		unix_auth = {
 	.au_cslack	= UNX_CALLSLACK,
 	.au_rslack	= NUL_REPLYSLACK,
+	.au_flags	= RPCAUTH_AUTH_NO_CRKEY_TIMEOUT,
 	.au_ops		= &authunix_ops,
 	.au_flavor	= RPC_AUTH_UNIX,
 	.au_count	= ATOMIC_INIT(0),
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 2808d55..cb49898 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2577,7 +2577,7 @@
 	kfree(data);
 }
 
-const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
+static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
 	.rpc_call_done = rpc_cb_add_xprt_done,
 	.rpc_release = rpc_cb_add_xprt_release,
 };
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index fcfd48d..9ae5885 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -54,7 +54,8 @@
 /*
  * rpciod-related stuff
  */
-struct workqueue_struct *rpciod_workqueue;
+struct workqueue_struct *rpciod_workqueue __read_mostly;
+struct workqueue_struct *xprtiod_workqueue __read_mostly;
 
 /*
  * Disable the timer for a given RPC task. Should be called with
@@ -329,7 +330,8 @@
  * lockless RPC_IS_QUEUED() test) before we've had a chance to test
  * the RPC_TASK_RUNNING flag.
  */
-static void rpc_make_runnable(struct rpc_task *task)
+static void rpc_make_runnable(struct workqueue_struct *wq,
+		struct rpc_task *task)
 {
 	bool need_wakeup = !rpc_test_and_set_running(task);
 
@@ -338,7 +340,7 @@
 		return;
 	if (RPC_IS_ASYNC(task)) {
 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-		queue_work(rpciod_workqueue, &task->u.tk_work);
+		queue_work(wq, &task->u.tk_work);
 	} else
 		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
@@ -407,13 +409,16 @@
 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
 
 /**
- * __rpc_do_wake_up_task - wake up a single rpc_task
+ * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
+ * @wq: workqueue on which to run task
  * @queue: wait queue
  * @task: task to be woken up
  *
  * Caller must hold queue->lock, and have cleared the task queued flag.
  */
-static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue,
+		struct rpc_task *task)
 {
 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
 			task->tk_pid, jiffies);
@@ -428,7 +433,7 @@
 
 	__rpc_remove_wait_queue(queue, task);
 
-	rpc_make_runnable(task);
+	rpc_make_runnable(wq, task);
 
 	dprintk("RPC:       __rpc_wake_up_task done\n");
 }
@@ -436,16 +441,25 @@
 /*
  * Wake up a queued task while the queue lock is being held
  */
-static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue, struct rpc_task *task)
 {
 	if (RPC_IS_QUEUED(task)) {
 		smp_rmb();
 		if (task->tk_waitqueue == queue)
-			__rpc_do_wake_up_task(queue, task);
+			__rpc_do_wake_up_task_on_wq(wq, queue, task);
 	}
 }
 
 /*
+ * Wake up a queued task while the queue lock is being held
+ */
+static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
+{
+	rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
+}
+
+/*
  * Wake up a task on a specific queue
  */
 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
@@ -518,7 +532,8 @@
 /*
  * Wake up the first task on the wait queue.
  */
-struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
+struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue,
 		bool (*func)(struct rpc_task *, void *), void *data)
 {
 	struct rpc_task	*task = NULL;
@@ -529,7 +544,7 @@
 	task = __rpc_find_next_queued(queue);
 	if (task != NULL) {
 		if (func(task, data))
-			rpc_wake_up_task_queue_locked(queue, task);
+			rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
 		else
 			task = NULL;
 	}
@@ -537,6 +552,15 @@
 
 	return task;
 }
+
+/*
+ * Wake up the first task on the wait queue.
+ */
+struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
+		bool (*func)(struct rpc_task *, void *), void *data)
+{
+	return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
+}
 EXPORT_SYMBOL_GPL(rpc_wake_up_first);
 
 static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
@@ -814,7 +838,7 @@
 	bool is_async = RPC_IS_ASYNC(task);
 
 	rpc_set_active(task);
-	rpc_make_runnable(task);
+	rpc_make_runnable(rpciod_workqueue, task);
 	if (!is_async)
 		__rpc_execute(task);
 }
@@ -1071,10 +1095,22 @@
 	 * Create the rpciod thread and wait for it to start.
 	 */
 	dprintk("RPC:       creating workqueue rpciod\n");
-	/* Note: highpri because network receive is latency sensitive */
-	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
+	if (!wq)
+		goto out_failed;
 	rpciod_workqueue = wq;
-	return rpciod_workqueue != NULL;
+	/* Note: highpri because network receive is latency sensitive */
+	wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	if (!wq)
+		goto free_rpciod;
+	xprtiod_workqueue = wq;
+	return 1;
+free_rpciod:
+	wq = rpciod_workqueue;
+	rpciod_workqueue = NULL;
+	destroy_workqueue(wq);
+out_failed:
+	return 0;
 }
 
 static void rpciod_stop(void)
@@ -1088,6 +1124,9 @@
 	wq = rpciod_workqueue;
 	rpciod_workqueue = NULL;
 	destroy_workqueue(wq);
+	wq = xprtiod_workqueue;
+	xprtiod_workqueue = NULL;
+	destroy_workqueue(wq);
 }
 
 void
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 216a138..8313960 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -220,7 +220,7 @@
 		clear_bit(XPRT_LOCKED, &xprt->state);
 		smp_mb__after_atomic();
 	} else
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 }
 
 /*
@@ -295,7 +295,8 @@
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
 
-	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
+	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
+				__xprt_lock_write_func, xprt))
 		return;
 	xprt_clear_locked(xprt);
 }
@@ -324,7 +325,8 @@
 		return;
 	if (RPCXPRT_CONGESTED(xprt))
 		goto out_unlock;
-	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
+	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
+				__xprt_lock_write_cong_func, xprt))
 		return;
 out_unlock:
 	xprt_clear_locked(xprt);
@@ -645,7 +647,7 @@
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -672,7 +674,7 @@
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 out:
 	spin_unlock_bh(&xprt->transport_lock);
@@ -689,7 +691,7 @@
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		goto out_abort;
 	spin_unlock(&xprt->transport_lock);
-	queue_work(rpciod_workqueue, &xprt->task_cleanup);
+	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	return;
 out_abort:
 	spin_unlock(&xprt->transport_lock);
diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c
index e7fd769..66c9d63 100644
--- a/net/sunrpc/xprtmultipath.c
+++ b/net/sunrpc/xprtmultipath.c
@@ -271,14 +271,12 @@
 		xprt_switch_find_xprt_t find_next)
 {
 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
-	struct list_head *head;
 
 	if (xps == NULL)
 		return NULL;
-	head = &xps->xps_xprt_list;
-	if (xps->xps_nxprts < 2)
-		return xprt_switch_find_first_entry(head);
-	return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next);
+	return xprt_switch_set_next_cursor(&xps->xps_xprt_list,
+			&xpi->xpi_cursor,
+			find_next);
 }
 
 static
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 7e2b2fa..111767a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -124,7 +124,7 @@
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec_minmax,
 		.extra1		= &xprt_min_resvport_limit,
-		.extra2		= &xprt_max_resvport_limit
+		.extra2		= &xprt_max_resvport
 	},
 	{
 		.procname	= "max_resvport",
@@ -132,7 +132,7 @@
 		.maxlen		= sizeof(unsigned int),
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec_minmax,
-		.extra1		= &xprt_min_resvport_limit,
+		.extra1		= &xprt_min_resvport,
 		.extra2		= &xprt_max_resvport_limit
 	},
 	{
@@ -642,6 +642,7 @@
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
 	bool zerocopy = true;
+	bool vm_wait = false;
 	int status;
 	int sent;
 
@@ -677,15 +678,33 @@
 			return 0;
 		}
 
+		WARN_ON_ONCE(sent == 0 && status == 0);
+
+		if (status == -EAGAIN) {
+			/*
+			 * Return EAGAIN if we're sure we're hitting the
+			 * socket send buffer limits.
+			 */
+			if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
+				break;
+			/*
+			 * Did we hit a memory allocation failure?
+			 */
+			if (sent == 0) {
+				status = -ENOBUFS;
+				if (vm_wait)
+					break;
+				/* Retry, knowing now that we're below the
+				 * socket send buffer limit
+				 */
+				vm_wait = true;
+			}
+			continue;
+		}
 		if (status < 0)
 			break;
-		if (sent == 0) {
-			status = -EAGAIN;
-			break;
-		}
+		vm_wait = false;
 	}
-	if (status == -EAGAIN && sk_stream_is_writeable(transport->inet))
-		status = -ENOBUFS;
 
 	switch (status) {
 	case -ENOTSOCK:
@@ -755,11 +774,19 @@
 	sk->sk_error_report = transport->old_error_report;
 }
 
+static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
+{
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+
+	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+}
+
 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
 {
 	smp_mb__before_atomic();
 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	clear_bit(XPRT_CLOSING, &xprt->state);
+	xs_sock_reset_state_flags(xprt);
 	smp_mb__after_atomic();
 }
 
@@ -962,10 +989,13 @@
 		goto out;
 	for (;;) {
 		skb = skb_recv_datagram(sk, 0, 1, &err);
-		if (skb == NULL)
+		if (skb != NULL) {
+			xs_local_data_read_skb(&transport->xprt, sk, skb);
+			skb_free_datagram(sk, skb);
+			continue;
+		}
+		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
 			break;
-		xs_local_data_read_skb(&transport->xprt, sk, skb);
-		skb_free_datagram(sk, skb);
 	}
 out:
 	mutex_unlock(&transport->recv_mutex);
@@ -1043,10 +1073,13 @@
 		goto out;
 	for (;;) {
 		skb = skb_recv_datagram(sk, 0, 1, &err);
-		if (skb == NULL)
+		if (skb != NULL) {
+			xs_udp_data_read_skb(&transport->xprt, sk, skb);
+			skb_free_datagram(sk, skb);
+			continue;
+		}
+		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
 			break;
-		xs_udp_data_read_skb(&transport->xprt, sk, skb);
-		skb_free_datagram(sk, skb);
 	}
 out:
 	mutex_unlock(&transport->recv_mutex);
@@ -1074,7 +1107,14 @@
 	if (xprt != NULL) {
 		struct sock_xprt *transport = container_of(xprt,
 				struct sock_xprt, xprt);
-		queue_work(rpciod_workqueue, &transport->recv_worker);
+		transport->old_data_ready(sk);
+		/* Any data means we had a useful conversation, so
+		 * then we don't need to delay the next reconnect
+		 */
+		if (xprt->reestablish_timeout)
+			xprt->reestablish_timeout = 0;
+		if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+			queue_work(xprtiod_workqueue, &transport->recv_worker);
 	}
 	read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -1474,10 +1514,15 @@
 	for (;;) {
 		lock_sock(sk);
 		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-		release_sock(sk);
-		if (read <= 0)
-			break;
-		total += read;
+		if (read <= 0) {
+			clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+			release_sock(sk);
+			if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+				break;
+		} else {
+			release_sock(sk);
+			total += read;
+		}
 		rd_desc.count = 65536;
 	}
 out:
@@ -1493,34 +1538,6 @@
 }
 
 /**
- * xs_tcp_data_ready - "data ready" callback for TCP sockets
- * @sk: socket with data to read
- *
- */
-static void xs_tcp_data_ready(struct sock *sk)
-{
-	struct sock_xprt *transport;
-	struct rpc_xprt *xprt;
-
-	dprintk("RPC:       xs_tcp_data_ready...\n");
-
-	read_lock_bh(&sk->sk_callback_lock);
-	if (!(xprt = xprt_from_sock(sk)))
-		goto out;
-	transport = container_of(xprt, struct sock_xprt, xprt);
-
-	/* Any data means we had a useful conversation, so
-	 * the we don't need to delay the next reconnect
-	 */
-	if (xprt->reestablish_timeout)
-		xprt->reestablish_timeout = 0;
-	queue_work(rpciod_workqueue, &transport->recv_worker);
-
-out:
-	read_unlock_bh(&sk->sk_callback_lock);
-}
-
-/**
  * xs_tcp_state_change - callback to handle TCP socket state changes
  * @sk: socket whose state has changed
  *
@@ -1714,7 +1731,7 @@
 
 static unsigned short xs_get_random_port(void)
 {
-	unsigned short range = xprt_max_resvport - xprt_min_resvport;
+	unsigned short range = xprt_max_resvport - xprt_min_resvport + 1;
 	unsigned short rand = (unsigned short) prandom_u32() % range;
 	return rand + xprt_min_resvport;
 }
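
The xs_get_random_port() change above is an inclusive-range fix, and the
arithmetic is worth spelling out with a hypothetical window of 1000..1009:
the old range was 1009 - 1000 = 9, so prandom_u32() % 9 yields 0..8 and the
upper bound 1009 can never be selected; with the "+ 1" the range is 10 and

	port = xprt_min_resvport + (prandom_u32() % (max - min + 1))

covers every port in [xprt_min_resvport, xprt_max_resvport], including the
top one.
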
@@ -2241,7 +2258,7 @@
 		xs_save_old_callbacks(transport, sk);
 
 		sk->sk_user_data = xprt;
-		sk->sk_data_ready = xs_tcp_data_ready;
+		sk->sk_data_ready = xs_data_ready;
 		sk->sk_state_change = xs_tcp_state_change;
 		sk->sk_write_space = xs_tcp_write_space;
 		sock_set_flag(sk, SOCK_FASYNC);
@@ -2380,7 +2397,7 @@
 		/* Start by resetting any existing state */
 		xs_reset_transport(transport);
 
-		queue_delayed_work(rpciod_workqueue,
+		queue_delayed_work(xprtiod_workqueue,
 				   &transport->connect_worker,
 				   xprt->reestablish_timeout);
 		xprt->reestablish_timeout <<= 1;
@@ -2390,7 +2407,7 @@
 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
 	} else {
 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
-		queue_delayed_work(rpciod_workqueue,
+		queue_delayed_work(xprtiod_workqueue,
 				   &transport->connect_worker, 0);
 	}
 }
@@ -3153,8 +3170,12 @@
 
 static int param_set_portnr(const char *val, const struct kernel_param *kp)
 {
-	return param_set_uint_minmax(val, kp,
+	if (kp->arg == &xprt_min_resvport)
+		return param_set_uint_minmax(val, kp,
 			RPC_MIN_RESVPORT,
+			xprt_max_resvport);
+	return param_set_uint_minmax(val, kp,
+			xprt_min_resvport,
 			RPC_MAX_RESVPORT);
 }