io_uring: Fix broken links with offloading

io_sq_thread() processes sqes in batches of 8 (IO_IOPOLL_BATCH) without
considering links. As a result, links can be randomly split across
batches.

The easiest way to fix it is to call io_get_sqring() inside
io_submit_sqes(), as io_ring_submit() does.
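
For reference, a rough sketch of the reworked io_submit_sqes() loop
(simplified; mm-fault handling, drain shadow requests and submit-state
setup are omitted):

	for (i = 0; i < nr; i++) {
		struct sqe_submit s;

		/* fetch the next sqe; stop when the SQ ring runs dry */
		if (!io_get_sqring(ctx, &s))
			break;

		/* io_submit_sqe() extends or terminates the current link chain */
		io_submit_sqe(ctx, &s, statep, &link);
		submitted++;
	}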

Downsides:
1. It removes the optimisation of not grabbing mm_struct for fixed files.
2. It submits all sqes in one go, without finer-grained scheduling
   interleaved with cq processing.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 0e141d9..949c82a 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -735,6 +735,14 @@ static unsigned io_cqring_events(struct io_rings *rings)
 	return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
 }
 
+static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
+{
+	struct io_rings *rings = ctx->rings;
+
+	/* make sure SQ entry isn't read before tail */
+	return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
+}
+
 /*
  * Find and free completed poll iocbs
  */
@@ -2560,8 +2568,8 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
 	return false;
 }
 
-static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
-			  unsigned int nr, bool has_user, bool mm_fault)
+static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
+			  bool has_user, bool mm_fault)
 {
 	struct io_submit_state state, *statep = NULL;
 	struct io_kiocb *link = NULL;
@@ -2575,6 +2583,11 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 	}
 
 	for (i = 0; i < nr; i++) {
+		struct sqe_submit s;
+
+		if (!io_get_sqring(ctx, &s))
+			break;
+
 		/*
 		 * If previous wasn't linked and we have a linked command,
 		 * that's the end of the chain. Submit the previous link.
@@ -2584,9 +2597,9 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 			link = NULL;
 			shadow_req = NULL;
 		}
-		prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
+		prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 
-		if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
+		if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
 			if (!shadow_req) {
 				shadow_req = io_get_req(ctx, NULL);
 				if (unlikely(!shadow_req))
@@ -2594,18 +2607,18 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 				shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
 				refcount_dec(&shadow_req->refs);
 			}
-			shadow_req->sequence = sqes[i].sequence;
+			shadow_req->sequence = s.sequence;
 		}
 
 out:
 		if (unlikely(mm_fault)) {
-			io_cqring_add_event(ctx, sqes[i].sqe->user_data,
+			io_cqring_add_event(ctx, s.sqe->user_data,
 						-EFAULT);
 		} else {
-			sqes[i].has_user = has_user;
-			sqes[i].needs_lock = true;
-			sqes[i].needs_fixed_file = true;
-			io_submit_sqe(ctx, &sqes[i], statep, &link);
+			s.has_user = has_user;
+			s.needs_lock = true;
+			s.needs_fixed_file = true;
+			io_submit_sqe(ctx, &s, statep, &link);
 			submitted++;
 		}
 	}
@@ -2620,7 +2633,6 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 
 static int io_sq_thread(void *data)
 {
-	struct sqe_submit sqes[IO_IOPOLL_BATCH];
 	struct io_ring_ctx *ctx = data;
 	struct mm_struct *cur_mm = NULL;
 	mm_segment_t old_fs;
@@ -2635,8 +2647,8 @@ static int io_sq_thread(void *data)
 
 	timeout = inflight = 0;
 	while (!kthread_should_park()) {
-		bool all_fixed, mm_fault = false;
-		int i;
+		bool mm_fault = false;
+		unsigned int to_submit;
 
 		if (inflight) {
 			unsigned nr_events = 0;
@@ -2656,7 +2668,8 @@ static int io_sq_thread(void *data)
 				timeout = jiffies + ctx->sq_thread_idle;
 		}
 
-		if (!io_get_sqring(ctx, &sqes[0])) {
+		to_submit = io_sqring_entries(ctx);
+		if (!to_submit) {
 			/*
 			 * We're polling. If we're within the defined idle
 			 * period, then let us spin without work before going
@@ -2687,7 +2700,8 @@ static int io_sq_thread(void *data)
 			/* make sure to read SQ tail after writing flags */
 			smp_mb();
 
-			if (!io_get_sqring(ctx, &sqes[0])) {
+			to_submit = io_sqring_entries(ctx);
+			if (!to_submit) {
 				if (kthread_should_park()) {
 					finish_wait(&ctx->sqo_wait, &wait);
 					break;
@@ -2705,19 +2719,8 @@ static int io_sq_thread(void *data)
 			ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
 		}
 
-		i = 0;
-		all_fixed = true;
-		do {
-			if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
-				all_fixed = false;
-
-			i++;
-			if (i == ARRAY_SIZE(sqes))
-				break;
-		} while (io_get_sqring(ctx, &sqes[i]));
-
 		/* Unless all new commands are FIXED regions, grab mm */
-		if (!all_fixed && !cur_mm) {
+		if (!cur_mm) {
 			mm_fault = !mmget_not_zero(ctx->sqo_mm);
 			if (!mm_fault) {
 				use_mm(ctx->sqo_mm);
@@ -2725,8 +2728,9 @@ static int io_sq_thread(void *data)
 			}
 		}
 
-		inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
-						mm_fault);
+		to_submit = min(to_submit, ctx->sq_entries);
+		inflight += io_submit_sqes(ctx, to_submit, cur_mm != NULL,
+					   mm_fault);
 
 		/* Commit SQ ring head once we've consumed all SQEs */
 		io_commit_sqring(ctx);