dm thin: fix a race condition between discarding and provisioning a block

The discard passdown was being issued after the block was unmapped,
which meant the block could be reprovisioned whilst the passdown discard
was still in flight.

We can only identify unshared blocks (safe to do a passdown a discard
to) once they're unmapped and their ref count hits zero.  Block ref
counts are now used to guard against concurrent allocation of these
blocks that are being discarded.  So now we unmap the block, issue
passdown discards, and the immediately increment ref counts for regions
that have been discarded via passed down (this is safe because
allocation occurs within the same thread).  We then decrement ref counts
once the passdown discard IO is complete -- signaling these blocks may
now be allocated.

This fixes the potential for corruption that was reported here:
https://www.redhat.com/archives/dm-devel/2016-June/msg00311.html

Reported-by: Dennis Yang <dennisyang@qnap.com>
Signed-off-by: Joe Thornber <ejt@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@redhat.com>
diff --git a/drivers/md/dm-thin-metadata.c b/drivers/md/dm-thin-metadata.c
index 43824d7..a15091a 100644
--- a/drivers/md/dm-thin-metadata.c
+++ b/drivers/md/dm-thin-metadata.c
@@ -1677,6 +1677,36 @@
 	return r;
 }
 
+int dm_pool_inc_data_range(struct dm_pool_metadata *pmd, dm_block_t b, dm_block_t e)
+{
+	int r = 0;
+
+	down_write(&pmd->root_lock);
+	for (; b != e; b++) {
+		r = dm_sm_inc_block(pmd->data_sm, b);
+		if (r)
+			break;
+	}
+	up_write(&pmd->root_lock);
+
+	return r;
+}
+
+int dm_pool_dec_data_range(struct dm_pool_metadata *pmd, dm_block_t b, dm_block_t e)
+{
+	int r = 0;
+
+	down_write(&pmd->root_lock);
+	for (; b != e; b++) {
+		r = dm_sm_dec_block(pmd->data_sm, b);
+		if (r)
+			break;
+	}
+	up_write(&pmd->root_lock);
+
+	return r;
+}
+
 bool dm_thin_changed_this_transaction(struct dm_thin_device *td)
 {
 	int r;
diff --git a/drivers/md/dm-thin-metadata.h b/drivers/md/dm-thin-metadata.h
index a938bab..35e954e 100644
--- a/drivers/md/dm-thin-metadata.h
+++ b/drivers/md/dm-thin-metadata.h
@@ -197,6 +197,9 @@
 
 int dm_pool_block_is_used(struct dm_pool_metadata *pmd, dm_block_t b, bool *result);
 
+int dm_pool_inc_data_range(struct dm_pool_metadata *pmd, dm_block_t b, dm_block_t e);
+int dm_pool_dec_data_range(struct dm_pool_metadata *pmd, dm_block_t b, dm_block_t e);
+
 /*
  * Returns -ENOSPC if the new size is too small and already allocated
  * blocks would be lost.
diff --git a/drivers/md/dm-thin.c b/drivers/md/dm-thin.c
index 5f9e3d7..197ea20 100644
--- a/drivers/md/dm-thin.c
+++ b/drivers/md/dm-thin.c
@@ -253,6 +253,7 @@
 	struct bio_list deferred_flush_bios;
 	struct list_head prepared_mappings;
 	struct list_head prepared_discards;
+	struct list_head prepared_discards_pt2;
 	struct list_head active_thins;
 
 	struct dm_deferred_set *shared_read_ds;
@@ -269,6 +270,7 @@
 
 	process_mapping_fn process_prepared_mapping;
 	process_mapping_fn process_prepared_discard;
+	process_mapping_fn process_prepared_discard_pt2;
 
 	struct dm_bio_prison_cell **cell_sort_array;
 };
@@ -1001,7 +1003,8 @@
 
 /*----------------------------------------------------------------*/
 
-static void passdown_double_checking_shared_status(struct dm_thin_new_mapping *m)
+static void passdown_double_checking_shared_status(struct dm_thin_new_mapping *m,
+						   struct bio *discard_parent)
 {
 	/*
 	 * We've already unmapped this range of blocks, but before we
@@ -1014,7 +1017,7 @@
 	dm_block_t b = m->data_block, e, end = m->data_block + m->virt_end - m->virt_begin;
 	struct discard_op op;
 
-	begin_discard(&op, tc, m->bio);
+	begin_discard(&op, tc, discard_parent);
 	while (b != end) {
 		/* find start of unmapped run */
 		for (; b < end; b++) {
@@ -1049,27 +1052,100 @@
 	end_discard(&op, r);
 }
 
-static void process_prepared_discard_passdown(struct dm_thin_new_mapping *m)
+static void queue_passdown_pt2(struct dm_thin_new_mapping *m)
+{
+	unsigned long flags;
+	struct pool *pool = m->tc->pool;
+
+	spin_lock_irqsave(&pool->lock, flags);
+	list_add_tail(&m->list, &pool->prepared_discards_pt2);
+	spin_unlock_irqrestore(&pool->lock, flags);
+	wake_worker(pool);
+}
+
+static void passdown_endio(struct bio *bio)
+{
+	/*
+	 * It doesn't matter if the passdown discard failed, we still want
+	 * to unmap (we ignore err).
+	 */
+	queue_passdown_pt2(bio->bi_private);
+}
+
+static void process_prepared_discard_passdown_pt1(struct dm_thin_new_mapping *m)
+{
+	int r;
+	struct thin_c *tc = m->tc;
+	struct pool *pool = tc->pool;
+	struct bio *discard_parent;
+	dm_block_t data_end = m->data_block + (m->virt_end - m->virt_begin);
+
+	/*
+	 * Only this thread allocates blocks, so we can be sure that the
+	 * newly unmapped blocks will not be allocated before the end of
+	 * the function.
+	 */
+	r = dm_thin_remove_range(tc->td, m->virt_begin, m->virt_end);
+	if (r) {
+		metadata_operation_failed(pool, "dm_thin_remove_range", r);
+		bio_io_error(m->bio);
+		cell_defer_no_holder(tc, m->cell);
+		mempool_free(m, pool->mapping_pool);
+		return;
+	}
+
+	discard_parent = bio_alloc(GFP_NOIO, 1);
+	if (!discard_parent) {
+		DMWARN("%s: unable to allocate top level discard bio for passdown. Skipping passdown.",
+		       dm_device_name(tc->pool->pool_md));
+		queue_passdown_pt2(m);
+
+	} else {
+		discard_parent->bi_end_io = passdown_endio;
+		discard_parent->bi_private = m;
+
+		if (m->maybe_shared)
+			passdown_double_checking_shared_status(m, discard_parent);
+		else {
+			struct discard_op op;
+
+			begin_discard(&op, tc, discard_parent);
+			r = issue_discard(&op, m->data_block, data_end);
+			end_discard(&op, r);
+		}
+	}
+
+	/*
+	 * Increment the unmapped blocks.  This prevents a race between the
+	 * passdown io and reallocation of freed blocks.
+	 */
+	r = dm_pool_inc_data_range(pool->pmd, m->data_block, data_end);
+	if (r) {
+		metadata_operation_failed(pool, "dm_pool_inc_data_range", r);
+		bio_io_error(m->bio);
+		cell_defer_no_holder(tc, m->cell);
+		mempool_free(m, pool->mapping_pool);
+		return;
+	}
+}
+
+static void process_prepared_discard_passdown_pt2(struct dm_thin_new_mapping *m)
 {
 	int r;
 	struct thin_c *tc = m->tc;
 	struct pool *pool = tc->pool;
 
-	r = dm_thin_remove_range(tc->td, m->virt_begin, m->virt_end);
+	/*
+	 * The passdown has completed, so now we can decrement all those
+	 * unmapped blocks.
+	 */
+	r = dm_pool_dec_data_range(pool->pmd, m->data_block,
+				   m->data_block + (m->virt_end - m->virt_begin));
 	if (r) {
-		metadata_operation_failed(pool, "dm_thin_remove_range", r);
+		metadata_operation_failed(pool, "dm_pool_dec_data_range", r);
 		bio_io_error(m->bio);
-
-	} else if (m->maybe_shared) {
-		passdown_double_checking_shared_status(m);
-
-	} else {
-		struct discard_op op;
-		begin_discard(&op, tc, m->bio);
-		r = issue_discard(&op, m->data_block,
-				  m->data_block + (m->virt_end - m->virt_begin));
-		end_discard(&op, r);
-	}
+	} else
+		bio_endio(m->bio);
 
 	cell_defer_no_holder(tc, m->cell);
 	mempool_free(m, pool->mapping_pool);
@@ -2215,6 +2291,8 @@
 	throttle_work_update(&pool->throttle);
 	process_prepared(pool, &pool->prepared_discards, &pool->process_prepared_discard);
 	throttle_work_update(&pool->throttle);
+	process_prepared(pool, &pool->prepared_discards_pt2, &pool->process_prepared_discard_pt2);
+	throttle_work_update(&pool->throttle);
 	process_deferred_bios(pool);
 	throttle_work_complete(&pool->throttle);
 }
@@ -2343,7 +2421,8 @@
 
 	if (passdown_enabled(pt)) {
 		pool->process_discard_cell = process_discard_cell_passdown;
-		pool->process_prepared_discard = process_prepared_discard_passdown;
+		pool->process_prepared_discard = process_prepared_discard_passdown_pt1;
+		pool->process_prepared_discard_pt2 = process_prepared_discard_passdown_pt2;
 	} else {
 		pool->process_discard_cell = process_discard_cell_no_passdown;
 		pool->process_prepared_discard = process_prepared_discard_no_passdown;
@@ -2830,6 +2909,7 @@
 	bio_list_init(&pool->deferred_flush_bios);
 	INIT_LIST_HEAD(&pool->prepared_mappings);
 	INIT_LIST_HEAD(&pool->prepared_discards);
+	INIT_LIST_HEAD(&pool->prepared_discards_pt2);
 	INIT_LIST_HEAD(&pool->active_thins);
 	pool->low_water_triggered = false;
 	pool->suspended = true;