blk-throttle: implement dispatch looping

throtl_select_dispatch() only dispatches throtl_quantum bios on each
invocation.  blk_throtl_dispatch_work_fn() in turn depends on
throtl_schedule_next_dispatch() scheduling the next dispatch window
immediately so that undue delays aren't incurred.  This effectively
chains multiple dispatch work item executions back-to-back when there
are more than throtl_quantum bios to dispatch on a given tick.

There is no reason to finish the current work item just to repeat it
immediately.  This patch makes throtl_schedule_next_dispatch() return
%false without doing anything if the current dispatch window is still
open and updates blk_throtl_dispatch_work_fn() repeat dispatching
after cpu_relax() on %false return.

This change will help implementing hierarchy support as dispatching
will be done from pending_timer and immediate reschedule of timer
function isn't supported and doesn't make much sense.

While this patch changes how dispatch behaves when there are more than
throtl_quantum bios to dispatch on a single tick, the behavior change
is immaterial.

Signed-off-by: Tejun Heo <tj@kernel.org>
Acked-by: Vivek Goyal <vgoyal@redhat.com>
diff --git a/block/blk-throttle.c b/block/blk-throttle.c
index a8d23f0..8ee8e4e 100644
--- a/block/blk-throttle.c
+++ b/block/blk-throttle.c
@@ -467,24 +467,41 @@
 		   expires - jiffies, jiffies);
 }
 
-static void throtl_schedule_next_dispatch(struct throtl_service_queue *sq)
+/**
+ * throtl_schedule_next_dispatch - schedule the next dispatch cycle
+ * @sq: the service_queue to schedule dispatch for
+ * @force: force scheduling
+ *
+ * Arm @sq->pending_timer so that the next dispatch cycle starts on the
+ * dispatch time of the first pending child.  Returns %true if either timer
+ * is armed or there's no pending child left.  %false if the current
+ * dispatch window is still open and the caller should continue
+ * dispatching.
+ *
+ * If @force is %true, the dispatch timer is always scheduled and this
+ * function is guaranteed to return %true.  This is to be used when the
+ * caller can't dispatch itself and needs to invoke pending_timer
+ * unconditionally.  Note that forced scheduling is likely to induce short
+ * delay before dispatch starts even if @sq->first_pending_disptime is not
+ * in the future and thus shouldn't be used in hot paths.
+ */
+static bool throtl_schedule_next_dispatch(struct throtl_service_queue *sq,
+					  bool force)
 {
-	struct throtl_data *td = sq_to_td(sq);
-
 	/* any pending children left? */
 	if (!sq->nr_pending)
-		return;
+		return true;
 
 	update_min_dispatch_time(sq);
 
 	/* is the next dispatch time in the future? */
-	if (time_after(sq->first_pending_disptime, jiffies)) {
+	if (force || time_after(sq->first_pending_disptime, jiffies)) {
 		throtl_schedule_pending_timer(sq, sq->first_pending_disptime);
-		return;
+		return true;
 	}
 
-	/* kick immediate execution */
-	queue_work(kthrotld_workqueue, &td->dispatch_work);
+	/* tell the caller to continue dispatching */
+	return false;
 }
 
 static inline void throtl_start_new_slice(struct throtl_grp *tg, bool rw)
@@ -930,31 +947,39 @@
 					      dispatch_work);
 	struct throtl_service_queue *sq = &td->service_queue;
 	struct request_queue *q = td->queue;
-	unsigned int nr_disp = 0;
 	struct bio_list bio_list_on_stack;
 	struct bio *bio;
 	struct blk_plug plug;
-	int rw;
+	bool dispatched = false;
+	int rw, ret;
 
 	spin_lock_irq(q->queue_lock);
 
 	bio_list_init(&bio_list_on_stack);
 
-	throtl_log(sq, "dispatch nr_queued=%u read=%u write=%u",
-		   td->nr_queued[READ] + td->nr_queued[WRITE],
-		   td->nr_queued[READ], td->nr_queued[WRITE]);
+	while (true) {
+		throtl_log(sq, "dispatch nr_queued=%u read=%u write=%u",
+			   td->nr_queued[READ] + td->nr_queued[WRITE],
+			   td->nr_queued[READ], td->nr_queued[WRITE]);
 
-	nr_disp = throtl_select_dispatch(sq);
-
-	if (nr_disp) {
-		for (rw = READ; rw <= WRITE; rw++) {
-			bio_list_merge(&bio_list_on_stack, &sq->bio_lists[rw]);
-			bio_list_init(&sq->bio_lists[rw]);
+		ret = throtl_select_dispatch(sq);
+		if (ret) {
+			for (rw = READ; rw <= WRITE; rw++) {
+				bio_list_merge(&bio_list_on_stack, &sq->bio_lists[rw]);
+				bio_list_init(&sq->bio_lists[rw]);
+			}
+			throtl_log(sq, "bios disp=%u", ret);
+			dispatched = true;
 		}
-		throtl_log(sq, "bios disp=%u", nr_disp);
-	}
 
-	throtl_schedule_next_dispatch(sq);
+		if (throtl_schedule_next_dispatch(sq, false))
+			break;
+
+		/* this dispatch windows is still open, relax and repeat */
+		spin_unlock_irq(q->queue_lock);
+		cpu_relax();
+		spin_lock_irq(q->queue_lock);
+	}
 
 	spin_unlock_irq(q->queue_lock);
 
@@ -962,7 +987,7 @@
 	 * If we dispatched some requests, unplug the queue to make sure
 	 * immediate dispatch
 	 */
-	if (nr_disp) {
+	if (dispatched) {
 		blk_start_plug(&plug);
 		while((bio = bio_list_pop(&bio_list_on_stack)))
 			generic_make_request(bio);
@@ -1078,7 +1103,7 @@
 
 	if (tg->flags & THROTL_TG_PENDING) {
 		tg_update_disptime(tg);
-		throtl_schedule_next_dispatch(sq->parent_sq);
+		throtl_schedule_next_dispatch(sq->parent_sq, true);
 	}
 
 	blkg_conf_finish(&ctx);
@@ -1229,10 +1254,15 @@
 	throtl_add_bio_tg(bio, tg);
 	throttled = true;
 
-	/* update @tg's dispatch time if @tg was empty before @bio */
+	/*
+	 * Update @tg's dispatch time and force schedule dispatch if @tg
+	 * was empty before @bio.  The forced scheduling isn't likely to
+	 * cause undue delay as @bio is likely to be dispatched directly if
+	 * its @tg's disptime is not in the future.
+	 */
 	if (tg->flags & THROTL_TG_WAS_EMPTY) {
 		tg_update_disptime(tg);
-		throtl_schedule_next_dispatch(tg->service_queue.parent_sq);
+		throtl_schedule_next_dispatch(tg->service_queue.parent_sq, true);
 	}
 
 out_unlock: