Merge branch 'bugfixes'
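
This merge brings in NFS client and SUNRPC fixes for the commit list
locking: the commit lists move from inode->i_lock protection to a new
per-inode commit_mutex, the nrequests and ncommit counters become
atomic_long_t, and nfs_lock_and_join_requests() is reworked so the page
group can be locked without holding the inode spinlock. The EXCHANGE_ID
path is refactored so that results are processed synchronously by the
caller, and the RPC receive list gains a dedicated xprt->recv_lock
together with xprt_pin_rqst() and xprt_unpin_rqst(), removing the need
to disable bottom halves around receive processing.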
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index 5427cdf..14358de 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -51,7 +51,7 @@ __be32 nfs4_callback_getattr(void *argp, void *resp,
 		goto out_iput;
 	res->size = i_size_read(inode);
 	res->change_attr = delegation->change_attr;
-	if (nfsi->nrequests != 0)
+	if (nfs_have_writebacks(inode))
 		res->change_attr++;
 	res->ctime = inode->i_ctime;
 	res->mtime = inode->i_mtime;
diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c
index d7df5e6..606dd38 100644
--- a/fs/nfs/delegation.c
+++ b/fs/nfs/delegation.c
@@ -1089,7 +1089,7 @@ bool nfs4_delegation_flush_on_close(const struct inode *inode)
 	delegation = rcu_dereference(nfsi->delegation);
 	if (delegation == NULL || !(delegation->type & FMODE_WRITE))
 		goto out;
-	if (nfsi->nrequests < delegation->pagemod_limit)
+	if (atomic_long_read(&nfsi->nrequests) < delegation->pagemod_limit)
 		ret = false;
 out:
 	rcu_read_unlock();
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 6fb9fad..d2972d5 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -616,13 +616,13 @@ nfs_direct_write_scan_commit_list(struct inode *inode,
 				  struct list_head *list,
 				  struct nfs_commit_info *cinfo)
 {
-	spin_lock(&cinfo->inode->i_lock);
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 #ifdef CONFIG_NFS_V4_1
 	if (cinfo->ds != NULL && cinfo->ds->nwritten != 0)
 		NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo);
 #endif
 	nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 }
 
 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 109279d..134d9f56 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -1285,7 +1285,6 @@ static bool nfs_file_has_buffered_writers(struct nfs_inode *nfsi)
 
 static unsigned long nfs_wcc_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 {
-	struct nfs_inode *nfsi = NFS_I(inode);
 	unsigned long ret = 0;
 
 	if ((fattr->valid & NFS_ATTR_FATTR_PRECHANGE)
@@ -1315,7 +1314,7 @@ static unsigned long nfs_wcc_update_inode(struct inode *inode, struct nfs_fattr
 	if ((fattr->valid & NFS_ATTR_FATTR_PRESIZE)
 			&& (fattr->valid & NFS_ATTR_FATTR_SIZE)
 			&& i_size_read(inode) == nfs_size_to_loff_t(fattr->pre_size)
-			&& nfsi->nrequests == 0) {
+			&& !nfs_have_writebacks(inode)) {
 		i_size_write(inode, nfs_size_to_loff_t(fattr->size));
 		ret |= NFS_INO_INVALID_ATTR;
 	}
@@ -1823,7 +1822,7 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 		if (new_isize != cur_isize) {
 			/* Do we perhaps have any outstanding writes, or has
 			 * the file grown beyond our last write? */
-			if (nfsi->nrequests == 0 || new_isize > cur_isize) {
+			if (!nfs_have_writebacks(inode) || new_isize > cur_isize) {
 				i_size_write(inode, new_isize);
 				if (!have_writers)
 					invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_DATA;
@@ -2012,10 +2011,11 @@ static void init_once(void *foo)
 	INIT_LIST_HEAD(&nfsi->access_cache_entry_lru);
 	INIT_LIST_HEAD(&nfsi->access_cache_inode_lru);
 	INIT_LIST_HEAD(&nfsi->commit_info.list);
-	nfsi->nrequests = 0;
-	nfsi->commit_info.ncommit = 0;
+	atomic_long_set(&nfsi->nrequests, 0);
+	atomic_long_set(&nfsi->commit_info.ncommit, 0);
 	atomic_set(&nfsi->commit_info.rpcs_out, 0);
 	init_rwsem(&nfsi->rmdir_sem);
+	mutex_init(&nfsi->commit_mutex);
 	nfs4_init_once(nfsi);
 }
 
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index d901326..08cc974 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -1659,12 +1659,52 @@ _nfs4_opendata_reclaim_to_nfs4_state(struct nfs4_opendata *data)
 	return state;
 }
 
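+/*
+ * Work out which inode the OPEN result refers to: claims that carry a
+ * full fattr look the inode up with nfs_fhget(), otherwise reuse the
+ * inode already attached to the dentry.
+ */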
+static struct inode *
+nfs4_opendata_get_inode(struct nfs4_opendata *data)
+{
+	struct inode *inode;
+
+	switch (data->o_arg.claim) {
+	case NFS4_OPEN_CLAIM_NULL:
+	case NFS4_OPEN_CLAIM_DELEGATE_CUR:
+	case NFS4_OPEN_CLAIM_DELEGATE_PREV:
+		if (!(data->f_attr.valid & NFS_ATTR_FATTR))
+			return ERR_PTR(-EAGAIN);
+		inode = nfs_fhget(data->dir->d_sb, &data->o_res.fh,
+				&data->f_attr, data->f_label);
+		break;
+	default:
+		inode = d_inode(data->dentry);
+		ihold(inode);
+		nfs_refresh_inode(inode, &data->f_attr);
+	}
+	return inode;
+}
+
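+/*
+ * Reuse data->state if it already refers to the same inode, otherwise
+ * look up (or create) the nfs4_state for this open owner.
+ */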
+static struct nfs4_state *
+nfs4_opendata_find_nfs4_state(struct nfs4_opendata *data)
+{
+	struct nfs4_state *state;
+	struct inode *inode;
+
+	inode = nfs4_opendata_get_inode(data);
+	if (IS_ERR(inode))
+		return ERR_CAST(inode);
+	if (data->state != NULL && data->state->inode == inode) {
+		state = data->state;
+		atomic_inc(&state->count);
+	} else
+		state = nfs4_get_open_state(inode, data->owner);
+	iput(inode);
+	if (state == NULL)
+		state = ERR_PTR(-ENOMEM);
+	return state;
+}
+
 static struct nfs4_state *
 _nfs4_opendata_to_nfs4_state(struct nfs4_opendata *data)
 {
-	struct inode *inode;
-	struct nfs4_state *state = NULL;
-	int ret;
+	struct nfs4_state *state;
 
 	if (!data->rpc_done) {
 		state = nfs4_try_open_cached(data);
@@ -1672,29 +1712,17 @@ _nfs4_opendata_to_nfs4_state(struct nfs4_opendata *data)
 		goto out;
 	}
 
-	ret = -EAGAIN;
-	if (!(data->f_attr.valid & NFS_ATTR_FATTR))
-		goto err;
-	inode = nfs_fhget(data->dir->d_sb, &data->o_res.fh, &data->f_attr, data->f_label);
-	ret = PTR_ERR(inode);
-	if (IS_ERR(inode))
-		goto err;
-	ret = -ENOMEM;
-	state = nfs4_get_open_state(inode, data->owner);
-	if (state == NULL)
-		goto err_put_inode;
+	state = nfs4_opendata_find_nfs4_state(data);
+	if (IS_ERR(state))
+		goto out;
+
 	if (data->o_res.delegation_type != 0)
 		nfs4_opendata_check_deleg(data, state);
 	update_open_stateid(state, &data->o_res.stateid, NULL,
 			data->o_arg.fmode);
-	iput(inode);
 out:
 	nfs_release_seqid(data->o_arg.seqid);
 	return state;
-err_put_inode:
-	iput(inode);
-err:
-	return ERR_PTR(ret);
 }
 
 static struct nfs4_state *
@@ -2071,7 +2099,6 @@ static void nfs4_open_prepare(struct rpc_task *task, void *calldata)
 		data->o_arg.open_bitmap = &nfs4_open_noattr_bitmap[0];
 	case NFS4_OPEN_CLAIM_FH:
 		task->tk_msg.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_OPEN_NOATTR];
-		nfs_copy_fh(&data->o_res.fh, data->o_arg.fh);
 	}
 	data->timestamp = jiffies;
 	if (nfs4_setup_sequence(data->o_arg.server->nfs_client,
@@ -7318,7 +7345,9 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
 		      1 << (OP_DESTROY_SESSION - 32) |
 		      1 << (OP_DESTROY_CLIENTID - 32)
 	};
+	unsigned long flags = 0;
 	unsigned int i;
+	int ret = 0;
 
 	if (sp->how == SP4_MACH_CRED) {
 		/* Print state protect result */
@@ -7334,7 +7363,8 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
 		for (i = 0; i < NFS4_OP_MAP_NUM_WORDS; i++) {
 			if (sp->enforce.u.words[i] & ~supported_enforce[i]) {
 				dfprintk(MOUNT, "sp4_mach_cred: disabled\n");
-				return -EINVAL;
+				ret = -EINVAL;
+				goto out;
 			}
 		}
 
@@ -7353,10 +7383,11 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
 		    test_bit(OP_DESTROY_CLIENTID, sp->enforce.u.longs)) {
 			dfprintk(MOUNT, "sp4_mach_cred:\n");
 			dfprintk(MOUNT, "  minimal mode enabled\n");
-			set_bit(NFS_SP4_MACH_CRED_MINIMAL, &clp->cl_sp4_flags);
+			__set_bit(NFS_SP4_MACH_CRED_MINIMAL, &flags);
 		} else {
 			dfprintk(MOUNT, "sp4_mach_cred: disabled\n");
-			return -EINVAL;
+			ret = -EINVAL;
+			goto out;
 		}
 
 		if (test_bit(OP_CLOSE, sp->allow.u.longs) &&
@@ -7364,110 +7395,46 @@ static int nfs4_sp4_select_mode(struct nfs_client *clp,
 		    test_bit(OP_DELEGRETURN, sp->allow.u.longs) &&
 		    test_bit(OP_LOCKU, sp->allow.u.longs)) {
 			dfprintk(MOUNT, "  cleanup mode enabled\n");
-			set_bit(NFS_SP4_MACH_CRED_CLEANUP, &clp->cl_sp4_flags);
+			__set_bit(NFS_SP4_MACH_CRED_CLEANUP, &flags);
 		}
 
 		if (test_bit(OP_LAYOUTRETURN, sp->allow.u.longs)) {
 			dfprintk(MOUNT, "  pnfs cleanup mode enabled\n");
-			set_bit(NFS_SP4_MACH_CRED_PNFS_CLEANUP,
-				&clp->cl_sp4_flags);
+			__set_bit(NFS_SP4_MACH_CRED_PNFS_CLEANUP, &flags);
 		}
 
 		if (test_bit(OP_SECINFO, sp->allow.u.longs) &&
 		    test_bit(OP_SECINFO_NO_NAME, sp->allow.u.longs)) {
 			dfprintk(MOUNT, "  secinfo mode enabled\n");
-			set_bit(NFS_SP4_MACH_CRED_SECINFO, &clp->cl_sp4_flags);
+			__set_bit(NFS_SP4_MACH_CRED_SECINFO, &flags);
 		}
 
 		if (test_bit(OP_TEST_STATEID, sp->allow.u.longs) &&
 		    test_bit(OP_FREE_STATEID, sp->allow.u.longs)) {
 			dfprintk(MOUNT, "  stateid mode enabled\n");
-			set_bit(NFS_SP4_MACH_CRED_STATEID, &clp->cl_sp4_flags);
+			__set_bit(NFS_SP4_MACH_CRED_STATEID, &flags);
 		}
 
 		if (test_bit(OP_WRITE, sp->allow.u.longs)) {
 			dfprintk(MOUNT, "  write mode enabled\n");
-			set_bit(NFS_SP4_MACH_CRED_WRITE, &clp->cl_sp4_flags);
+			__set_bit(NFS_SP4_MACH_CRED_WRITE, &flags);
 		}
 
 		if (test_bit(OP_COMMIT, sp->allow.u.longs)) {
 			dfprintk(MOUNT, "  commit mode enabled\n");
-			set_bit(NFS_SP4_MACH_CRED_COMMIT, &clp->cl_sp4_flags);
+			__set_bit(NFS_SP4_MACH_CRED_COMMIT, &flags);
 		}
 	}
-
+out:
+	clp->cl_sp4_flags = flags;
-	return 0;
+	return ret;
 }
 
 struct nfs41_exchange_id_data {
 	struct nfs41_exchange_id_res res;
 	struct nfs41_exchange_id_args args;
-	struct rpc_xprt *xprt;
-	int rpc_status;
 };
 
-static void nfs4_exchange_id_done(struct rpc_task *task, void *data)
-{
-	struct nfs41_exchange_id_data *cdata =
-					(struct nfs41_exchange_id_data *)data;
-	struct nfs_client *clp = cdata->args.client;
-	int status = task->tk_status;
-
-	trace_nfs4_exchange_id(clp, status);
-
-	if (status == 0)
-		status = nfs4_check_cl_exchange_flags(cdata->res.flags);
-
-	if (cdata->xprt && status == 0) {
-		status = nfs4_detect_session_trunking(clp, &cdata->res,
-						      cdata->xprt);
-		goto out;
-	}
-
-	if (status  == 0)
-		status = nfs4_sp4_select_mode(clp, &cdata->res.state_protect);
-
-	if (status == 0) {
-		clp->cl_clientid = cdata->res.clientid;
-		clp->cl_exchange_flags = cdata->res.flags;
-		clp->cl_seqid = cdata->res.seqid;
-		/* Client ID is not confirmed */
-		if (!(cdata->res.flags & EXCHGID4_FLAG_CONFIRMED_R))
-			clear_bit(NFS4_SESSION_ESTABLISHED,
-				  &clp->cl_session->session_state);
-
-		kfree(clp->cl_serverowner);
-		clp->cl_serverowner = cdata->res.server_owner;
-		cdata->res.server_owner = NULL;
-
-		/* use the most recent implementation id */
-		kfree(clp->cl_implid);
-		clp->cl_implid = cdata->res.impl_id;
-		cdata->res.impl_id = NULL;
-
-		if (clp->cl_serverscope != NULL &&
-		    !nfs41_same_server_scope(clp->cl_serverscope,
-					cdata->res.server_scope)) {
-			dprintk("%s: server_scope mismatch detected\n",
-				__func__);
-			set_bit(NFS4CLNT_SERVER_SCOPE_MISMATCH, &clp->cl_state);
-			kfree(clp->cl_serverscope);
-			clp->cl_serverscope = NULL;
-		}
-
-		if (clp->cl_serverscope == NULL) {
-			clp->cl_serverscope = cdata->res.server_scope;
-			cdata->res.server_scope = NULL;
-		}
-		/* Save the EXCHANGE_ID verifier session trunk tests */
-		memcpy(clp->cl_confirm.data, cdata->args.verifier.data,
-		       sizeof(clp->cl_confirm.data));
-	}
-out:
-	cdata->rpc_status = status;
-	return;
-}
-
 static void nfs4_exchange_id_release(void *data)
 {
 	struct nfs41_exchange_id_data *cdata =
@@ -7481,7 +7448,6 @@ static void nfs4_exchange_id_release(void *data)
 }
 
 static const struct rpc_call_ops nfs4_exchange_id_call_ops = {
-	.rpc_call_done = nfs4_exchange_id_done,
 	.rpc_release = nfs4_exchange_id_release,
 };
 
@@ -7490,7 +7456,8 @@ static const struct rpc_call_ops nfs4_exchange_id_call_ops = {
  *
  * Wrapper for EXCHANGE_ID operation.
  */
-static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
+static struct rpc_task *
+nfs4_run_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
 			u32 sp4_how, struct rpc_xprt *xprt)
 {
 	struct rpc_message msg = {
@@ -7504,17 +7471,15 @@ static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
 		.flags = RPC_TASK_TIMEOUT,
 	};
 	struct nfs41_exchange_id_data *calldata;
-	struct rpc_task *task;
 	int status;
 
 	if (!atomic_inc_not_zero(&clp->cl_count))
-		return -EIO;
+		return ERR_PTR(-EIO);
 
+	status = -ENOMEM;
 	calldata = kzalloc(sizeof(*calldata), GFP_NOFS);
-	if (!calldata) {
-		nfs_put_client(clp);
-		return -ENOMEM;
-	}
+	if (!calldata)
+		goto out;
 
 	nfs4_init_boot_verifier(clp, &calldata->args.verifier);
 
@@ -7553,34 +7518,22 @@ static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
 		goto out_impl_id;
 	}
 	if (xprt) {
-		calldata->xprt = xprt;
 		task_setup_data.rpc_xprt = xprt;
 		task_setup_data.flags |= RPC_TASK_SOFTCONN;
 		memcpy(calldata->args.verifier.data, clp->cl_confirm.data,
 				sizeof(calldata->args.verifier.data));
 	}
 	calldata->args.client = clp;
+	calldata->args.flags = EXCHGID4_FLAG_SUPP_MOVED_REFER |
+			EXCHGID4_FLAG_BIND_PRINC_STATEID;
 #ifdef CONFIG_NFS_V4_1_MIGRATION
-	calldata->args.flags = EXCHGID4_FLAG_SUPP_MOVED_REFER |
-	EXCHGID4_FLAG_BIND_PRINC_STATEID |
-	EXCHGID4_FLAG_SUPP_MOVED_MIGR,
-#else
-	calldata->args.flags = EXCHGID4_FLAG_SUPP_MOVED_REFER |
-	EXCHGID4_FLAG_BIND_PRINC_STATEID,
+	calldata->args.flags |= EXCHGID4_FLAG_SUPP_MOVED_MIGR;
 #endif
 	msg.rpc_argp = &calldata->args;
 	msg.rpc_resp = &calldata->res;
 	task_setup_data.callback_data = calldata;
 
-	task = rpc_run_task(&task_setup_data);
-	if (IS_ERR(task))
-		return PTR_ERR(task);
-
-	status = calldata->rpc_status;
-
-	rpc_put_task(task);
-out:
-	return status;
+	return rpc_run_task(&task_setup_data);
 
 out_impl_id:
 	kfree(calldata->res.impl_id);
@@ -7590,8 +7543,69 @@ static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
 	kfree(calldata->res.server_owner);
 out_calldata:
 	kfree(calldata);
+out:
 	nfs_put_client(clp);
-	goto out;
+	return ERR_PTR(status);
+}
+
+/*
+ * _nfs4_proc_exchange_id()
+ *
+ * Wrapper for EXCHANGE_ID operation.
+ */
+static int _nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred,
+			u32 sp4_how)
+{
+	struct rpc_task *task;
+	struct nfs41_exchange_id_args *argp;
+	struct nfs41_exchange_id_res *resp;
+	int status;
+
+	task = nfs4_run_exchange_id(clp, cred, sp4_how, NULL);
+	if (IS_ERR(task))
+		return PTR_ERR(task);
+
+	argp = task->tk_msg.rpc_argp;
+	resp = task->tk_msg.rpc_resp;
+	status = task->tk_status;
+	if (status != 0)
+		goto out;
+
+	status = nfs4_check_cl_exchange_flags(resp->flags);
+	if (status != 0)
+		goto out;
+
+	status = nfs4_sp4_select_mode(clp, &resp->state_protect);
+	if (status != 0)
+		goto out;
+
+	clp->cl_clientid = resp->clientid;
+	clp->cl_exchange_flags = resp->flags;
+	clp->cl_seqid = resp->seqid;
+	/* Client ID is not confirmed */
+	if (!(resp->flags & EXCHGID4_FLAG_CONFIRMED_R))
+		clear_bit(NFS4_SESSION_ESTABLISHED,
+			  &clp->cl_session->session_state);
+
+	if (clp->cl_serverscope != NULL &&
+	    !nfs41_same_server_scope(clp->cl_serverscope,
+				resp->server_scope)) {
+		dprintk("%s: server_scope mismatch detected\n",
+			__func__);
+		set_bit(NFS4CLNT_SERVER_SCOPE_MISMATCH, &clp->cl_state);
+	}
+
+	swap(clp->cl_serverowner, resp->server_owner);
+	swap(clp->cl_serverscope, resp->server_scope);
+	swap(clp->cl_implid, resp->impl_id);
+
+	/* Save the EXCHANGE_ID verifier session trunk tests */
+	memcpy(clp->cl_confirm.data, argp->verifier.data,
+	       sizeof(clp->cl_confirm.data));
+out:
+	trace_nfs4_exchange_id(clp, status);
+	rpc_put_task(task);
+	return status;
 }
 
 /*
@@ -7614,13 +7628,13 @@ int nfs4_proc_exchange_id(struct nfs_client *clp, struct rpc_cred *cred)
 	/* try SP4_MACH_CRED if krb5i/p	*/
 	if (authflavor == RPC_AUTH_GSS_KRB5I ||
 	    authflavor == RPC_AUTH_GSS_KRB5P) {
-		status = _nfs4_proc_exchange_id(clp, cred, SP4_MACH_CRED, NULL);
+		status = _nfs4_proc_exchange_id(clp, cred, SP4_MACH_CRED);
 		if (!status)
 			return 0;
 	}
 
 	/* try SP4_NONE */
-	return _nfs4_proc_exchange_id(clp, cred, SP4_NONE, NULL);
+	return _nfs4_proc_exchange_id(clp, cred, SP4_NONE);
 }
 
 /**
@@ -7642,6 +7656,9 @@ int nfs4_test_session_trunk(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
 			    void *data)
 {
 	struct nfs4_add_xprt_data *adata = (struct nfs4_add_xprt_data *)data;
+	struct rpc_task *task;
+	int status;
 	u32 sp4_how;
 
 	dprintk("--> %s try %s\n", __func__,
@@ -7650,7 +7667,17 @@ int nfs4_test_session_trunk(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
 	sp4_how = (adata->clp->cl_sp4_flags == 0 ? SP4_NONE : SP4_MACH_CRED);
 
 	/* Test connection for session trunking. Async exchange_id call */
-	return  _nfs4_proc_exchange_id(adata->clp, adata->cred, sp4_how, xprt);
+	task = nfs4_run_exchange_id(adata->clp, adata->cred, sp4_how, xprt);
+	if (IS_ERR(task))
+		return PTR_ERR(task);
+
+	status = task->tk_status;
+	if (status == 0)
+		status = nfs4_detect_session_trunking(adata->clp,
+				task->tk_msg.rpc_resp, xprt);
+
+	rpc_put_task(task);
+	return status;
 }
 EXPORT_SYMBOL_GPL(nfs4_test_session_trunk);
 
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index ee66d8c..23cf4a8 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -134,19 +134,14 @@ EXPORT_SYMBOL_GPL(nfs_async_iocounter_wait);
 /*
  * nfs_page_group_lock - lock the head of the page group
  * @req - request in group that is to be locked
- * @nonblock - if true don't block waiting for lock
  *
- * this lock must be held if modifying the page group list
+ * this lock must be held when traversing or modifying the page
+ * group list
  *
- * return 0 on success, < 0 on error: -EDELAY if nonblocking or the
- * result from wait_on_bit_lock
- *
- * NOTE: calling with nonblock=false should always have set the
- *       lock bit (see fs/buffer.c and other uses of wait_on_bit_lock
- *       with TASK_UNINTERRUPTIBLE), so there is no need to check the result.
+ * return 0 on success, < 0 on error
  */
 int
-nfs_page_group_lock(struct nfs_page *req, bool nonblock)
+nfs_page_group_lock(struct nfs_page *req)
 {
 	struct nfs_page *head = req->wb_head;
 
@@ -155,35 +150,10 @@ nfs_page_group_lock(struct nfs_page *req, bool nonblock)
 	if (!test_and_set_bit(PG_HEADLOCK, &head->wb_flags))
 		return 0;
 
-	if (!nonblock) {
-		set_bit(PG_CONTENDED1, &head->wb_flags);
-		smp_mb__after_atomic();
-		return wait_on_bit_lock(&head->wb_flags, PG_HEADLOCK,
-				TASK_UNINTERRUPTIBLE);
-	}
-
-	return -EAGAIN;
-}
-
-/*
- * nfs_page_group_lock_wait - wait for the lock to clear, but don't grab it
- * @req - a request in the group
- *
- * This is a blocking call to wait for the group lock to be cleared.
- */
-void
-nfs_page_group_lock_wait(struct nfs_page *req)
-{
-	struct nfs_page *head = req->wb_head;
-
-	WARN_ON_ONCE(head != head->wb_head);
-
-	if (!test_bit(PG_HEADLOCK, &head->wb_flags))
-		return;
 	set_bit(PG_CONTENDED1, &head->wb_flags);
 	smp_mb__after_atomic();
-	wait_on_bit(&head->wb_flags, PG_HEADLOCK,
-		TASK_UNINTERRUPTIBLE);
+	return wait_on_bit_lock(&head->wb_flags, PG_HEADLOCK,
+				TASK_UNINTERRUPTIBLE);
 }
 
 /*
@@ -246,7 +216,7 @@ bool nfs_page_group_sync_on_bit(struct nfs_page *req, unsigned int bit)
 {
 	bool ret;
 
-	nfs_page_group_lock(req, false);
+	nfs_page_group_lock(req);
 	ret = nfs_page_group_sync_on_bit_locked(req, bit);
 	nfs_page_group_unlock(req);
 
@@ -288,9 +258,7 @@ nfs_page_group_init(struct nfs_page *req, struct nfs_page *prev)
 			inode = page_file_mapping(req->wb_page)->host;
 			set_bit(PG_INODE_REF, &req->wb_flags);
 			kref_get(&req->wb_kref);
-			spin_lock(&inode->i_lock);
-			NFS_I(inode)->nrequests++;
-			spin_unlock(&inode->i_lock);
+			atomic_long_inc(&NFS_I(inode)->nrequests);
 		}
 	}
 }
@@ -306,14 +274,11 @@ static void
 nfs_page_group_destroy(struct kref *kref)
 {
 	struct nfs_page *req = container_of(kref, struct nfs_page, wb_kref);
+	struct nfs_page *head = req->wb_head;
 	struct nfs_page *tmp, *next;
 
-	/* subrequests must release the ref on the head request */
-	if (req->wb_head != req)
-		nfs_release_request(req->wb_head);
-
 	if (!nfs_page_group_sync_on_bit(req, PG_TEARDOWN))
-		return;
+		goto out;
 
 	tmp = req;
 	do {
@@ -324,6 +289,10 @@ nfs_page_group_destroy(struct kref *kref)
 		nfs_free_request(tmp);
 		tmp = next;
 	} while (tmp != req);
+out:
+	/* subrequests must release the ref on the head request */
+	if (head != req)
+		nfs_release_request(head);
 }
 
 /**
@@ -465,6 +434,7 @@ void nfs_release_request(struct nfs_page *req)
 {
 	kref_put(&req->wb_kref, nfs_page_group_destroy);
 }
+EXPORT_SYMBOL_GPL(nfs_release_request);
 
 /**
  * nfs_wait_on_request - Wait for a request to complete.
@@ -483,6 +453,7 @@ nfs_wait_on_request(struct nfs_page *req)
 	return wait_on_bit_io(&req->wb_flags, PG_BUSY,
 			      TASK_UNINTERRUPTIBLE);
 }
+EXPORT_SYMBOL_GPL(nfs_wait_on_request);
 
 /*
  * nfs_generic_pg_test - determine if requests can be coalesced
@@ -1039,7 +1010,7 @@ static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 	unsigned int bytes_left = 0;
 	unsigned int offset, pgbase;
 
-	nfs_page_group_lock(req, false);
+	nfs_page_group_lock(req);
 
 	subreq = req;
 	bytes_left = subreq->wb_bytes;
@@ -1061,7 +1032,7 @@ static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 			if (mirror->pg_recoalesce)
 				return 0;
 			/* retry add_request for this subreq */
-			nfs_page_group_lock(req, false);
+			nfs_page_group_lock(req);
 			continue;
 		}
 
@@ -1158,7 +1129,7 @@ int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 
 	for (midx = 0; midx < desc->pg_mirror_count; midx++) {
 		if (midx) {
-			nfs_page_group_lock(req, false);
+			nfs_page_group_lock(req);
 
 			/* find the last request */
 			for (lastreq = req->wb_head;
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index c383d09..3125a9d 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -529,47 +529,6 @@ pnfs_put_lseg(struct pnfs_layout_segment *lseg)
 }
 EXPORT_SYMBOL_GPL(pnfs_put_lseg);
 
-static void pnfs_free_lseg_async_work(struct work_struct *work)
-{
-	struct pnfs_layout_segment *lseg;
-	struct pnfs_layout_hdr *lo;
-
-	lseg = container_of(work, struct pnfs_layout_segment, pls_work);
-	lo = lseg->pls_layout;
-
-	pnfs_free_lseg(lseg);
-	pnfs_put_layout_hdr(lo);
-}
-
-static void pnfs_free_lseg_async(struct pnfs_layout_segment *lseg)
-{
-	INIT_WORK(&lseg->pls_work, pnfs_free_lseg_async_work);
-	schedule_work(&lseg->pls_work);
-}
-
-void
-pnfs_put_lseg_locked(struct pnfs_layout_segment *lseg)
-{
-	if (!lseg)
-		return;
-
-	assert_spin_locked(&lseg->pls_layout->plh_inode->i_lock);
-
-	dprintk("%s: lseg %p ref %d valid %d\n", __func__, lseg,
-		atomic_read(&lseg->pls_refcount),
-		test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
-	if (atomic_dec_and_test(&lseg->pls_refcount)) {
-		struct pnfs_layout_hdr *lo = lseg->pls_layout;
-		if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags))
-			return;
-		pnfs_layout_remove_lseg(lo, lseg);
-		if (!pnfs_cache_lseg_for_layoutreturn(lo, lseg)) {
-			pnfs_get_layout_hdr(lo);
-			pnfs_free_lseg_async(lseg);
-		}
-	}
-}
-
 /*
  * is l2 fully contained in l1?
  *   start1                             end1
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 99731e3..87f144f 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -67,7 +67,6 @@ struct pnfs_layout_segment {
 	u32 pls_seq;
 	unsigned long pls_flags;
 	struct pnfs_layout_hdr *pls_layout;
-	struct work_struct pls_work;
 };
 
 enum pnfs_try_status {
@@ -230,7 +229,6 @@ extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool sync);
 /* pnfs.c */
 void pnfs_get_layout_hdr(struct pnfs_layout_hdr *lo);
 void pnfs_put_lseg(struct pnfs_layout_segment *lseg);
-void pnfs_put_lseg_locked(struct pnfs_layout_segment *lseg);
 
 void set_pnfs_layoutdriver(struct nfs_server *, const struct nfs_fh *, struct nfs_fsinfo *);
 void unset_pnfs_layoutdriver(struct nfs_server *);
diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c
index 25f28fa..303ff17 100644
--- a/fs/nfs/pnfs_nfs.c
+++ b/fs/nfs/pnfs_nfs.c
@@ -83,7 +83,7 @@ pnfs_generic_clear_request_commit(struct nfs_page *req,
 	}
 out:
 	nfs_request_remove_commit_list(req, cinfo);
-	pnfs_put_lseg_locked(freeme);
+	pnfs_put_lseg(freeme);
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_clear_request_commit);
 
@@ -91,21 +91,30 @@ static int
 pnfs_generic_transfer_commit_list(struct list_head *src, struct list_head *dst,
 				  struct nfs_commit_info *cinfo, int max)
 {
-	struct nfs_page *req, *tmp;
+	struct nfs_page *req;
 	int ret = 0;
 
-	list_for_each_entry_safe(req, tmp, src, wb_list) {
-		if (!nfs_lock_request(req))
-			continue;
+	while (!list_empty(src)) {
+		req = list_first_entry(src, struct nfs_page, wb_list);
+
 		kref_get(&req->wb_kref);
-		if (cond_resched_lock(&cinfo->inode->i_lock))
-			list_safe_reset_next(req, tmp, wb_list);
+		if (!nfs_lock_request(req)) {
+			int status;
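+
+			/*
+			 * Drop the commit_mutex while waiting for the
+			 * request to complete.
+			 */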
+			mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
+			status = nfs_wait_on_request(req);
+			nfs_release_request(req);
+			mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+			if (status < 0)
+				break;
+			continue;
+		}
 		nfs_request_remove_commit_list(req, cinfo);
 		clear_bit(PG_COMMIT_TO_DS, &req->wb_flags);
 		nfs_list_add_request(req, dst);
 		ret++;
 		if ((ret == max) && !cinfo->dreq)
 			break;
+		cond_resched();
 	}
 	return ret;
 }
@@ -119,7 +128,7 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 	struct list_head *dst = &bucket->committing;
 	int ret;
 
-	lockdep_assert_held(&cinfo->inode->i_lock);
+	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
 	ret = pnfs_generic_transfer_commit_list(src, dst, cinfo, max);
 	if (ret) {
 		cinfo->ds->nwritten -= ret;
@@ -127,7 +136,7 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 		if (bucket->clseg == NULL)
 			bucket->clseg = pnfs_get_lseg(bucket->wlseg);
 		if (list_empty(src)) {
-			pnfs_put_lseg_locked(bucket->wlseg);
+			pnfs_put_lseg(bucket->wlseg);
 			bucket->wlseg = NULL;
 		}
 	}
@@ -142,7 +151,7 @@ int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo,
 {
 	int i, rv = 0, cnt;
 
-	lockdep_assert_held(&cinfo->inode->i_lock);
+	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
 	for (i = 0; i < cinfo->ds->nbuckets && max != 0; i++) {
 		cnt = pnfs_generic_scan_ds_commit_list(&cinfo->ds->buckets[i],
 						       cinfo, max);
@@ -162,7 +171,7 @@ void pnfs_generic_recover_commit_reqs(struct list_head *dst,
 	int nwritten;
 	int i;
 
-	lockdep_assert_held(&cinfo->inode->i_lock);
+	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
 restart:
 	for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
 		nwritten = pnfs_generic_transfer_commit_list(&b->written,
@@ -953,12 +962,12 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
 	struct list_head *list;
 	struct pnfs_commit_bucket *buckets;
 
-	spin_lock(&cinfo->inode->i_lock);
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 	buckets = cinfo->ds->buckets;
 	list = &buckets[ds_commit_idx].written;
 	if (list_empty(list)) {
 		if (!pnfs_is_valid_lseg(lseg)) {
-			spin_unlock(&cinfo->inode->i_lock);
+			mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 			cinfo->completion_ops->resched_write(cinfo, req);
 			return;
 		}
@@ -975,7 +984,7 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
 	cinfo->ds->nwritten++;
 
 	nfs_request_add_commit_list_locked(req, list, cinfo);
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	nfs_mark_page_unstable(req->wb_page, cinfo);
 }
 EXPORT_SYMBOL_GPL(pnfs_layout_mark_request_commit);
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index ae78ac0..1a877d1 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -154,6 +154,14 @@ static void nfs_context_set_write_error(struct nfs_open_context *ctx, int error)
 	set_bit(NFS_CONTEXT_ERROR_WRITE, &ctx->flags);
 }
 
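+/* Return the nfs_page attached at page_private(), if any */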
+static struct nfs_page *
+nfs_page_private_request(struct page *page)
+{
+	if (!PagePrivate(page))
+		return NULL;
+	return (struct nfs_page *)page_private(page);
+}
+
 /*
- * nfs_page_find_head_request_locked - find head request associated with @page
+ * nfs_page_find_private_request - find head request associated with @page
  *
@@ -162,21 +170,41 @@ static void nfs_context_set_write_error(struct nfs_open_context *ctx, int error)
  * returns matching head request with reference held, or NULL if not found.
  */
 static struct nfs_page *
-nfs_page_find_head_request_locked(struct nfs_inode *nfsi, struct page *page)
+nfs_page_find_private_request(struct page *page)
 {
-	struct nfs_page *req = NULL;
+	struct address_space *mapping = page_file_mapping(page);
+	struct nfs_page *req;
 
-	if (PagePrivate(page))
-		req = (struct nfs_page *)page_private(page);
-	else if (unlikely(PageSwapCache(page)))
-		req = nfs_page_search_commits_for_head_request_locked(nfsi,
-			page);
-
+	if (!PagePrivate(page))
+		return NULL;
+	spin_lock(&mapping->private_lock);
+	req = nfs_page_private_request(page);
 	if (req) {
 		WARN_ON_ONCE(req->wb_head != req);
 		kref_get(&req->wb_kref);
 	}
+	spin_unlock(&mapping->private_lock);
+	return req;
+}
 
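+/*
+ * Swapcache pages carry no page_private() pointer, so look the request
+ * up on the inode's commit lists instead, under the commit_mutex.
+ */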
+static struct nfs_page *
+nfs_page_find_swap_request(struct page *page)
+{
+	struct inode *inode = page_file_mapping(page)->host;
+	struct nfs_inode *nfsi = NFS_I(inode);
+	struct nfs_page *req = NULL;
+
+	if (!PageSwapCache(page))
+		return NULL;
+	mutex_lock(&nfsi->commit_mutex);
+	if (PageSwapCache(page)) {
+		req = nfs_page_search_commits_for_head_request_locked(nfsi,
+			page);
+		if (req) {
+			WARN_ON_ONCE(req->wb_head != req);
+			kref_get(&req->wb_kref);
+		}
+	}
+	mutex_unlock(&nfsi->commit_mutex);
 	return req;
 }
 
@@ -187,12 +215,11 @@ nfs_page_find_head_request_locked(struct nfs_inode *nfsi, struct page *page)
  */
 static struct nfs_page *nfs_page_find_head_request(struct page *page)
 {
-	struct inode *inode = page_file_mapping(page)->host;
-	struct nfs_page *req = NULL;
+	struct nfs_page *req;
 
-	spin_lock(&inode->i_lock);
-	req = nfs_page_find_head_request_locked(NFS_I(inode), page);
-	spin_unlock(&inode->i_lock);
+	req = nfs_page_find_private_request(page);
+	if (!req)
+		req = nfs_page_find_swap_request(page);
 	return req;
 }
 
@@ -241,9 +268,6 @@ nfs_page_group_search_locked(struct nfs_page *head, unsigned int page_offset)
 {
 	struct nfs_page *req;
 
-	WARN_ON_ONCE(head != head->wb_head);
-	WARN_ON_ONCE(!test_bit(PG_HEADLOCK, &head->wb_head->wb_flags));
-
 	req = head;
 	do {
 		if (page_offset >= req->wb_pgbase &&
@@ -269,20 +293,17 @@ static bool nfs_page_group_covers_page(struct nfs_page *req)
 	unsigned int pos = 0;
 	unsigned int len = nfs_page_length(req->wb_page);
 
-	nfs_page_group_lock(req, false);
+	nfs_page_group_lock(req);
 
-	do {
+	for (;;) {
 		tmp = nfs_page_group_search_locked(req->wb_head, pos);
-		if (tmp) {
-			/* no way this should happen */
-			WARN_ON_ONCE(tmp->wb_pgbase != pos);
-			pos += tmp->wb_bytes - (pos - tmp->wb_pgbase);
-		}
-	} while (tmp && pos < len);
+		if (!tmp)
+			break;
+		pos = tmp->wb_pgbase + tmp->wb_bytes;
+	}
 
 	nfs_page_group_unlock(req);
-	WARN_ON_ONCE(pos > len);
-	return pos == len;
+	return pos >= len;
 }
 
 /* We can set the PG_uptodate flag if we see that a write request
@@ -333,8 +354,11 @@ static void nfs_end_page_writeback(struct nfs_page *req)
 {
 	struct inode *inode = page_file_mapping(req->wb_page)->host;
 	struct nfs_server *nfss = NFS_SERVER(inode);
+	bool is_done;
 
-	if (!nfs_page_group_sync_on_bit(req, PG_WB_END))
+	is_done = nfs_page_group_sync_on_bit(req, PG_WB_END);
+	nfs_unlock_request(req);
+	if (!is_done)
 		return;
 
 	end_page_writeback(req->wb_page);
@@ -342,22 +366,6 @@ static void nfs_end_page_writeback(struct nfs_page *req)
 		clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
 }
 
-
-/* nfs_page_group_clear_bits
- *   @req - an nfs request
- * clears all page group related bits from @req
- */
-static void
-nfs_page_group_clear_bits(struct nfs_page *req)
-{
-	clear_bit(PG_TEARDOWN, &req->wb_flags);
-	clear_bit(PG_UNLOCKPAGE, &req->wb_flags);
-	clear_bit(PG_UPTODATE, &req->wb_flags);
-	clear_bit(PG_WB_END, &req->wb_flags);
-	clear_bit(PG_REMOVE, &req->wb_flags);
-}
-
-
 /*
- * nfs_unroll_locks_and_wait -  unlock all newly locked reqs and wait on @req
+ * nfs_unroll_locks - unlock all newly locked subrequests in the page group
  *
@@ -366,43 +374,24 @@ nfs_page_group_clear_bits(struct nfs_page *req)
- * @inode - inode associated with request page group, must be holding inode lock
+ * @inode - inode associated with the request page group
 * @head  - head request of page group, must be holding head lock
- * @req   - request that couldn't lock and needs to wait on the req bit lock
+ * @req   - request up to which the group locks are to be unrolled
- * @nonblock - if true, don't actually wait
 *
- * NOTE: this must be called holding page_group bit lock and inode spin lock
- *       and BOTH will be released before returning.
+ * NOTE: this must be called holding the page group bit lock
- *
- * returns 0 on success, < 0 on error.
  */
-static int
-nfs_unroll_locks_and_wait(struct inode *inode, struct nfs_page *head,
-			  struct nfs_page *req, bool nonblock)
-	__releases(&inode->i_lock)
+static void
+nfs_unroll_locks(struct inode *inode, struct nfs_page *head,
+			  struct nfs_page *req)
 {
 	struct nfs_page *tmp;
-	int ret;
 
 	/* relinquish all the locks successfully grabbed this run */
-	for (tmp = head ; tmp != req; tmp = tmp->wb_this_page)
-		nfs_unlock_request(tmp);
-
-	WARN_ON_ONCE(test_bit(PG_TEARDOWN, &req->wb_flags));
-
-	/* grab a ref on the request that will be waited on */
-	kref_get(&req->wb_kref);
-
-	nfs_page_group_unlock(head);
-	spin_unlock(&inode->i_lock);
-
-	/* release ref from nfs_page_find_head_request_locked */
-	nfs_release_request(head);
-
-	if (!nonblock)
-		ret = nfs_wait_on_request(req);
-	else
-		ret = -EAGAIN;
-	nfs_release_request(req);
-
-	return ret;
+	for (tmp = head->wb_this_page ; tmp != req; tmp = tmp->wb_this_page) {
+		if (!kref_read(&tmp->wb_kref))
+			continue;
+		nfs_unlock_and_release_request(tmp);
+	}
 }
 
 /*
@@ -417,7 +406,8 @@ nfs_unroll_locks_and_wait(struct inode *inode, struct nfs_page *head,
  */
 static void
 nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
-				 struct nfs_page *old_head)
+				 struct nfs_page *old_head,
+				 struct inode *inode)
 {
 	while (destroy_list) {
 		struct nfs_page *subreq = destroy_list;
@@ -428,33 +418,28 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
 		WARN_ON_ONCE(old_head != subreq->wb_head);
 
 		/* make sure old group is not used */
-		subreq->wb_head = subreq;
 		subreq->wb_this_page = subreq;
 
+		clear_bit(PG_REMOVE, &subreq->wb_flags);
+
+		/* Note: races with nfs_page_group_destroy() */
+		if (!kref_read(&subreq->wb_kref)) {
+			/* Check if we raced with nfs_page_group_destroy() */
+			if (test_and_clear_bit(PG_TEARDOWN, &subreq->wb_flags))
+				nfs_free_request(subreq);
+			continue;
+		}
+
+		subreq->wb_head = subreq;
+
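+		/* Release the PG_INODE_REF reference and its accounting */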
+		if (test_and_clear_bit(PG_INODE_REF, &subreq->wb_flags)) {
+			nfs_release_request(subreq);
+			atomic_long_dec(&NFS_I(inode)->nrequests);
+		}
+
 		/* subreq is now totally disconnected from page group or any
 		 * write / commit lists. last chance to wake any waiters */
-		nfs_unlock_request(subreq);
-
-		if (!test_bit(PG_TEARDOWN, &subreq->wb_flags)) {
-			/* release ref on old head request */
-			nfs_release_request(old_head);
-
-			nfs_page_group_clear_bits(subreq);
-
-			/* release the PG_INODE_REF reference */
-			if (test_and_clear_bit(PG_INODE_REF, &subreq->wb_flags))
-				nfs_release_request(subreq);
-			else
-				WARN_ON_ONCE(1);
-		} else {
-			WARN_ON_ONCE(test_bit(PG_CLEAN, &subreq->wb_flags));
-			/* zombie requests have already released the last
-			 * reference and were waiting on the rest of the
-			 * group to complete. Since it's no longer part of a
-			 * group, simply free the request */
-			nfs_page_group_clear_bits(subreq);
-			nfs_free_request(subreq);
-		}
+		nfs_unlock_and_release_request(subreq);
 	}
 }
 
@@ -464,7 +449,6 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
  *                              operations for this page.
  *
  * @page - the page used to lookup the "page group" of nfs_page structures
- * @nonblock - if true, don't block waiting for request locks
  *
  * This function joins all sub requests to the head request by first
  * locking all requests in the group, cancelling any pending operations
@@ -478,7 +462,7 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
  * error was encountered.
  */
 static struct nfs_page *
-nfs_lock_and_join_requests(struct page *page, bool nonblock)
+nfs_lock_and_join_requests(struct page *page)
 {
 	struct inode *inode = page_file_mapping(page)->host;
 	struct nfs_page *head, *subreq;
@@ -487,43 +471,59 @@ nfs_lock_and_join_requests(struct page *page, bool nonblock)
 	int ret;
 
 try_again:
-	total_bytes = 0;
-
-	WARN_ON_ONCE(destroy_list);
-
-	spin_lock(&inode->i_lock);
-
 	/*
 	 * A reference is taken only on the head request which acts as a
 	 * reference to the whole page group - the group will not be destroyed
 	 * until the head reference is released.
 	 */
-	head = nfs_page_find_head_request_locked(NFS_I(inode), page);
-
-	if (!head) {
-		spin_unlock(&inode->i_lock);
+	head = nfs_page_find_head_request(page);
+	if (!head)
 		return NULL;
+
+	/* lock the page head first in order to avoid an ABBA inefficiency */
+	if (!nfs_lock_request(head)) {
+		ret = nfs_wait_on_request(head);
+		nfs_release_request(head);
+		if (ret < 0)
+			return ERR_PTR(ret);
+		goto try_again;
 	}
 
-	/* holding inode lock, so always make a non-blocking call to try the
-	 * page group lock */
-	ret = nfs_page_group_lock(head, true);
+	/* Ensure that nobody removed the request before we locked it */
+	if (head != nfs_page_private_request(page) && !PageSwapCache(page)) {
+		nfs_unlock_and_release_request(head);
+		goto try_again;
+	}
+
+	ret = nfs_page_group_lock(head);
 	if (ret < 0) {
-		spin_unlock(&inode->i_lock);
-
-		if (!nonblock && ret == -EAGAIN) {
-			nfs_page_group_lock_wait(head);
-			nfs_release_request(head);
-			goto try_again;
-		}
-
-		nfs_release_request(head);
+		nfs_unlock_and_release_request(head);
 		return ERR_PTR(ret);
 	}
 
 	/* lock each request in the page group */
-	subreq = head;
-	do {
+	total_bytes = head->wb_bytes;
+	for (subreq = head->wb_this_page; subreq != head;
+			subreq = subreq->wb_this_page) {
+
+		if (!kref_get_unless_zero(&subreq->wb_kref))
+			continue;
+		while (!nfs_lock_request(subreq)) {
+			/*
+			 * Unlock page to allow nfs_page_group_sync_on_bit()
+			 * to succeed
+			 */
+			nfs_page_group_unlock(head);
+			ret = nfs_wait_on_request(subreq);
+			if (!ret)
+				ret = nfs_page_group_lock(head);
+			if (ret < 0) {
+				nfs_unroll_locks(inode, head, subreq);
+				nfs_release_request(subreq);
+				nfs_unlock_and_release_request(head);
+				return ERR_PTR(ret);
+			}
+		}
 		/*
 		 * Subrequests are always contiguous, non overlapping
 		 * and in order - but may be repeated (mirrored writes).
@@ -534,25 +534,13 @@ nfs_lock_and_join_requests(struct page *page, bool nonblock)
 		} else if (WARN_ON_ONCE(subreq->wb_offset < head->wb_offset ||
 			    ((subreq->wb_offset + subreq->wb_bytes) >
 			     (head->wb_offset + total_bytes)))) {
+			nfs_unroll_locks(inode, head, subreq);
+			nfs_unlock_and_release_request(subreq);
 			nfs_page_group_unlock(head);
-			spin_unlock(&inode->i_lock);
+			nfs_unlock_and_release_request(head);
 			return ERR_PTR(-EIO);
 		}
-
-		if (!nfs_lock_request(subreq)) {
-			/* releases page group bit lock and
-			 * inode spin lock and all references */
-			ret = nfs_unroll_locks_and_wait(inode, head,
-				subreq, nonblock);
-
-			if (ret == 0)
-				goto try_again;
-
-			return ERR_PTR(ret);
-		}
-
-		subreq = subreq->wb_this_page;
-	} while (subreq != head);
+	}
 
 	/* Now that all requests are locked, make sure they aren't on any list.
 	 * Commit list removal accounting is done after locks are dropped */
@@ -573,34 +561,30 @@ nfs_lock_and_join_requests(struct page *page, bool nonblock)
 		head->wb_bytes = total_bytes;
 	}
 
-	/*
-	 * prepare head request to be added to new pgio descriptor
-	 */
-	nfs_page_group_clear_bits(head);
-
-	/*
-	 * some part of the group was still on the inode list - otherwise
-	 * the group wouldn't be involved in async write.
-	 * grab a reference for the head request, iff it needs one.
-	 */
-	if (!test_and_set_bit(PG_INODE_REF, &head->wb_flags))
+	/* Postpone destruction of this request */
+	if (test_and_clear_bit(PG_REMOVE, &head->wb_flags)) {
+		set_bit(PG_INODE_REF, &head->wb_flags);
 		kref_get(&head->wb_kref);
+		atomic_long_inc(&NFS_I(inode)->nrequests);
+	}
 
 	nfs_page_group_unlock(head);
 
-	/* drop lock to clean uprequests on destroy list */
-	spin_unlock(&inode->i_lock);
+	nfs_destroy_unlinked_subrequests(destroy_list, head, inode);
 
-	nfs_destroy_unlinked_subrequests(destroy_list, head);
+	/* Did we lose a race with nfs_inode_remove_request()? */
+	if (!(PagePrivate(page) || PageSwapCache(page))) {
+		nfs_unlock_and_release_request(head);
+		return NULL;
+	}
 
-	/* still holds ref on head from nfs_page_find_head_request_locked
+	/* still holds ref on head from nfs_page_find_head_request
 	 * and still has lock on head from lock loop */
 	return head;
 }
 
 static void nfs_write_error_remove_page(struct nfs_page *req)
 {
-	nfs_unlock_request(req);
 	nfs_end_page_writeback(req);
 	generic_error_remove_page(page_file_mapping(req->wb_page),
 				  req->wb_page);
@@ -624,12 +608,12 @@ nfs_error_is_fatal_on_server(int err)
  * May return an error if the user signalled nfs_wait_on_request().
  */
 static int nfs_page_async_flush(struct nfs_pageio_descriptor *pgio,
-				struct page *page, bool nonblock)
+				struct page *page)
 {
 	struct nfs_page *req;
 	int ret = 0;
 
-	req = nfs_lock_and_join_requests(page, nonblock);
+	req = nfs_lock_and_join_requests(page);
 	if (!req)
 		goto out;
 	ret = PTR_ERR(req);
@@ -672,7 +656,7 @@ static int nfs_do_writepage(struct page *page, struct writeback_control *wbc,
 	int ret;
 
 	nfs_pageio_cond_complete(pgio, page_index(page));
-	ret = nfs_page_async_flush(pgio, page, wbc->sync_mode == WB_SYNC_NONE);
+	ret = nfs_page_async_flush(pgio, page);
 	if (ret == -EAGAIN) {
 		redirty_page_for_writepage(wbc, page);
 		ret = 0;
@@ -759,6 +743,7 @@ int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc)
  */
 static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 {
+	struct address_space *mapping = page_file_mapping(req->wb_page);
 	struct nfs_inode *nfsi = NFS_I(inode);
 
 	WARN_ON_ONCE(req->wb_this_page != req);
@@ -766,27 +751,30 @@ static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 	/* Lock the request! */
 	nfs_lock_request(req);
 
-	spin_lock(&inode->i_lock);
-	if (!nfsi->nrequests &&
-	    NFS_PROTO(inode)->have_delegation(inode, FMODE_WRITE))
-		inode->i_version++;
 	/*
 	 * Swap-space should not get truncated. Hence no need to plug the race
 	 * with invalidate/truncate.
 	 */
+	spin_lock(&mapping->private_lock);
+	if (!nfs_have_writebacks(inode) &&
+	    NFS_PROTO(inode)->have_delegation(inode, FMODE_WRITE)) {
+		spin_lock(&inode->i_lock);
+		inode->i_version++;
+		spin_unlock(&inode->i_lock);
+	}
 	if (likely(!PageSwapCache(req->wb_page))) {
 		set_bit(PG_MAPPED, &req->wb_flags);
 		SetPagePrivate(req->wb_page);
 		set_page_private(req->wb_page, (unsigned long)req);
 	}
-	nfsi->nrequests++;
+	spin_unlock(&mapping->private_lock);
+	atomic_long_inc(&nfsi->nrequests);
 	/* this a head request for a page group - mark it as having an
 	 * extra reference so sub groups can follow suit.
 	 * This flag also informs pgio layer when to bump nrequests when
 	 * adding subrequests. */
 	WARN_ON(test_and_set_bit(PG_INODE_REF, &req->wb_flags));
 	kref_get(&req->wb_kref);
-	spin_unlock(&inode->i_lock);
 }
 
 /*
@@ -794,25 +782,22 @@ static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
  */
 static void nfs_inode_remove_request(struct nfs_page *req)
 {
-	struct inode *inode = d_inode(req->wb_context->dentry);
+	struct address_space *mapping = page_file_mapping(req->wb_page);
+	struct inode *inode = mapping->host;
 	struct nfs_inode *nfsi = NFS_I(inode);
 	struct nfs_page *head;
 
+	atomic_long_dec(&nfsi->nrequests);
 	if (nfs_page_group_sync_on_bit(req, PG_REMOVE)) {
 		head = req->wb_head;
 
-		spin_lock(&inode->i_lock);
+		spin_lock(&mapping->private_lock);
 		if (likely(head->wb_page && !PageSwapCache(head->wb_page))) {
 			set_page_private(head->wb_page, 0);
 			ClearPagePrivate(head->wb_page);
 			clear_bit(PG_MAPPED, &head->wb_flags);
 		}
-		nfsi->nrequests--;
-		spin_unlock(&inode->i_lock);
-	} else {
-		spin_lock(&inode->i_lock);
-		nfsi->nrequests--;
-		spin_unlock(&inode->i_lock);
+		spin_unlock(&mapping->private_lock);
 	}
 
 	if (test_and_clear_bit(PG_INODE_REF, &req->wb_flags))
@@ -868,7 +853,8 @@ nfs_page_search_commits_for_head_request_locked(struct nfs_inode *nfsi,
  * number of outstanding requests requiring a commit as well as
  * the MM page stats.
  *
- * The caller must hold cinfo->inode->i_lock, and the nfs_page lock.
+ * The caller must hold NFS_I(cinfo->inode)->commit_mutex, and the
+ * nfs_page lock.
  */
 void
 nfs_request_add_commit_list_locked(struct nfs_page *req, struct list_head *dst,
@@ -876,7 +862,7 @@ nfs_request_add_commit_list_locked(struct nfs_page *req, struct list_head *dst,
 {
 	set_bit(PG_CLEAN, &req->wb_flags);
 	nfs_list_add_request(req, dst);
-	cinfo->mds->ncommit++;
+	atomic_long_inc(&cinfo->mds->ncommit);
 }
 EXPORT_SYMBOL_GPL(nfs_request_add_commit_list_locked);
 
@@ -896,9 +882,9 @@ EXPORT_SYMBOL_GPL(nfs_request_add_commit_list_locked);
 void
 nfs_request_add_commit_list(struct nfs_page *req, struct nfs_commit_info *cinfo)
 {
-	spin_lock(&cinfo->inode->i_lock);
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 	nfs_request_add_commit_list_locked(req, &cinfo->mds->list, cinfo);
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	if (req->wb_page)
 		nfs_mark_page_unstable(req->wb_page, cinfo);
 }
@@ -922,7 +908,7 @@ nfs_request_remove_commit_list(struct nfs_page *req,
 	if (!test_and_clear_bit(PG_CLEAN, &(req)->wb_flags))
 		return;
 	nfs_list_remove_request(req);
-	cinfo->mds->ncommit--;
+	atomic_long_dec(&cinfo->mds->ncommit);
 }
 EXPORT_SYMBOL_GPL(nfs_request_remove_commit_list);
 
@@ -967,7 +953,7 @@ nfs_clear_page_commit(struct page *page)
 		    WB_RECLAIMABLE);
 }
 
-/* Called holding inode (/cinfo) lock */
+/* Called holding the request lock on @req */
 static void
 nfs_clear_request_commit(struct nfs_page *req)
 {
@@ -976,9 +962,11 @@ nfs_clear_request_commit(struct nfs_page *req)
 		struct nfs_commit_info cinfo;
 
 		nfs_init_cinfo_from_inode(&cinfo, inode);
+		mutex_lock(&NFS_I(inode)->commit_mutex);
 		if (!pnfs_clear_request_commit(req, &cinfo)) {
 			nfs_request_remove_commit_list(req, &cinfo);
 		}
+		mutex_unlock(&NFS_I(inode)->commit_mutex);
 		nfs_clear_page_commit(req->wb_page);
 	}
 }
@@ -1023,7 +1011,6 @@ static void nfs_write_completion(struct nfs_pgio_header *hdr)
 remove_req:
 		nfs_inode_remove_request(req);
 next:
-		nfs_unlock_request(req);
 		nfs_end_page_writeback(req);
 		nfs_release_request(req);
 	}
@@ -1035,28 +1022,36 @@ static void nfs_write_completion(struct nfs_pgio_header *hdr)
 unsigned long
 nfs_reqs_to_commit(struct nfs_commit_info *cinfo)
 {
-	return cinfo->mds->ncommit;
+	return atomic_long_read(&cinfo->mds->ncommit);
 }
 
-/* cinfo->inode->i_lock held by caller */
+/* NFS_I(cinfo->inode)->commit_mutex held by caller */
 int
 nfs_scan_commit_list(struct list_head *src, struct list_head *dst,
 		     struct nfs_commit_info *cinfo, int max)
 {
-	struct nfs_page *req, *tmp;
+	struct nfs_page *req;
 	int ret = 0;
 
-	list_for_each_entry_safe(req, tmp, src, wb_list) {
-		if (!nfs_lock_request(req))
-			continue;
+	while (!list_empty(src)) {
+		req = list_first_entry(src, struct nfs_page, wb_list);
 		kref_get(&req->wb_kref);
-		if (cond_resched_lock(&cinfo->inode->i_lock))
-			list_safe_reset_next(req, tmp, wb_list);
+		if (!nfs_lock_request(req)) {
+			int status;
+			mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
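+
+			/*
+			 * Prevent a deadlock: wait for the request with
+			 * the commit_mutex dropped.
+			 */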
+			status = nfs_wait_on_request(req);
+			nfs_release_request(req);
+			mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+			if (status < 0)
+				break;
+			continue;
+		}
 		nfs_request_remove_commit_list(req, cinfo);
 		nfs_list_add_request(req, dst);
 		ret++;
 		if ((ret == max) && !cinfo->dreq)
 			break;
+		cond_resched();
 	}
 	return ret;
 }
@@ -1076,15 +1071,17 @@ nfs_scan_commit(struct inode *inode, struct list_head *dst,
 {
 	int ret = 0;
 
-	spin_lock(&cinfo->inode->i_lock);
-	if (cinfo->mds->ncommit > 0) {
+	if (!atomic_long_read(&cinfo->mds->ncommit))
+		return 0;
+	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+	if (atomic_long_read(&cinfo->mds->ncommit) > 0) {
 		const int max = INT_MAX;
 
 		ret = nfs_scan_commit_list(&cinfo->mds->list, dst,
 					   cinfo, max);
 		ret += pnfs_scan_commit_lists(inode, cinfo, max - ret);
 	}
-	spin_unlock(&cinfo->inode->i_lock);
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	return ret;
 }
 
@@ -1105,43 +1102,21 @@ static struct nfs_page *nfs_try_to_update_request(struct inode *inode,
 	unsigned int end;
 	int error;
 
-	if (!PagePrivate(page))
-		return NULL;
-
 	end = offset + bytes;
-	spin_lock(&inode->i_lock);
 
-	for (;;) {
-		req = nfs_page_find_head_request_locked(NFS_I(inode), page);
-		if (req == NULL)
-			goto out_unlock;
+	req = nfs_lock_and_join_requests(page);
+	if (IS_ERR_OR_NULL(req))
+		return req;
 
-		/* should be handled by nfs_flush_incompatible */
-		WARN_ON_ONCE(req->wb_head != req);
-		WARN_ON_ONCE(req->wb_this_page != req);
-
-		rqend = req->wb_offset + req->wb_bytes;
-		/*
-		 * Tell the caller to flush out the request if
-		 * the offsets are non-contiguous.
-		 * Note: nfs_flush_incompatible() will already
-		 * have flushed out requests having wrong owners.
-		 */
-		if (offset > rqend
-		    || end < req->wb_offset)
-			goto out_flushme;
-
-		if (nfs_lock_request(req))
-			break;
-
-		/* The request is locked, so wait and then retry */
-		spin_unlock(&inode->i_lock);
-		error = nfs_wait_on_request(req);
-		nfs_release_request(req);
-		if (error != 0)
-			goto out_err;
-		spin_lock(&inode->i_lock);
-	}
+	rqend = req->wb_offset + req->wb_bytes;
+	/*
+	 * Tell the caller to flush out the request if
+	 * the offsets are non-contiguous.
+	 * Note: nfs_flush_incompatible() will already
+	 * have flushed out requests having wrong owners.
+	 */
+	if (offset > rqend || end < req->wb_offset)
+		goto out_flushme;
 
 	/* Okay, the request matches. Update the region */
 	if (offset < req->wb_offset) {
@@ -1152,17 +1127,17 @@ static struct nfs_page *nfs_try_to_update_request(struct inode *inode,
 		req->wb_bytes = end - req->wb_offset;
 	else
 		req->wb_bytes = rqend - req->wb_offset;
-out_unlock:
-	if (req)
-		nfs_clear_request_commit(req);
-	spin_unlock(&inode->i_lock);
 	return req;
 out_flushme:
-	spin_unlock(&inode->i_lock);
-	nfs_release_request(req);
+	/*
+	 * Note: we mark the request dirty here because
+	 * nfs_lock_and_join_requests() cannot preserve
+	 * commit flags, so we have to replay the write.
+	 */
+	nfs_mark_request_dirty(req);
+	nfs_unlock_and_release_request(req);
 	error = nfs_wb_page(inode, page);
-out_err:
-	return ERR_PTR(error);
+	return (error < 0) ? ERR_PTR(error) : NULL;
 }
 
 /*
@@ -1227,8 +1202,6 @@ int nfs_flush_incompatible(struct file *file, struct page *page)
 		l_ctx = req->wb_lock_context;
 		do_flush = req->wb_page != page ||
 			!nfs_match_open_context(req->wb_context, ctx);
-		/* for now, flush if more than 1 request in page_group */
-		do_flush |= req->wb_this_page != req;
 		if (l_ctx && flctx &&
 		    !(list_empty_careful(&flctx->flc_posix) &&
 		      list_empty_careful(&flctx->flc_flock))) {
@@ -1412,7 +1385,6 @@ static void nfs_redirty_request(struct nfs_page *req)
 {
 	nfs_mark_request_dirty(req);
 	set_bit(NFS_CONTEXT_RESEND_WRITES, &req->wb_context->flags);
-	nfs_unlock_request(req);
 	nfs_end_page_writeback(req);
 	nfs_release_request(req);
 }
@@ -1934,7 +1906,7 @@ int nfs_write_inode(struct inode *inode, struct writeback_control *wbc)
 	int ret = 0;
 
 	/* no commits means nothing needs to be done */
-	if (!nfsi->commit_info.ncommit)
+	if (!atomic_long_read(&nfsi->commit_info.ncommit))
 		return ret;
 
 	if (wbc->sync_mode == WB_SYNC_NONE) {
@@ -2015,7 +1987,7 @@ int nfs_wb_page_cancel(struct inode *inode, struct page *page)
 
 	/* blocking call to cancel all requests and join to a single (head)
 	 * request */
-	req = nfs_lock_and_join_requests(page, false);
+	req = nfs_lock_and_join_requests(page);
 
 	if (IS_ERR(req)) {
 		ret = PTR_ERR(req);
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index 5cc91d6..238fdc4 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -154,7 +154,7 @@ struct nfs_inode {
 	 */
 	__be32			cookieverf[2];
 
-	unsigned long		nrequests;
+	atomic_long_t		nrequests;
 	struct nfs_mds_commit_info commit_info;
 
 	/* Open contexts for shared mmap writes */
@@ -163,6 +163,7 @@ struct nfs_inode {
 	/* Readers: in-flight sillydelete RPC calls */
 	/* Writers: rmdir */
 	struct rw_semaphore	rmdir_sem;
+	struct mutex		commit_mutex;
 
 #if IS_ENABLED(CONFIG_NFS_V4)
 	struct nfs4_cached_acl	*nfs4_acl;
@@ -510,7 +511,7 @@ extern void nfs_commit_free(struct nfs_commit_data *data);
 static inline int
 nfs_have_writebacks(struct inode *inode)
 {
-	return NFS_I(inode)->nrequests != 0;
+	return atomic_long_read(&NFS_I(inode)->nrequests) != 0;
 }
 
 /*
diff --git a/include/linux/nfs_page.h b/include/linux/nfs_page.h
index 8b1a35a..d117120c 100644
--- a/include/linux/nfs_page.h
+++ b/include/linux/nfs_page.h
@@ -138,8 +138,7 @@ extern size_t nfs_generic_pg_test(struct nfs_pageio_descriptor *desc,
 extern  int nfs_wait_on_request(struct nfs_page *);
 extern	void nfs_unlock_request(struct nfs_page *req);
 extern	void nfs_unlock_and_release_request(struct nfs_page *);
-extern int nfs_page_group_lock(struct nfs_page *, bool);
-extern void nfs_page_group_lock_wait(struct nfs_page *);
+extern int nfs_page_group_lock(struct nfs_page *);
 extern void nfs_page_group_unlock(struct nfs_page *);
 extern bool nfs_page_group_sync_on_bit(struct nfs_page *, unsigned int);
 extern bool nfs_async_iocounter_wait(struct rpc_task *, struct nfs_lock_context *);
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 62cbcb8..164d535 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -1476,7 +1476,7 @@ struct nfs_pgio_header {
 
 struct nfs_mds_commit_info {
 	atomic_t rpcs_out;
-	unsigned long		ncommit;
+	atomic_long_t		ncommit;
 	struct list_head	list;
 };
 
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 50a99a1..c1768f9 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -139,6 +139,8 @@ struct rpc_task_setup {
 #define RPC_TASK_RUNNING	0
 #define RPC_TASK_QUEUED		1
 #define RPC_TASK_ACTIVE		2
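+/* Pin a request on the receive list: see xprt_pin_rqst()/xprt_unpin_rqst() */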
+#define RPC_TASK_MSG_RECV	3
+#define RPC_TASK_MSG_RECV_WAIT	4
 
 #define RPC_IS_RUNNING(t)	test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
 #define rpc_set_running(t)	set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index eab1c74..a97e6de 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -232,6 +232,7 @@ struct rpc_xprt {
 	 */
 	spinlock_t		transport_lock;	/* lock transport info */
 	spinlock_t		reserve_lock;	/* lock slot table */
+	spinlock_t		recv_lock;	/* lock receive list */
 	u32			xid;		/* Next XID value to use */
 	struct rpc_task *	snd_task;	/* Task blocked in send */
 	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
@@ -372,6 +373,8 @@ void			xprt_write_space(struct rpc_xprt *xprt);
 void			xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result);
 struct rpc_rqst *	xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid);
 void			xprt_complete_rqst(struct rpc_task *task, int copied);
+void			xprt_pin_rqst(struct rpc_rqst *req);
+void			xprt_unpin_rqst(struct rpc_rqst *req);
 void			xprt_release_rqst_cong(struct rpc_task *task);
 void			xprt_disconnect_done(struct rpc_xprt *xprt);
 void			xprt_force_disconnect(struct rpc_xprt *xprt);
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index ac701c2..c2c68a1 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -171,10 +171,10 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
 	/*
 	 * Add the temporary list to the backchannel preallocation list
 	 */
-	spin_lock_bh(&xprt->bc_pa_lock);
+	spin_lock(&xprt->bc_pa_lock);
 	list_splice(&tmp_list, &xprt->bc_pa_list);
 	xprt_inc_alloc_count(xprt, min_reqs);
-	spin_unlock_bh(&xprt->bc_pa_lock);
+	spin_unlock(&xprt->bc_pa_lock);
 
 	dprintk("RPC:       setup backchannel transport done\n");
 	return 0;
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 2b720fa..272063c 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -1001,7 +1001,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
 
 	if (!bc_xprt)
 		return -EAGAIN;
-	spin_lock_bh(&bc_xprt->transport_lock);
+	spin_lock(&bc_xprt->recv_lock);
 	req = xprt_lookup_rqst(bc_xprt, xid);
 	if (!req)
 		goto unlock_notfound;
@@ -1019,7 +1019,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
 	memcpy(dst->iov_base, src->iov_base, src->iov_len);
 	xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
 	rqstp->rq_arg.len = 0;
-	spin_unlock_bh(&bc_xprt->transport_lock);
+	spin_unlock(&bc_xprt->recv_lock);
 	return 0;
 unlock_notfound:
 	printk(KERN_NOTICE
@@ -1028,7 +1028,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
 		__func__, ntohl(calldir),
 		bc_xprt, ntohl(xid));
 unlock_eagain:
-	spin_unlock_bh(&bc_xprt->transport_lock);
+	spin_unlock(&bc_xprt->recv_lock);
 	return -EAGAIN;
 }
 
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 4654a99..2af189c 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -844,6 +844,64 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 }
 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
 
+/**
+ * xprt_pin_rqst - Pin a request on the transport receive list
+ * @req: Request to pin
+ *
+ * Caller must ensure this is atomic with the call to xprt_lookup_rqst(),
+ * so it should be holding xprt->recv_lock.
+ */
+void xprt_pin_rqst(struct rpc_rqst *req)
+{
+	set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
+}
+
+/**
+ * xprt_unpin_rqst - Unpin a request on the transport receive list
+ * @req: Request to unpin
+ *
+ * Caller should be holding xprt->recv_lock.
+ */
+void xprt_unpin_rqst(struct rpc_rqst *req)
+{
+	struct rpc_task *task = req->rq_task;
+
+	clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
+	if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
+		wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
+}
+
+static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
+__must_hold(&req->rq_xprt->recv_lock)
+{
+	struct rpc_task *task = req->rq_task;
+
+	if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
+		spin_unlock(&req->rq_xprt->recv_lock);
+		set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
+		wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
+				TASK_UNINTERRUPTIBLE);
+		clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
+		spin_lock(&req->rq_xprt->recv_lock);
+	}
+}
+
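+/* Usage sketch (mirrors the xprtsock receive paths changed below):
+ * pinning lets the bulk data copy run without holding recv_lock.
+ *
+ *	spin_lock(&xprt->recv_lock);
+ *	req = xprt_lookup_rqst(xprt, xid);
+ *	if (!req)
+ *		goto out_unlock;
+ *	xprt_pin_rqst(req);
+ *	spin_unlock(&xprt->recv_lock);
+ *	... copy reply data into req->rq_private_buf ...
+ *	spin_lock(&xprt->recv_lock);
+ *	xprt_complete_rqst(req->rq_task, copied);
+ *	xprt_unpin_rqst(req);
+ *	spin_unlock(&xprt->recv_lock);
+ */
+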
 static void xprt_update_rtt(struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
@@ -966,13 +1008,13 @@ void xprt_transmit(struct rpc_task *task)
 			/*
 			 * Add to the list only if we're expecting a reply
 			 */
-			spin_lock_bh(&xprt->transport_lock);
 			/* Update the softirq receive buffer */
 			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
 					sizeof(req->rq_private_buf));
 			/* Add request to the receive list */
+			spin_lock(&xprt->recv_lock);
 			list_add_tail(&req->rq_list, &xprt->recv);
-			spin_unlock_bh(&xprt->transport_lock);
+			spin_unlock(&xprt->recv_lock);
 			xprt_reset_majortimeo(req);
 			/* Turn off autodisconnect */
 			del_singleshot_timer_sync(&xprt->timer);
@@ -1287,12 +1329,19 @@ void xprt_release(struct rpc_task *task)
 		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
 	else if (task->tk_client)
 		rpc_count_iostats(task, task->tk_client->cl_metrics);
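+	/* Unlink the request and wait for any receive path that still
+	 * holds a pin on it before the slot can be released.
+	 */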
+	spin_lock(&xprt->recv_lock);
+	if (!list_empty(&req->rq_list)) {
+		list_del(&req->rq_list);
+		xprt_wait_on_pinned_rqst(req);
+	}
+	spin_unlock(&xprt->recv_lock);
 	spin_lock_bh(&xprt->transport_lock);
 	xprt->ops->release_xprt(xprt, task);
 	if (xprt->ops->release_request)
 		xprt->ops->release_request(task);
-	if (!list_empty(&req->rq_list))
-		list_del(&req->rq_list);
 	xprt->last_used = jiffies;
 	xprt_schedule_autodisconnect(xprt);
 	spin_unlock_bh(&xprt->transport_lock);
@@ -1318,6 +1364,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 
 	spin_lock_init(&xprt->transport_lock);
 	spin_lock_init(&xprt->reserve_lock);
+	spin_lock_init(&xprt->recv_lock);
 
 	INIT_LIST_HEAD(&xprt->free);
 	INIT_LIST_HEAD(&xprt->recv);
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index ca4d6e4..dfa748a 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1051,7 +1051,7 @@ rpcrdma_reply_handler(struct work_struct *work)
-	 * RPC completion while holding the transport lock to ensure
+	 * RPC completion while holding the recv_lock to ensure
 	 * the rep, rqst, and rq_task pointers remain stable.
 	 */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
 	if (!rqst)
 		goto out_norqst;
@@ -1136,7 +1136,7 @@ rpcrdma_reply_handler(struct work_struct *work)
 		xprt_release_rqst_cong(rqst->rq_task);
 
 	xprt_complete_rqst(rqst->rq_task, status);
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 		__func__, xprt, rqst, status);
 	return;
@@ -1187,12 +1187,12 @@ rpcrdma_reply_handler(struct work_struct *work)
 	r_xprt->rx_stats.bad_reply_count++;
 	goto out;
 
-/* The req was still available, but by the time the transport_lock
+/* The req was still available, but by the time the recv_lock
  * was acquired, the rqst and task had been released. Thus the RPC
  * has already been terminated.
  */
 out_norqst:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 	rpcrdma_buffer_put(req);
 	dprintk("RPC:       %s: race, no rqst left for req %p\n",
 		__func__, req);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index c676ed0..0d574cd 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -52,7 +52,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
 	if (src->iov_len < 24)
 		goto out_shortreply;
 
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	req = xprt_lookup_rqst(xprt, xid);
 	if (!req)
 		goto out_notfound;
@@ -69,17 +69,19 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
 	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
 		credits = r_xprt->rx_buf.rb_bc_max_requests;
 
+	spin_lock_bh(&xprt->transport_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = credits << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
 		xprt_release_rqst_cong(req->rq_task);
+	spin_unlock_bh(&xprt->transport_lock);
 
 	ret = 0;
 	xprt_complete_rqst(req->rq_task, rcvbuf->len);
 	rcvbuf->len = 0;
 
 out_unlock:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 out:
 	return ret;
 
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 4f154d3..2b91813 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -969,10 +969,12 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
 		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
+	xprt_pin_rqst(rovr);
+	spin_unlock(&xprt->recv_lock);
 	task = rovr->rq_task;
 
 	copied = rovr->rq_private_buf.buflen;
@@ -981,13 +983,16 @@ static void xs_local_data_read_skb(struct rpc_xprt *xprt,
 
 	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 		dprintk("RPC:       sk_buff copy failed\n");
-		goto out_unlock;
+		spin_lock(&xprt->recv_lock);
+		goto out_unpin;
 	}
 
+	spin_lock(&xprt->recv_lock);
 	xprt_complete_rqst(task, copied);
-
+out_unpin:
+	xprt_unpin_rqst(rovr);
  out_unlock:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&xprt->recv_lock);
 }
 
 static void xs_local_data_receive(struct sock_xprt *transport)
@@ -1050,10 +1055,12 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
+	xprt_pin_rqst(rovr);
+	spin_unlock(&xprt->recv_lock);
 	task = rovr->rq_task;
 
 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
@@ -1062,16 +1069,24 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
 	/* Suck it into the iovec, verify checksum if not done by hw. */
 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
 		__UDPX_INC_STATS(sk, UDP_MIB_INERRORS);
-		goto out_unlock;
+		spin_lock(&xprt->recv_lock);
+		goto out_unpin;
 	}
 
 	__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
 
+	spin_lock_bh(&xprt->transport_lock);
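+	/* cwnd is still protected by transport_lock; only the receive
+	 * list and request completion have moved under recv_lock.
+	 */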
 	xprt_adjust_cwnd(xprt, task, copied);
-	xprt_complete_rqst(task, copied);
-
- out_unlock:
 	spin_unlock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
+	xprt_complete_rqst(task, copied);
+out_unpin:
+	xprt_unpin_rqst(rovr);
+ out_unlock:
+	spin_unlock(&xprt->recv_lock);
 }
 
 static void xs_udp_data_receive(struct sock_xprt *transport)
@@ -1277,25 +1289,12 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
 	}
 
 	len = desc->count;
-	if (len > transport->tcp_reclen - transport->tcp_offset) {
-		struct xdr_skb_reader my_desc;
-
-		len = transport->tcp_reclen - transport->tcp_offset;
-		memcpy(&my_desc, desc, sizeof(my_desc));
-		my_desc.count = len;
-		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
-					  &my_desc, xdr_skb_read_bits);
-		desc->count -= r;
-		desc->offset += r;
-	} else
-		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
+	if (len > transport->tcp_reclen - transport->tcp_offset)
+		desc->count = transport->tcp_reclen - transport->tcp_offset;
+	r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
 					  desc, xdr_skb_read_bits);
 
-	if (r > 0) {
-		transport->tcp_copied += r;
-		transport->tcp_offset += r;
-	}
-	if (r != len) {
+	if (desc->count) {
 		/* Error when copying to the receive buffer,
 		 * usually because we weren't able to allocate
 		 * additional buffer pages. All we can do now
@@ -1315,6 +1314,14 @@ static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
 		return;
 	}
 
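+	/* Success: desc->count was fully consumed, so r bytes landed in
+	 * the receive buffer; hand any bytes beyond this record back to
+	 * the caller via desc->count.
+	 */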
+	transport->tcp_copied += r;
+	transport->tcp_offset += r;
+	desc->count = len - r;
+
 	dprintk("RPC:       XID %08x read %zd bytes\n",
 			ntohl(transport->tcp_xid), r);
 	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
@@ -1343,21 +1346,24 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
 
 	/* Find and lock the request corresponding to this xid */
-	spin_lock_bh(&xprt->transport_lock);
+	spin_lock(&xprt->recv_lock);
 	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
 	if (!req) {
 		dprintk("RPC:       XID %08x request not found!\n",
 				ntohl(transport->tcp_xid));
-		spin_unlock_bh(&xprt->transport_lock);
+		spin_unlock(&xprt->recv_lock);
 		return -1;
 	}
+	xprt_pin_rqst(req);
+	spin_unlock(&xprt->recv_lock);
 
 	xs_tcp_read_common(xprt, desc, req);
 
+	spin_lock(&xprt->recv_lock);
 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
-
-	spin_unlock_bh(&xprt->transport_lock);
+	xprt_unpin_rqst(req);
+	spin_unlock(&xprt->recv_lock);
 	return 0;
 }
 
@@ -1376,11 +1382,9 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
 				container_of(xprt, struct sock_xprt, xprt);
 	struct rpc_rqst *req;
 
-	/* Look up and lock the request corresponding to the given XID */
-	spin_lock_bh(&xprt->transport_lock);
+	/* Look up the request corresponding to the given XID */
 	req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
 	if (req == NULL) {
-		spin_unlock_bh(&xprt->transport_lock);
 		printk(KERN_WARNING "Callback slot table overflowed\n");
 		xprt_force_disconnect(xprt);
 		return -1;
@@ -1391,7 +1395,6 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
 
 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
 		xprt_complete_bc_request(req, transport->tcp_copied);
-	spin_unlock_bh(&xprt->transport_lock);
 
 	return 0;
 }
@@ -1516,6 +1519,7 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 		.arg.data = xprt,
 	};
 	unsigned long total = 0;
+	int loop;
 	int read = 0;
 
 	mutex_lock(&transport->recv_mutex);
@@ -1524,20 +1528,23 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 		goto out;
 
 	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
-	for (;;) {
+	for (loop = 0; loop < 64; loop++) {
 		lock_sock(sk);
 		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
 		if (read <= 0) {
 			clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
 			release_sock(sk);
-			if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-				break;
-		} else {
-			release_sock(sk);
-			total += read;
+			break;
 		}
+		release_sock(sk);
+		total += read;
 		rd_desc.count = 65536;
 	}
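+	/* Capped at 64 passes above; if the socket still has data
+	 * ready, requeue rather than monopolize this worker thread.
+	 */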
+	if (test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+		queue_work(xprtiod_workqueue, &transport->recv_worker);
 out:
 	mutex_unlock(&transport->recv_mutex);
 	trace_xs_tcp_data_ready(xprt, read, total);