SUNRPC: Dequeue the request from the receive queue while we're re-encoding

Ensure that we dequeue the request from the transport receive queue
while we're re-encoding to prevent issues like use-after-free when
we release the bvec.

Fixes: 7536908982047 ("SUNRPC: Ensure the bvecs are reset when we re-encode...")
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Cc: stable@vger.kernel.org # v4.20+
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 13e108b..d783e15 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -352,6 +352,7 @@ bool			xprt_prepare_transmit(struct rpc_task *task);
 void			xprt_request_enqueue_transmit(struct rpc_task *task);
 void			xprt_request_enqueue_receive(struct rpc_task *task);
 void			xprt_request_wait_receive(struct rpc_task *task);
+void			xprt_request_dequeue_xprt(struct rpc_task *task);
 bool			xprt_request_need_retransmit(struct rpc_task *task);
 void			xprt_transmit(struct rpc_task *task);
 void			xprt_end_transmit(struct rpc_task *task);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index d8679b60..0359466 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1862,6 +1862,7 @@ rpc_xdr_encode(struct rpc_task *task)
 		     req->rq_rbuffer,
 		     req->rq_rcvsize);
 
+	req->rq_reply_bytes_recvd = 0;
 	req->rq_snd_buf.head[0].iov_len = 0;
 	xdr_init_encode(&xdr, &req->rq_snd_buf,
 			req->rq_snd_buf.head[0].iov_base, req);
@@ -1881,6 +1882,8 @@ call_encode(struct rpc_task *task)
 	if (!rpc_task_need_encode(task))
 		goto out;
 	dprint_status(task);
+	/* Dequeue task from the receive queue while we're encoding */
+	xprt_request_dequeue_xprt(task);
 	/* Encode here so that rpcsec_gss can use correct sequence number. */
 	rpc_xdr_encode(task);
 	/* Did the encode result in an error condition? */
@@ -2501,9 +2504,6 @@ call_decode(struct rpc_task *task)
 		return;
 	case -EAGAIN:
 		task->tk_status = 0;
-		xdr_free_bvec(&req->rq_rcv_buf);
-		req->rq_reply_bytes_recvd = 0;
-		req->rq_rcv_buf.len = 0;
 		if (task->tk_client->cl_discrtry)
 			xprt_conditional_disconnect(req->rq_xprt,
 						    req->rq_connect_cookie);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 783748d..02d5b21 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1324,6 +1324,36 @@ xprt_request_dequeue_transmit(struct rpc_task *task)
 }
 
 /**
+ * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmit and receive queues, and ensure that
+ * it is not pinned by the receive work item.
+ */
+void
+xprt_request_dequeue_xprt(struct rpc_task *task)
+{
+	struct rpc_rqst	*req = task->tk_rqstp;
+	struct rpc_xprt *xprt = req->rq_xprt;
+
+	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
+	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
+	    xprt_is_pinned_rqst(req)) {
+		spin_lock(&xprt->queue_lock);
+		xprt_request_dequeue_transmit_locked(task);
+		xprt_request_dequeue_receive_locked(task);
+		while (xprt_is_pinned_rqst(req)) {
+			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+			spin_unlock(&xprt->queue_lock);
+			xprt_wait_on_pinned_rqst(req);
+			spin_lock(&xprt->queue_lock);
+			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
+		}
+		spin_unlock(&xprt->queue_lock);
+	}
+}
+
+/**
  * xprt_request_prepare - prepare an encoded request for transport
  * @req: pointer to rpc_rqst
  *
@@ -1754,28 +1784,6 @@ void xprt_retry_reserve(struct rpc_task *task)
 	xprt_do_reserve(xprt, task);
 }
 
-static void
-xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
-{
-	struct rpc_xprt *xprt = req->rq_xprt;
-
-	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
-	    test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
-	    xprt_is_pinned_rqst(req)) {
-		spin_lock(&xprt->queue_lock);
-		xprt_request_dequeue_transmit_locked(task);
-		xprt_request_dequeue_receive_locked(task);
-		while (xprt_is_pinned_rqst(req)) {
-			set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
-			spin_unlock(&xprt->queue_lock);
-			xprt_wait_on_pinned_rqst(req);
-			spin_lock(&xprt->queue_lock);
-			clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
-		}
-		spin_unlock(&xprt->queue_lock);
-	}
-}
-
 /**
  * xprt_release - release an RPC request slot
  * @task: task which is finished with the slot
@@ -1795,7 +1803,7 @@ void xprt_release(struct rpc_task *task)
 	}
 
 	xprt = req->rq_xprt;
-	xprt_request_dequeue_all(task, req);
+	xprt_request_dequeue_xprt(task);
 	spin_lock(&xprt->transport_lock);
 	xprt->ops->release_xprt(xprt, task);
 	if (xprt->ops->release_request)