Merge tag 'for-6.2/io_uring-next-2022-12-08' of git://git.kernel.dk/linux

Pull io_uring updates part two from Jens Axboe:

 - Misc fixes (me, Lin)

 - Series from Pavel extending the single task exclusive ring mode,
   yielding nice improvements for the common case of having a single
   ring per thread (Pavel)

 - Cleanup for MSG_RING, removing our IOPOLL hack (Pavel)

 - Further poll cleanups and fixes (Pavel)

 - Misc cleanups and fixes (Pavel)

* tag 'for-6.2/io_uring-next-2022-12-08' of git://git.kernel.dk/linux: (22 commits)
  io_uring/msg_ring: flag target ring as having task_work, if needed
  io_uring: skip spinlocking for ->task_complete
  io_uring: do msg_ring in target task via tw
  io_uring: extract a io_msg_install_complete helper
  io_uring: get rid of double locking
  io_uring: never run tw and fallback in parallel
  io_uring: use tw for putting rsrc
  io_uring: force multishot CQEs into task context
  io_uring: complete all requests in task context
  io_uring: don't check overflow flush failures
  io_uring: skip overflow CQE posting for dying ring
  io_uring: improve io_double_lock_ctx fail handling
  io_uring: dont remove file from msg_ring reqs
  io_uring: reshuffle issue_flags
  io_uring: don't reinstall quiesce node for each tw
  io_uring: improve rsrc quiesce refs checks
  io_uring: don't raw spin unlock to match cq_lock
  io_uring: combine poll tw handlers
  io_uring: improve poll warning handling
  io_uring: remove ctx variable in io_poll_check_events
  ...
diff --git a/include/linux/io_uring.h b/include/linux/io_uring.h
index 0ded9e2..934e5dd 100644
--- a/include/linux/io_uring.h
+++ b/include/linux/io_uring.h
@@ -9,16 +9,17 @@
 enum io_uring_cmd_flags {
 	IO_URING_F_COMPLETE_DEFER	= 1,
 	IO_URING_F_UNLOCKED		= 2,
+	/* the request is executed from poll, it should not be freed */
+	IO_URING_F_MULTISHOT		= 4,
+	/* executed by io-wq */
+	IO_URING_F_IOWQ			= 8,
 	/* int's last bit, sign checks are usually faster than a bit test */
 	IO_URING_F_NONBLOCK		= INT_MIN,
 
 	/* ctx state flags, for URING_CMD */
-	IO_URING_F_SQE128		= 4,
-	IO_URING_F_CQE32		= 8,
-	IO_URING_F_IOPOLL		= 16,
-
-	/* the request is executed from poll, it should not be freed */
-	IO_URING_F_MULTISHOT		= 32,
+	IO_URING_F_SQE128		= (1 << 8),
+	IO_URING_F_CQE32		= (1 << 9),
+	IO_URING_F_IOPOLL		= (1 << 10),
 };
 
 struct io_uring_cmd {
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index accdfec..dcd8a56 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -208,6 +208,8 @@ struct io_ring_ctx {
 		unsigned int		drain_disabled: 1;
 		unsigned int		has_evfd: 1;
 		unsigned int		syscall_iopoll: 1;
+		/* all CQEs should be posted only by the submitter task */
+		unsigned int		task_complete: 1;
 	} ____cacheline_aligned_in_smp;
 
 	/* submission data */
@@ -326,6 +328,7 @@ struct io_ring_ctx {
 	struct io_rsrc_data		*buf_data;
 
 	struct delayed_work		rsrc_put_work;
+	struct callback_head		rsrc_put_tw;
 	struct llist_head		rsrc_put_llist;
 	struct list_head		rsrc_ref_list;
 	spinlock_t			rsrc_ref_lock;
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 3436e0b..b521186 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -149,6 +149,7 @@ static void io_clean_op(struct io_kiocb *req);
 static void io_queue_sqe(struct io_kiocb *req);
 static void io_move_task_work_from_local(struct io_ring_ctx *ctx);
 static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
+static __cold void io_fallback_tw(struct io_uring_task *tctx);
 
 static struct kmem_cache *req_cachep;
 
@@ -326,6 +327,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	spin_lock_init(&ctx->rsrc_ref_lock);
 	INIT_LIST_HEAD(&ctx->rsrc_ref_list);
 	INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
+	init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw);
 	init_llist_head(&ctx->rsrc_put_llist);
 	init_llist_head(&ctx->work_llist);
 	INIT_LIST_HEAD(&ctx->tctx_list);
@@ -582,13 +584,25 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
 		io_eventfd_flush_signal(ctx);
 }
 
+static inline void __io_cq_lock(struct io_ring_ctx *ctx)
+	__acquires(ctx->completion_lock)
+{
+	if (!ctx->task_complete)
+		spin_lock(&ctx->completion_lock);
+}
+
+static inline void __io_cq_unlock(struct io_ring_ctx *ctx)
+{
+	if (!ctx->task_complete)
+		spin_unlock(&ctx->completion_lock);
+}
+
 /* keep it inlined for io_submit_flush_completions() */
