drbd: make sure disk cleanup happens in worker context

The recent fix to put_ldev() (correct ordering of access to local_cnt
and state.disk; memory barrier in __drbd_set_state) guarantees
that the cleanup happens exactly once.

However, it does not yet guarantee that the cleanup happens from worker
context: the last put_ldev() may still happen from atomic context,
which must not happen, because blkdev_put() may sleep.

Fix this by scheduling the cleanup to the worker instead,
using a couple more bits in device->flags and a new helper,
drbd_device_post_work().

Generalize the "resync progress" work to cover these new work bits.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index bafb62e..00bf490 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1813,10 +1813,9 @@
 	mutex_unlock(device->state_mutex);
 }
 
-static void update_on_disk_bitmap(struct drbd_device *device)
+static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
 {
 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
-	bool resync_done = test_and_clear_bit(RS_DONE, &device->flags);
 	device->rs_last_bcast = jiffies;
 
 	if (!get_ldev(device))
@@ -1832,7 +1831,87 @@
 	put_ldev(device);
 }
 
-static void try_update_all_on_disk_bitmaps(struct drbd_connection *connection)
+static void drbd_ldev_destroy(struct drbd_device *device)
+{
+	lc_destroy(device->resync);
+	device->resync = NULL;
+	lc_destroy(device->act_log);
+	device->act_log = NULL;
+	__no_warn(local,
+		drbd_free_ldev(device->ldev);
+		device->ldev = NULL;);
+	clear_bit(GOING_DISKLESS, &device->flags);
+	wake_up(&device->misc_wait);
+}
+
+static void go_diskless(struct drbd_device *device)
+{
+	D_ASSERT(device, device->state.disk == D_FAILED);
+	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
+	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
+	 * the protected members anymore, though, so once put_ldev reaches zero
+	 * again, it will be safe to free them. */
+
+	/* Try to write changed bitmap pages: read errors may have just
+	 * set some bits outside the area covered by the activity log.
+	 *
+	 * If we have an IO error during the bitmap writeout,
+	 * we will want a full sync next time, just in case.
+	 * (Do we want a specific meta data flag for this?)
+	 *
+	 * If that does not make it to stable storage either,
+	 * we cannot do anything about that anymore.
+	 *
+	 * We still need to check if both bitmap and ldev are present: we may
+	 * end up here after a failed attach, before ldev was even assigned.
+	 */
+	if (device->bitmap && device->ldev) {
+		/* An interrupted resync or similar is allowed to recount bits
+		 * while we detach.
+		 * Any modifications would not be expected anymore, though.
+		 */
+		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
+					"detach", BM_LOCKED_TEST_ALLOWED)) {
+			if (test_bit(WAS_READ_ERROR, &device->flags)) {
+				drbd_md_set_flag(device, MDF_FULL_SYNC);
+				drbd_md_sync(device);
+			}
+		}
+	}
+
+	drbd_force_state(device, NS(disk, D_DISKLESS));
+}
+
+#define WORK_PENDING(work_bit, todo)	(todo & (1UL << work_bit))
+static void do_device_work(struct drbd_device *device, const unsigned long todo)
+{
+	if (WORK_PENDING(RS_DONE, todo) ||
+	    WORK_PENDING(RS_PROGRESS, todo))
+		update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
+	if (WORK_PENDING(GO_DISKLESS, todo))
+		go_diskless(device);
+	if (WORK_PENDING(DESTROY_DISK, todo))
+		drbd_ldev_destroy(device);
+}
+
+#define DRBD_DEVICE_WORK_MASK	\
+	((1UL << GO_DISKLESS)	\
+	|(1UL << DESTROY_DISK)	\
+	|(1UL << RS_PROGRESS)	\
+	|(1UL << RS_DONE)	\
+	)
+
+static unsigned long get_work_bits(unsigned long *flags)
+{
+	unsigned long old, new;
+	do {
+		old = *flags;
+		new = old & ~DRBD_DEVICE_WORK_MASK;
+	} while (cmpxchg(flags, old, new) != old);
+	return old & DRBD_DEVICE_WORK_MASK;
+}
+
+static void do_unqueued_work(struct drbd_connection *connection)
 {
 	struct drbd_peer_device *peer_device;
 	int vnr;
@@ -1840,12 +1919,13 @@
 	rcu_read_lock();
 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
 		struct drbd_device *device = peer_device->device;
-		if (!test_and_clear_bit(RS_PROGRESS, &device->flags))
+		unsigned long todo = get_work_bits(&device->flags);
+		if (!todo)
 			continue;
 
 		kref_get(&device->kref);
 		rcu_read_unlock();
-		update_on_disk_bitmap(device);
+		do_device_work(device, todo);
 		kref_put(&device->kref, drbd_destroy_device);
 		rcu_read_lock();
 	}
@@ -1927,7 +2007,7 @@
 			maybe_send_barrier(connection,
 					connection->send.current_epoch_nr + 1);
 
-		if (test_bit(CONN_RS_PROGRESS, &connection->flags))
+		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
 			break;
 
 		/* drbd_send() may have called flush_signals() */
@@ -1972,8 +2052,8 @@
 		if (list_empty(&work_list))
 			wait_for_work(connection, &work_list);
 
-		if (test_and_clear_bit(CONN_RS_PROGRESS, &connection->flags))
-			try_update_all_on_disk_bitmaps(connection);
+		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
+			do_unqueued_work(connection);
 
 		if (signal_pending(current)) {
 			flush_signals(current);
@@ -1998,13 +2078,15 @@
 	}
 
 	do {
+		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
+			do_unqueued_work(connection);
 		while (!list_empty(&work_list)) {
 			w = list_first_entry(&work_list, struct drbd_work, list);
 			list_del_init(&w->list);
 			w->cb(w, 1);
 		}
 		dequeue_work_batch(&connection->sender_work, &work_list);
-	} while (!list_empty(&work_list));
+	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
 
 	rcu_read_lock();
 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {