block: switch polling to be bio based

Replace the blk_poll interface that requires the caller to keep a queue
and cookie from the submissions with polling based on the bio.

Polling for the bio itself leads to a few advantages:

 - the cookie construction can be made entirely private in blk-mq.c
 - the caller does not need to remember the request_queue and cookie
   separately and thus sidesteps their lifetime issues
 - keeping the device and the cookie inside the bio makes it trivial to
   support polling of BIOs that are remapped by stacking drivers
 - a lot of code to propagate the cookie back up the submission path can
   be removed entirely.

Signed-off-by: Christoph Hellwig <hch@lst.de>
Tested-by: Mark Wunderlich <mark.wunderlich@intel.com>
Link: https://lore.kernel.org/r/20211012111226.760968-15-hch@lst.de
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/block/blk-core.c b/block/blk-core.c
index 8eb0e08..f008c38 100644
--- a/block/blk-core.c
+++ b/block/blk-core.c
@@ -915,25 +915,22 @@ static noinline_for_stack bool submit_bio_checks(struct bio *bio)
 	return false;
 }
 
-static blk_qc_t __submit_bio(struct bio *bio)
+static void __submit_bio(struct bio *bio)
 {
 	struct gendisk *disk = bio->bi_bdev->bd_disk;
-	blk_qc_t ret = BLK_QC_T_NONE;
 
 	if (unlikely(bio_queue_enter(bio) != 0))
-		return BLK_QC_T_NONE;
+		return;
 
 	if (!submit_bio_checks(bio) || !blk_crypto_bio_prep(&bio))
 		goto queue_exit;
-	if (disk->fops->submit_bio) {
-		ret = disk->fops->submit_bio(bio);
-		goto queue_exit;
+	if (!disk->fops->submit_bio) {
+		blk_mq_submit_bio(bio);
+		return;
 	}
-	return blk_mq_submit_bio(bio);
-
+	disk->fops->submit_bio(bio);
 queue_exit:
 	blk_queue_exit(disk->queue);
-	return ret;
 }
 
 /*
@@ -955,10 +952,9 @@ static blk_qc_t __submit_bio(struct bio *bio)
  * bio_list_on_stack[1] contains bios that were submitted before the current
  *	->submit_bio_bio, but that haven't been processed yet.
  */
-static blk_qc_t __submit_bio_noacct(struct bio *bio)
+static void __submit_bio_noacct(struct bio *bio)
 {
 	struct bio_list bio_list_on_stack[2];
-	blk_qc_t ret = BLK_QC_T_NONE;
 
 	BUG_ON(bio->bi_next);
 
@@ -975,7 +971,7 @@ static blk_qc_t __submit_bio_noacct(struct bio *bio)
 		bio_list_on_stack[1] = bio_list_on_stack[0];
 		bio_list_init(&bio_list_on_stack[0]);
 
-		ret = __submit_bio(bio);
+		__submit_bio(bio);
 
 		/*
 		 * Sort new bios into those for a lower level and those for the
@@ -998,22 +994,19 @@ static blk_qc_t __submit_bio_noacct(struct bio *bio)
 	} while ((bio = bio_list_pop(&bio_list_on_stack[0])));
 
 	current->bio_list = NULL;
-	return ret;
 }
 
-static blk_qc_t __submit_bio_noacct_mq(struct bio *bio)
+static void __submit_bio_noacct_mq(struct bio *bio)
 {
 	struct bio_list bio_list[2] = { };
-	blk_qc_t ret;
 
 	current->bio_list = bio_list;
 
 	do {
-		ret = __submit_bio(bio);
+		__submit_bio(bio);
 	} while ((bio = bio_list_pop(&bio_list[0])));
 
 	current->bio_list = NULL;
-	return ret;
 }
 
 /**
@@ -1025,7 +1018,7 @@ static blk_qc_t __submit_bio_noacct_mq(struct bio *bio)
  * systems and other upper level users of the block layer should use
  * submit_bio() instead.
  */
-blk_qc_t submit_bio_noacct(struct bio *bio)
+void submit_bio_noacct(struct bio *bio)
 {
 	/*
 	 * We only want one ->submit_bio to be active at a time, else stack
@@ -1033,14 +1026,12 @@ blk_qc_t submit_bio_noacct(struct bio *bio)
 	 * to collect a list of requests submited by a ->submit_bio method while
 	 * it is active, and then process them after it returned.
 	 */
-	if (current->bio_list) {
+	if (current->bio_list)
 		bio_list_add(&current->bio_list[0], bio);
-		return BLK_QC_T_NONE;
-	}
-
-	if (!bio->bi_bdev->bd_disk->fops->submit_bio)
-		return __submit_bio_noacct_mq(bio);
-	return __submit_bio_noacct(bio);
+	else if (!bio->bi_bdev->bd_disk->fops->submit_bio)
+		__submit_bio_noacct_mq(bio);
+	else
+		__submit_bio_noacct(bio);
 }
 EXPORT_SYMBOL(submit_bio_noacct);
 
@@ -1057,10 +1048,10 @@ EXPORT_SYMBOL(submit_bio_noacct);
  * in @bio.  The bio must NOT be touched by thecaller until ->bi_end_io() has
  * been called.
  */
-blk_qc_t submit_bio(struct bio *bio)
+void submit_bio(struct bio *bio)
 {
 	if (blkcg_punt_bio_submit(bio))
-		return BLK_QC_T_NONE;
+		return;
 
 	/*
 	 * If it's a regular read/write or a barrier with data attached,
@@ -1092,20 +1083,92 @@ blk_qc_t submit_bio(struct bio *bio)
 	if (unlikely(bio_op(bio) == REQ_OP_READ &&
 	    bio_flagged(bio, BIO_WORKINGSET))) {
 		unsigned long pflags;
-		blk_qc_t ret;
 
 		psi_memstall_enter(&pflags);
-		ret = submit_bio_noacct(bio);
+		submit_bio_noacct(bio);
 		psi_memstall_leave(&pflags);
-
-		return ret;
+		return;
 	}
 
-	return submit_bio_noacct(bio);
+	submit_bio_noacct(bio);
 }
 EXPORT_SYMBOL(submit_bio);
 
 /**
+ * bio_poll - poll for BIO completions
+ * @bio: bio to poll for
+ * @flags: BLK_POLL_* flags that control the behavior
+ *
+ * Poll for completions on the queue associated with the bio. Returns the
+ * number of completed entries found.
+ *
+ * Note: the caller must either be the context that submitted @bio, or
+ * be in an RCU critical section to prevent freeing of @bio.
+ */
+int bio_poll(struct bio *bio, unsigned int flags)
+{
+	struct request_queue *q = bio->bi_bdev->bd_disk->queue;
+	blk_qc_t cookie = READ_ONCE(bio->bi_cookie);
+	int ret;
+
+	if (cookie == BLK_QC_T_NONE ||
+	    !test_bit(QUEUE_FLAG_POLL, &q->queue_flags))
+		return 0;
+
+	if (current->plug)
+		blk_flush_plug_list(current->plug, false);
+
+	if (blk_queue_enter(q, BLK_MQ_REQ_NOWAIT))
+		return 0;
+	if (WARN_ON_ONCE(!queue_is_mq(q)))
+		ret = 0;	/* not yet implemented, should not happen */
+	else
+		ret = blk_mq_poll(q, cookie, flags);
+	blk_queue_exit(q);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(bio_poll);
+
+/*
+ * Helper to implement file_operations.iopoll.  Requires the bio to be stored
+ * in iocb->private, and cleared before freeing the bio.
+ */
+int iocb_bio_iopoll(struct kiocb *kiocb, unsigned int flags)
+{
+	struct bio *bio;
+	int ret = 0;
+
+	/*
+	 * Note: the bio cache only uses SLAB_TYPESAFE_BY_RCU, so bio can
+	 * point to a freshly allocated bio at this point.  If that happens
+	 * we have a few cases to consider:
+	 *
+	 *  1) the bio is being initialized and bi_bdev is NULL.  We can
+	 *     simply do nothing in this case
+	 *  2) the bio points to a not poll enabled device.  bio_poll will catch
+	 *     this and return 0
+	 *  3) the bio points to a poll capable device, including but not
+	 *     limited to the one that the original bio pointed to.  In this
+	 *     case we will call into the actual poll method and poll for I/O,
+	 *     even if we don't need to, but it won't cause harm either.
+	 *
+	 * For cases 2) and 3) above the RCU grace period ensures that bi_bdev
+	 * is still allocated. Because partitions hold a reference to the whole
+	 * device bdev and thus disk, the disk is also still valid.  Grabbing
+	 * a reference to the queue in bio_poll() ensures the hctxs and requests
+	 * are still valid as well.
+	 */
+	rcu_read_lock();
+	bio = READ_ONCE(kiocb->private);
+	if (bio && bio->bi_bdev)
+		ret = bio_poll(bio, flags);
+	rcu_read_unlock();
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(iocb_bio_iopoll);
+
+/**
  * blk_cloned_rq_check_limits - Helper function to check a cloned request
  *                              for the new queue limits
  * @q:  the queue