-static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx)
+static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
 	__releases(ctx->completion_lock)
 {
 	io_commit_cqring(ctx);
-	spin_unlock(&ctx->completion_lock);
-
+	__io_cq_unlock(ctx);
 	io_commit_cqring_flush(ctx);
 	io_cqring_wake(ctx);
 }
@@ -596,17 +610,37 @@ static inline void io_cq_unlock_post_inline(struct io_ring_ctx *ctx)
 void io_cq_unlock_post(struct io_ring_ctx *ctx)
 	__releases(ctx->completion_lock)
 {
-	io_cq_unlock_post_inline(ctx);
+	io_commit_cqring(ctx);
+	spin_unlock(&ctx->completion_lock);
+	io_commit_cqring_flush(ctx);
+	io_cqring_wake(ctx);
 }
 
 /* Returns true if there are no backlogged entries after the flush */
-static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
+static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
 {
-	bool all_flushed;
+	struct io_overflow_cqe *ocqe;
+	LIST_HEAD(list);
+
+	io_cq_lock(ctx);
+	list_splice_init(&ctx->cq_overflow_list, &list);
+	clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
+	io_cq_unlock(ctx);
+
+	while (!list_empty(&list)) {
+		ocqe = list_first_entry(&list, struct io_overflow_cqe, list);
+		list_del(&ocqe->list);
+		kfree(ocqe);
+	}
+}
+
+/* Flush overflowed CQEs into the CQ ring for as long as it has free slots */
+static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
+{
 	size_t cqe_size = sizeof(struct io_uring_cqe);
 
-	if (!force && __io_cqring_events(ctx) == ctx->cq_entries)
-		return false;
+	if (__io_cqring_events(ctx) == ctx->cq_entries)
+		return;
 
 	if (ctx->flags & IORING_SETUP_CQE32)
 		cqe_size <<= 1;
@@ -616,43 +650,32 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
 		struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true);
 		struct io_overflow_cqe *ocqe;
 
-		if (!cqe && !force)
+		if (!cqe)
 			break;
 		ocqe = list_first_entry(&ctx->cq_overflow_list,
 					struct io_overflow_cqe, list);
-		if (cqe)
-			memcpy(cqe, &ocqe->cqe, cqe_size);
-		else
-			io_account_cq_overflow(ctx);
-
+		memcpy(cqe, &ocqe->cqe, cqe_size);
 		list_del(&ocqe->list);
 		kfree(ocqe);
 	}
 
-	all_flushed = list_empty(&ctx->cq_overflow_list);
-	if (all_flushed) {
+	if (list_empty(&ctx->cq_overflow_list)) {
 		clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
 		atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
 	}
-
 	io_cq_unlock_post(ctx);
-	return all_flushed;
 }
 
-static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
+static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
 {
-	bool ret = true;
-
 	if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
 		/* iopoll syncs against uring_lock, not completion_lock */
 		if (ctx->flags & IORING_SETUP_IOPOLL)
 			mutex_lock(&ctx->uring_lock);
-		ret = __io_cqring_overflow_flush(ctx, false);
+		__io_cqring_overflow_flush(ctx);
 		if (ctx->flags & IORING_SETUP_IOPOLL)
 			mutex_unlock(&ctx->uring_lock);
 	}
-
-	return ret;
 }
 
 void __io_put_task(struct task_struct *task, int nr)
@@ -777,12 +800,13 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
 	return &rings->cqes[off];
 }
 
-static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
-			    bool allow_overflow)
+static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
+			      u32 cflags)
 {
 	struct io_uring_cqe *cqe;
 
-	lockdep_assert_held(&ctx->completion_lock);
+	if (!ctx->task_complete)
+		lockdep_assert_held(&ctx->completion_lock);
 
 	ctx->cq_extra++;
 
@@ -805,10 +829,6 @@ static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32
 		}
 		return true;
 	}
-
-	if (allow_overflow)
-		return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
-
 	return false;
 }
 
@@ -822,7 +842,17 @@ static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
 	for (i = 0; i < state->cqes_count; i++) {
 		struct io_uring_cqe *cqe = &state->cqes[i];
 
-		io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags, true);
+		if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) {
+			if (ctx->task_complete) {
+				spin_lock(&ctx->completion_lock);
+				io_cqring_event_overflow(ctx, cqe->user_data,
+							cqe->res, cqe->flags, 0, 0);
+				spin_unlock(&ctx->completion_lock);
+			} else {
+				io_cqring_event_overflow(ctx, cqe->user_data,
+							cqe->res, cqe->flags, 0, 0);
+			}
+		}
 	}
 	state->cqes_count = 0;
 }
@@ -833,7 +863,10 @@ static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u
 	bool filled;
 
 	io_cq_lock(ctx);
-	filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow);
+	filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
+	if (!filled && allow_overflow)
+		filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
+
 	io_cq_unlock_post(ctx);
 	return filled;
 }
@@ -857,10 +890,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32
 	lockdep_assert_held(&ctx->uring_lock);
 
 	if (ctx->submit_state.cqes_count == length) {
-		io_cq_lock(ctx);
+		__io_cq_lock(ctx);
 		__io_flush_post_cqes(ctx);
 		/* no need to flush - flush is deferred */
-		spin_unlock(&ctx->completion_lock);
+		__io_cq_unlock_post(ctx);
 	}
 
 	/* For defered completions this is not as strict as it is otherwise,
@@ -915,8 +948,11 @@ static void __io_req_complete_post(struct io_kiocb *req)
 
 void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
 {
-	if (!(issue_flags & IO_URING_F_UNLOCKED) ||
-	    !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
+	if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) {
+		req->io_task_work.func = io_req_task_complete;
+		io_req_task_work_add(req);
+	} else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
+		   !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
 		__io_req_complete_post(req);
 	} else {
 		struct io_ring_ctx *ctx = req->ctx;
@@ -1139,10 +1175,17 @@ void tctx_task_work(struct callback_head *cb)
 	struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
 						  task_work);
 	struct llist_node fake = {};
-	struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake);
+	struct llist_node *node;
 	unsigned int loops = 1;
-	unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL);
+	unsigned int count;
 
+	if (unlikely(current->flags & PF_EXITING)) {
+		io_fallback_tw(tctx);
+		return;
+	}
+
+	node = io_llist_xchg(&tctx->task_list, &fake);
+	count = handle_tw_list(node, &ctx, &uring_locked, NULL);
 	node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
 	while (node != &fake) {
 		loops++;
@@ -1385,7 +1428,7 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 	struct io_wq_work_node *node, *prev;
 	struct io_submit_state *state = &ctx->submit_state;
 
-	io_cq_lock(ctx);
+	__io_cq_lock(ctx);
 	/* must come first to preserve CQE ordering in failure cases */
 	if (state->cqes_count)
 		__io_flush_post_cqes(ctx);
@@ -1393,10 +1436,18 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 		struct io_kiocb *req = container_of(node, struct io_kiocb,
 					    comp_list);
 
-		if (!(req->flags & REQ_F_CQE_SKIP))
-			__io_fill_cqe_req(ctx, req);
+		if (!(req->flags & REQ_F_CQE_SKIP) &&
+		    unlikely(!__io_fill_cqe_req(ctx, req))) {
+			if (ctx->task_complete) {
+				spin_lock(&ctx->completion_lock);
+				io_req_cqe_overflow(req);
+				spin_unlock(&ctx->completion_lock);
+			} else {
+				io_req_cqe_overflow(req);
+			}
+		}
 	}
-	io_cq_unlock_post_inline(ctx);
+	__io_cq_unlock_post(ctx);
 
 	if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
 		io_free_batch_list(ctx, state->compl_reqs.first);
@@ -1467,7 +1518,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
 	check_cq = READ_ONCE(ctx->check_cq);
 	if (unlikely(check_cq)) {
 		if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
-			__io_cqring_overflow_flush(ctx, false);
+			__io_cqring_overflow_flush(ctx);
 		/*
 		 * Similarly do not spin if we have not informed the user of any
 		 * dropped CQE.
@@ -1799,7 +1850,7 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
 		return ret;
 
 	/* If the op doesn't have a file, we're not polling for it */
-	if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file)
+	if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
 		io_iopoll_req_issued(req, issue_flags);
 
 	return 0;
@@ -1808,8 +1859,6 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
 int io_poll_issue(struct io_kiocb *req, bool *locked)
 {
 	io_tw_lock(req->ctx, locked);
-	if (unlikely(req->task->flags & PF_EXITING))
-		return -EFAULT;
 	return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
 				 IO_URING_F_COMPLETE_DEFER);
 }
@@ -1826,7 +1875,7 @@ void io_wq_submit_work(struct io_wq_work *work)
 {
 	struct io_kiocb *req = container_of(work, struct io_kiocb, work);
 	const struct io_op_def *def = &io_op_defs[req->opcode];
-	unsigned int issue_flags = IO_URING_F_UNLOCKED;
+	unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
 	bool needs_poll = false;
 	int ret = 0, err = -ECANCELED;
 
@@ -2482,11 +2531,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 
 	trace_io_uring_cqring_wait(ctx, min_events);
 	do {
-		/* if we can't even flush overflow, don't wait for more */
-		if (!io_cqring_overflow_flush(ctx)) {
-			ret = -EBUSY;
-			break;
-		}
+		io_cqring_overflow_flush(ctx);
 		prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
 						TASK_INTERRUPTIBLE);
 		ret = io_cqring_wait_schedule(ctx, &iowq, timeout);
@@ -2637,8 +2682,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 		__io_sqe_buffers_unregister(ctx);
 	if (ctx->file_data)
 		__io_sqe_files_unregister(ctx);
-	if (ctx->rings)
-		__io_cqring_overflow_flush(ctx, true);
+	io_cqring_overflow_kill(ctx);
 	io_eventfd_unregister(ctx);
 	io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
 	io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
@@ -2781,6 +2825,12 @@ static __cold void io_ring_exit_work(struct work_struct *work)
 	 * as nobody else will be looking for them.
 	 */
 	do {
+		if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
+			mutex_lock(&ctx->uring_lock);
+			io_cqring_overflow_kill(ctx);
+			mutex_unlock(&ctx->uring_lock);
+		}
+
 		if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
 			io_move_task_work_from_local(ctx);
 
@@ -2846,8 +2896,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 
 	mutex_lock(&ctx->uring_lock);
 	percpu_ref_kill(&ctx->refs);
-	if (ctx->rings)
-		__io_cqring_overflow_flush(ctx, true);
 	xa_for_each(&ctx->personalities, index, creds)
 		io_unregister_personality(ctx, index);
 	if (ctx->rings)
@@ -3489,6 +3537,11 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
 	if (!ctx)
 		return -ENOMEM;
 
+	if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+	    !(ctx->flags & IORING_SETUP_IOPOLL) &&
+	    !(ctx->flags & IORING_SETUP_SQPOLL))
+		ctx->task_complete = true;
+
 	/*
 	 * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
 	 * space applications don't need to do io completion events
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index 062899b..1b2f0b2 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -93,6 +93,11 @@ static inline void io_cq_lock(struct io_ring_ctx *ctx)
 	spin_lock(&ctx->completion_lock);
 }
 
+static inline void io_cq_unlock(struct io_ring_ctx *ctx)
+{
+	spin_unlock(&ctx->completion_lock);
+}
+
 void io_cq_unlock_post(struct io_ring_ctx *ctx);
 
 static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx,
@@ -128,7 +133,7 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
 	 */
 	cqe = io_get_cqe(ctx);
 	if (unlikely(!cqe))
-		return io_req_cqe_overflow(req);
+		return false;
 
 	trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
 				req->cqe.res, req->cqe.flags,
@@ -151,6 +156,14 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
 	return true;
 }
 
+static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
+				   struct io_kiocb *req)
+{
+	if (likely(__io_fill_cqe_req(ctx, req)))
+		return true;
+	return io_req_cqe_overflow(req);
+}
+
 static inline void req_set_fail(struct io_kiocb *req)
 {
 	req->flags |= REQ_F_FAIL;
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index afb543a..2d3cd94 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -15,6 +15,8 @@
 
 struct io_msg {
 	struct file			*file;
+	struct file			*src_file;
+	struct callback_head		tw;
 	u64 user_data;
 	u32 len;
 	u32 cmd;
@@ -23,6 +25,34 @@ struct io_msg {
 	u32 flags;
 };
 
+void io_msg_ring_cleanup(struct io_kiocb *req)
+{
+	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+
+	if (WARN_ON_ONCE(!msg->src_file))
+		return;
+
+	fput(msg->src_file);
+	msg->src_file = NULL;
+}
+
+static void io_msg_tw_complete(struct callback_head *head)
+{
+	struct io_msg *msg = container_of(head, struct io_msg, tw);
+	struct io_kiocb *req = cmd_to_io_kiocb(msg);
+	struct io_ring_ctx *target_ctx = req->file->private_data;
+	int ret = 0;
+
+	if (current->flags & PF_EXITING)
+		ret = -EOWNERDEAD;
+	else if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
+		ret = -EOVERFLOW;
+
+	if (ret < 0)
+		req_set_fail(req);
+	io_req_queue_tw_complete(req, ret);
+}
+
 static int io_msg_ring_data(struct io_kiocb *req)
 {
 	struct io_ring_ctx *target_ctx = req->file->private_data;
@@ -31,23 +61,29 @@ static int io_msg_ring_data(struct io_kiocb *req)
 	if (msg->src_fd || msg->dst_fd || msg->flags)
 		return -EINVAL;
 
+	if (target_ctx->task_complete && current != target_ctx->submitter_task) {
+		init_task_work(&msg->tw, io_msg_tw_complete);
+		if (task_work_add(target_ctx->submitter_task, &msg->tw,
+				  TWA_SIGNAL_NO_IPI))
+			return -EOWNERDEAD;
+
+		atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags);
+		return IOU_ISSUE_SKIP_COMPLETE;
+	}
+
 	if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
 		return 0;
 
 	return -EOVERFLOW;
 }
 
-static void io_double_unlock_ctx(struct io_ring_ctx *ctx,
-				 struct io_ring_ctx *octx,
+static void io_double_unlock_ctx(struct io_ring_ctx *octx,
 				 unsigned int issue_flags)
 {
-	if (issue_flags & IO_URING_F_UNLOCKED)
-		mutex_unlock(&ctx->uring_lock);
 	mutex_unlock(&octx->uring_lock);
 }
 
-static int io_double_lock_ctx(struct io_ring_ctx *ctx,
-			      struct io_ring_ctx *octx,
+static int io_double_lock_ctx(struct io_ring_ctx *octx,
 			      unsigned int issue_flags)
 {
 	/*
@@ -60,56 +96,49 @@ static int io_double_lock_ctx(struct io_ring_ctx *ctx,
 			return -EAGAIN;
 		return 0;
 	}
-
-	/* Always grab smallest value ctx first. We know ctx != octx. */
-	if (ctx < octx) {
-		mutex_lock(&ctx->uring_lock);
-		mutex_lock(&octx->uring_lock);
-	} else {
-		mutex_lock(&octx->uring_lock);
-		mutex_lock(&ctx->uring_lock);
-	}
-
+	mutex_lock(&octx->uring_lock);
 	return 0;
 }
 
-static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
+static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+	struct io_ring_ctx *ctx = req->ctx;
+	struct file *file = NULL;
+	unsigned long file_ptr;
+	int idx = msg->src_fd;
+
+	io_ring_submit_lock(ctx, issue_flags);
+	if (likely(idx < ctx->nr_user_files)) {
+		idx = array_index_nospec(idx, ctx->nr_user_files);
+		file_ptr = io_fixed_file_slot(&ctx->file_table, idx)->file_ptr;
+		file = (struct file *) (file_ptr & FFS_MASK);
+		if (file)
+			get_file(file);
+	}
+	io_ring_submit_unlock(ctx, issue_flags);
+	return file;
+}
+
+static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_ring_ctx *target_ctx = req->file->private_data;
 	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
-	struct io_ring_ctx *ctx = req->ctx;
-	unsigned long file_ptr;
-	struct file *src_file;
+	struct file *src_file = msg->src_file;
 	int ret;
 
-	if (target_ctx == ctx)
-		return -EINVAL;
-
-	ret = io_double_lock_ctx(ctx, target_ctx, issue_flags);
-	if (unlikely(ret))
-		return ret;
-
-	ret = -EBADF;
-	if (unlikely(msg->src_fd >= ctx->nr_user_files))
-		goto out_unlock;
-
-	msg->src_fd = array_index_nospec(msg->src_fd, ctx->nr_user_files);
-	file_ptr = io_fixed_file_slot(&ctx->file_table, msg->src_fd)->file_ptr;
-	if (!file_ptr)
-		goto out_unlock;
-
-	src_file = (struct file *) (file_ptr & FFS_MASK);
-	get_file(src_file);
+	if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
+		return -EAGAIN;
 
 	ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd);
-	if (ret < 0) {
-		fput(src_file);
+	if (ret < 0)
 		goto out_unlock;
-	}
+
+	msg->src_file = NULL;
+	req->flags &= ~REQ_F_NEED_CLEANUP;
 
 	if (msg->flags & IORING_MSG_RING_CQE_SKIP)
 		goto out_unlock;
-
 	/*
 	 * If this fails, the target still received the file descriptor but
 	 * wasn't notified of the fact. This means that if this request
@@ -119,10 +148,51 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
 	if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
 		ret = -EOVERFLOW;
 out_unlock:
-	io_double_unlock_ctx(ctx, target_ctx, issue_flags);
+	io_double_unlock_ctx(target_ctx, issue_flags);
 	return ret;
 }
 
+static void io_msg_tw_fd_complete(struct callback_head *head)
+{
+	struct io_msg *msg = container_of(head, struct io_msg, tw);
+	struct io_kiocb *req = cmd_to_io_kiocb(msg);
+	int ret = -EOWNERDEAD;
+
+	if (!(current->flags & PF_EXITING))
+		ret = io_msg_install_complete(req, IO_URING_F_UNLOCKED);
+	if (ret < 0)
+		req_set_fail(req);
+	io_req_queue_tw_complete(req, ret);
+}
+
+static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_ring_ctx *target_ctx = req->file->private_data;
+	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+	struct io_ring_ctx *ctx = req->ctx;
+	struct file *src_file = msg->src_file;
+
+	if (target_ctx == ctx)
+		return -EINVAL;
+	if (!src_file) {
+		src_file = io_msg_grab_file(req, issue_flags);
+		if (!src_file)
+			return -EBADF;
+		msg->src_file = src_file;
+		req->flags |= REQ_F_NEED_CLEANUP;
+	}
+
+	if (target_ctx->task_complete && current != target_ctx->submitter_task) {
+		init_task_work(&msg->tw, io_msg_tw_fd_complete);
+		if (task_work_add(target_ctx->submitter_task, &msg->tw,
+				  TWA_SIGNAL))
+			return -EOWNERDEAD;
+
+		return IOU_ISSUE_SKIP_COMPLETE;
+	}
+	return io_msg_install_complete(req, issue_flags);
+}
+
 int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
@@ -130,6 +200,7 @@ int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	if (unlikely(sqe->buf_index || sqe->personality))
 		return -EINVAL;
 
+	msg->src_file = NULL;
 	msg->user_data = READ_ONCE(sqe->off);
 	msg->len = READ_ONCE(sqe->len);
 	msg->cmd = READ_ONCE(sqe->addr);
@@ -164,12 +235,11 @@ int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
 	}
 
 done:
-	if (ret < 0)
+	if (ret < 0) {
+		if (ret == -EAGAIN || ret == IOU_ISSUE_SKIP_COMPLETE)
+			return ret;
 		req_set_fail(req);
+	}
 	io_req_set_res(req, ret, 0);
-	/* put file to avoid an attempt to IOPOLL the req */
-	if (!(req->flags & REQ_F_FIXED_FILE))
-		io_put_file(req->file);
-	req->file = NULL;
 	return IOU_OK;
 }
diff --git a/io_uring/msg_ring.h b/io_uring/msg_ring.h
index fb9601f..3987ee6 100644
--- a/io_uring/msg_ring.h
+++ b/io_uring/msg_ring.h
@@ -2,3 +2,4 @@
 
 int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags);
+void io_msg_ring_cleanup(struct io_kiocb *req);
diff --git a/io_uring/net.c b/io_uring/net.c
index cb83132..5229976 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -67,6 +67,19 @@ struct io_sr_msg {
 	struct io_kiocb 		*notif;
 };
 
+static inline bool io_check_multishot(struct io_kiocb *req,
+				      unsigned int issue_flags)
+{
+	/*
+	 * When ->task_complete is set we only allow posting CQEs from the
+	 * original task context. Usual request completions are handled in other
+	 * generic paths, but multishot requests may post extra CQEs.
+	 */
+	return !(issue_flags & IO_URING_F_IOWQ) ||
+		!(issue_flags & IO_URING_F_MULTISHOT) ||
+		!req->ctx->task_complete;
+}
+
 int io_shutdown_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_shutdown *shutdown = io_kiocb_to_cmd(req, struct io_shutdown);
@@ -730,6 +743,9 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	    (sr->flags & IORING_RECVSEND_POLL_FIRST))
 		return io_setup_async_msg(req, kmsg, issue_flags);
 
+	if (!io_check_multishot(req, issue_flags))
+		return io_setup_async_msg(req, kmsg, issue_flags);
+
 retry_multishot:
 	if (io_do_buffer_select(req)) {
 		void __user *buf;
@@ -829,6 +845,9 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 	    (sr->flags & IORING_RECVSEND_POLL_FIRST))
 		return -EAGAIN;
 
+	if (!io_check_multishot(req, issue_flags))
+		return -EAGAIN;
+
 	sock = sock_from_file(req->file);
 	if (unlikely(!sock))
 		return -ENOTSOCK;
@@ -1280,6 +1299,8 @@ int io_accept(struct io_kiocb *req, unsigned int issue_flags)
 	struct file *file;
 	int ret, fd;
 
+	if (!io_check_multishot(req, issue_flags))
+		return -EAGAIN;
 retry:
 	if (!fixed) {
 		fd = __get_unused_fd_flags(accept->flags, accept->nofile);
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index 83dc0f9..3aa0d65 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -63,6 +63,7 @@ const struct io_op_def io_op_defs[] = {
 		.audit_skip		= 1,
 		.ioprio			= 1,
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= sizeof(struct io_async_rw),
 		.name			= "READV",
 		.prep			= io_prep_rw,
@@ -80,6 +81,7 @@ const struct io_op_def io_op_defs[] = {
 		.audit_skip		= 1,
 		.ioprio			= 1,
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= sizeof(struct io_async_rw),
 		.name			= "WRITEV",
 		.prep			= io_prep_rw,
@@ -103,6 +105,7 @@ const struct io_op_def io_op_defs[] = {
 		.audit_skip		= 1,
 		.ioprio			= 1,
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= sizeof(struct io_async_rw),
 		.name			= "READ_FIXED",
 		.prep			= io_prep_rw,
@@ -118,6 +121,7 @@ const struct io_op_def io_op_defs[] = {
 		.audit_skip		= 1,
 		.ioprio			= 1,
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= sizeof(struct io_async_rw),
 		.name			= "WRITE_FIXED",
 		.prep			= io_prep_rw,
@@ -277,6 +281,7 @@ const struct io_op_def io_op_defs[] = {
 		.audit_skip		= 1,
 		.ioprio			= 1,
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= sizeof(struct io_async_rw),
 		.name			= "READ",
 		.prep			= io_prep_rw,
@@ -292,6 +297,7 @@ const struct io_op_def io_op_defs[] = {
 		.audit_skip		= 1,
 		.ioprio			= 1,
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= sizeof(struct io_async_rw),
 		.name			= "WRITE",
 		.prep			= io_prep_rw,
@@ -439,6 +445,7 @@ const struct io_op_def io_op_defs[] = {
 		.name			= "MSG_RING",
 		.prep			= io_msg_ring_prep,
 		.issue			= io_msg_ring,
+		.cleanup		= io_msg_ring_cleanup,
 	},
 	[IORING_OP_FSETXATTR] = {
 		.needs_file = 1,
@@ -481,6 +488,7 @@ const struct io_op_def io_op_defs[] = {
 		.plug			= 1,
 		.name			= "URING_CMD",
 		.iopoll			= 1,
+		.iopoll_queue		= 1,
 		.async_size		= uring_cmd_pdu_size(1),
 		.prep			= io_uring_cmd_prep,
 		.issue			= io_uring_cmd,
diff --git a/io_uring/opdef.h b/io_uring/opdef.h
index 3efe06d..df7e13d 100644
--- a/io_uring/opdef.h
+++ b/io_uring/opdef.h
@@ -25,6 +25,8 @@ struct io_op_def {
 	unsigned		ioprio : 1;
 	/* supports iopoll */
 	unsigned		iopoll : 1;
+	/* have to be put into the iopoll list */
+	unsigned		iopoll_queue : 1;
 	/* opcode specific path will handle ->async_data allocation if needed */
 	unsigned		manual_alloc : 1;
 	/* size of async data needed, if any */
diff --git a/io_uring/poll.c b/io_uring/poll.c
index 599ba28..ee7da61 100644
--- a/io_uring/poll.c
+++ b/io_uring/poll.c
@@ -237,7 +237,6 @@ enum {
  */
 static int io_poll_check_events(struct io_kiocb *req, bool *locked)
 {
-	struct io_ring_ctx *ctx = req->ctx;
 	int v, ret;
 
 	/* req->task == current here, checking PF_EXITING is safe */
@@ -247,27 +246,30 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
 	do {
 		v = atomic_read(&req->poll_refs);
 
-		/* tw handler should be the owner, and so have some references */
-		if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
-			return IOU_POLL_DONE;
-		if (v & IO_POLL_CANCEL_FLAG)
-			return -ECANCELED;
-		/*
-		 * cqe.res contains only events of the first wake up
-		 * and all others are be lost. Redo vfs_poll() to get
-		 * up to date state.
-		 */
-		if ((v & IO_POLL_REF_MASK) != 1)
-			req->cqe.res = 0;
-		if (v & IO_POLL_RETRY_FLAG) {
-			req->cqe.res = 0;
+		if (unlikely(v != 1)) {
+			/* tw should be the owner and so have some refs */
+			if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
+				return IOU_POLL_NO_ACTION;
+			if (v & IO_POLL_CANCEL_FLAG)
+				return -ECANCELED;
 			/*
-			 * We won't find new events that came in between
-			 * vfs_poll and the ref put unless we clear the flag
-			 * in advance.
+			 * cqe.res contains only events of the first wake up
+			 * and all others are to be lost. Redo vfs_poll() to get
+			 * up to date state.
 			 */
-			atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs);
-			v &= ~IO_POLL_RETRY_FLAG;
+			if ((v & IO_POLL_REF_MASK) != 1)
+				req->cqe.res = 0;
+
+			if (v & IO_POLL_RETRY_FLAG) {
+				req->cqe.res = 0;
+				/*
+				 * We won't find new events that came in between
+				 * vfs_poll and the ref put unless we clear the
+				 * flag in advance.
+				 */
+				atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs);
+				v &= ~IO_POLL_RETRY_FLAG;
+			}
 		}
 
 		/* the mask was stashed in __io_poll_execute */
@@ -286,7 +288,7 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
 			__poll_t mask = mangle_poll(req->cqe.res &
 						    req->apoll_events);
 
-			if (!io_aux_cqe(ctx, *locked, req->cqe.user_data,
+			if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data,
 					mask, IORING_CQE_F_MORE, false)) {
 				io_req_set_res(req, mask, 0);
 				return IOU_POLL_REMOVE_POLL_USE_RES;
@@ -319,50 +321,38 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
 	ret = io_poll_check_events(req, locked);
 	if (ret == IOU_POLL_NO_ACTION)
 		return;
-
-	if (ret == IOU_POLL_DONE) {
-		struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll);
-		req->cqe.res = mangle_poll(req->cqe.res & poll->events);
-	} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
-		req->cqe.res = ret;
-		req_set_fail(req);
-	}
-
 	io_poll_remove_entries(req);
 	io_poll_tw_hash_eject(req, locked);
 
-	io_req_set_res(req, req->cqe.res, 0);
-	io_req_task_complete(req, locked);
-}
+	if (req->opcode == IORING_OP_POLL_ADD) {
+		if (ret == IOU_POLL_DONE) {
+			struct io_poll *poll;
 
-static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
-{
-	int ret;
+			poll = io_kiocb_to_cmd(req, struct io_poll);
+			req->cqe.res = mangle_poll(req->cqe.res & poll->events);
+		} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
+			req->cqe.res = ret;
+			req_set_fail(req);
+		}
 
-	ret = io_poll_check_events(req, locked);
-	if (ret == IOU_POLL_NO_ACTION)
-		return;
-
-	io_tw_lock(req->ctx, locked);
-	io_poll_remove_entries(req);
-	io_poll_tw_hash_eject(req, locked);
-
-	if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
+		io_req_set_res(req, req->cqe.res, 0);
 		io_req_task_complete(req, locked);
-	else if (ret == IOU_POLL_DONE)
-		io_req_task_submit(req, locked);
-	else
-		io_req_defer_failed(req, ret);
+	} else {
+		io_tw_lock(req->ctx, locked);
+
+		if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
+			io_req_task_complete(req, locked);
+		else if (ret == IOU_POLL_DONE)
+			io_req_task_submit(req, locked);
+		else
+			io_req_defer_failed(req, ret);
+	}
 }
 
 static void __io_poll_execute(struct io_kiocb *req, int mask)
 {
 	io_req_set_res(req, mask, 0);
-
-	if (req->opcode == IORING_OP_POLL_ADD)
-		req->io_task_work.func = io_poll_task_func;
-	else
-		req->io_task_work.func = io_apoll_task_func;
+	req->io_task_work.func = io_poll_task_func;
 
 	trace_io_uring_task_add(req, mask);
 	io_req_task_work_add(req);
diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c
index 1336082..18de10c 100644
--- a/io_uring/rsrc.c
+++ b/io_uring/rsrc.c
@@ -204,6 +204,14 @@ void io_rsrc_put_work(struct work_struct *work)
 	}
 }
 
+void io_rsrc_put_tw(struct callback_head *cb)
+{
+	struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
+					       rsrc_put_tw);
+
+	io_rsrc_put_work(&ctx->rsrc_put_work.work);
+}
+
 void io_wait_rsrc_data(struct io_rsrc_data *data)
 {
 	if (data && !atomic_dec_and_test(&data->refs))
@@ -242,8 +250,15 @@ static __cold void io_rsrc_node_ref_zero(struct percpu_ref *ref)
 	}
 	spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags);
 
-	if (first_add)
-		mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
+	if (!first_add)
+		return;
+
+	if (ctx->submitter_task) {
+		if (!task_work_add(ctx->submitter_task, &ctx->rsrc_put_tw,
+				   ctx->notify_method))
+			return;
+	}
+	mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
 }
 
 static struct io_rsrc_node *io_rsrc_node_alloc(void)
@@ -309,46 +324,41 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 	/* As we may drop ->uring_lock, other task may have started quiesce */
 	if (data->quiesce)
 		return -ENXIO;
+	ret = io_rsrc_node_switch_start(ctx);
+	if (ret)
+		return ret;
+	io_rsrc_node_switch(ctx, data);
+
+	/* kill initial ref, already quiesced if zero */
+	if (atomic_dec_and_test(&data->refs))
+		return 0;
 
 	data->quiesce = true;
+	mutex_unlock(&ctx->uring_lock);
 	do {
-		ret = io_rsrc_node_switch_start(ctx);
-		if (ret)
-			break;
-		io_rsrc_node_switch(ctx, data);
-
-		/* kill initial ref, already quiesced if zero */
-		if (atomic_dec_and_test(&data->refs))
-			break;
-		mutex_unlock(&ctx->uring_lock);
-
 		ret = io_run_task_work_sig(ctx);
-		if (ret < 0)
-			goto reinit;
+		if (ret < 0) {
+			atomic_inc(&data->refs);
+			/* wait for all works potentially completing data->done */
+			flush_delayed_work(&ctx->rsrc_put_work);
+			reinit_completion(&data->done);
+			mutex_lock(&ctx->uring_lock);
+			break;
+		}
 
 		flush_delayed_work(&ctx->rsrc_put_work);
 		ret = wait_for_completion_interruptible(&data->done);
 		if (!ret) {
 			mutex_lock(&ctx->uring_lock);
-			if (atomic_read(&data->refs) > 0) {
-				/*
-				 * it has been revived by another thread while
-				 * we were unlocked
-				 */
-				mutex_unlock(&ctx->uring_lock);
-			} else {
+			if (atomic_read(&data->refs) <= 0)
 				break;
-			}
+			/*
+			 * it has been revived by another thread while
+			 * we were unlocked
+			 */
+			mutex_unlock(&ctx->uring_lock);
 		}
-
-reinit:
-		atomic_inc(&data->refs);
-		/* wait for all works potentially completing data->done */
-		flush_delayed_work(&ctx->rsrc_put_work);
-		reinit_completion(&data->done);
-
-		mutex_lock(&ctx->uring_lock);
-	} while (ret >= 0);
+	} while (1);
 	data->quiesce = false;
 
 	return ret;
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index 81445a4..2b87436 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -53,6 +53,7 @@ struct io_mapped_ubuf {
 	struct bio_vec	bvec[];
 };
 
+void io_rsrc_put_tw(struct callback_head *cb);
 void io_rsrc_put_work(struct work_struct *work);
 void io_rsrc_refs_refill(struct io_ring_ctx *ctx);
 void io_wait_rsrc_data(struct io_rsrc_data *data);