drbd: fix potential deadlock on detach

If we have contention in drbd_al_begin_iod (heavy randon IO),
an administrative request to detach the disk may deadlock
for similar reasons as the recently fixed deadlock if detaching
because of IO-error.

The approach taken here is to either go through the intermediate
cleanup state D_FAILED, or first lock out application io,
don't just go directly to D_DISKLESS.

We need an additional state bit (WAS_IO_ERROR) to distinguish
the -> D_FAILED because of IO-error from other failures.

Sanitize D_ATTACHING -> D_FAILED to D_ATTACHING -> D_DISKLESS.
If only attaching, ldev may be missing still, but would be referenced
from within the after_state_ch for -> D_FAILED, potentially
dereferencing a NULL pointer.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index e0e0bf6..03c15e3 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -852,7 +852,8 @@
 	BITMAP_IO,		/* suspend application io;
 				   once no more io in flight, start bitmap io */
 	BITMAP_IO_QUEUED,       /* Started bitmap IO */
-	GO_DISKLESS,		/* Disk failed, local_cnt reached zero, we are going diskless */
+	GO_DISKLESS,		/* Disk is being detached, on io-error or admin request. */
+	WAS_IO_ERROR,		/* Local disk failed returned IO error */
 	RESYNC_AFTER_NEG,       /* Resync after online grow after the attach&negotiate finished. */
 	NET_CONGESTED,		/* The data socket is congested */
 
@@ -1281,6 +1282,7 @@
 extern int drbd_bmio_clear_n_write(struct drbd_conf *mdev);
 extern int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why);
 extern void drbd_go_diskless(struct drbd_conf *mdev);
+extern void drbd_ldev_destroy(struct drbd_conf *mdev);
 
 
 /* Meta data layout
@@ -1798,17 +1800,17 @@
 	case EP_PASS_ON:
 		if (!forcedetach) {
 			if (__ratelimit(&drbd_ratelimit_state))
-				dev_err(DEV, "Local IO failed in %s."
-					     "Passing error on...\n", where);
+				dev_err(DEV, "Local IO failed in %s.\n", where);
 			break;
 		}
 		/* NOTE fall through to detach case if forcedetach set */
 	case EP_DETACH:
 	case EP_CALL_HELPER:
+		set_bit(WAS_IO_ERROR, &mdev->flags);
 		if (mdev->state.disk > D_FAILED) {
 			_drbd_set_state(_NS(mdev, disk, D_FAILED), CS_HARD, NULL);
-			dev_err(DEV, "Local IO failed in %s."
-				     "Detaching...\n", where);
+			dev_err(DEV,
+				"Local IO failed in %s. Detaching...\n", where);
 		}
 		break;
 	}
@@ -2127,7 +2129,11 @@
 	__release(local);
 	D_ASSERT(i >= 0);
 	if (i == 0) {
+		if (mdev->state.disk == D_DISKLESS)
+			/* even internal references gone, safe to destroy */
+			drbd_ldev_destroy(mdev);
 		if (mdev->state.disk == D_FAILED)
+			/* all application IO references gone. */
 			drbd_go_diskless(mdev);
 		wake_up(&mdev->misc_wait);
 	}
@@ -2138,6 +2144,10 @@
 {
 	int io_allowed;
 
+	/* never get a reference while D_DISKLESS */
+	if (mdev->state.disk == D_DISKLESS)
+		return 0;
+
 	atomic_inc(&mdev->local_cnt);
 	io_allowed = (mdev->state.disk >= mins);
 	if (!io_allowed)
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 8d029b1..992c3ae 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -834,6 +834,15 @@
 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
 		ns.conn = os.conn;
 
+	/* we cannot fail (again) if we already detached */
+	if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
+		ns.disk = D_DISKLESS;
+
+	/* if we are only D_ATTACHING yet,
+	 * we can (and should) go directly to D_DISKLESS. */
+	if (ns.disk == D_FAILED && os.disk == D_ATTACHING)
+		ns.disk = D_DISKLESS;
+
 	/* After C_DISCONNECTING only C_STANDALONE may follow */
 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
 		ns.conn = os.conn;
@@ -1055,7 +1064,15 @@
 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
 		set_bit(DEVICE_DYING, &mdev->flags);
 
-	mdev->state.i = ns.i;
+	/* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
+	 * on the ldev here, to be sure the transition -> D_DISKLESS resp.
+	 * drbd_ldev_destroy() won't happen before our corresponding
+	 * after_state_ch works run, where we put_ldev again. */
+	if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
+	    (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
+		atomic_inc(&mdev->local_cnt);
+
+	mdev->state = ns;
 	wake_up(&mdev->misc_wait);
 	wake_up(&mdev->state_wait);
 
@@ -1363,63 +1380,64 @@
 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
 
-	/* first half of local IO error */
-	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
-		enum drbd_io_error_p eh = EP_PASS_ON;
+	/* first half of local IO error, failure to attach,
+	 * or administrative detach */
+	if (os.disk != D_FAILED && ns.disk == D_FAILED) {
+		enum drbd_io_error_p eh;
+		int was_io_error;
+		/* corresponding get_ldev was in __drbd_set_state, to serialize
+		 * our cleanup here with the transition to D_DISKLESS,
+		 * so it is safe to dreference ldev here. */
+		eh = mdev->ldev->dc.on_io_error;
+		was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
+
+		/* current state still has to be D_FAILED,
+		 * there is only one way out: to D_DISKLESS,
+		 * and that may only happen after our put_ldev below. */
+		if (mdev->state.disk != D_FAILED)
+			dev_err(DEV,
+				"ASSERT FAILED: disk is %s during detach\n",
+				drbd_disk_str(mdev->state.disk));
 
 		if (drbd_send_state(mdev))
-			dev_warn(DEV, "Notified peer that my disk is broken.\n");
+			dev_warn(DEV, "Notified peer that I am detaching my disk\n");
 		else
-			dev_err(DEV, "Sending state for drbd_io_error() failed\n");
+			dev_err(DEV, "Sending state for detaching disk failed\n");
 
 		drbd_rs_cancel_all(mdev);
 
-		if (get_ldev_if_state(mdev, D_FAILED)) {
-			eh = mdev->ldev->dc.on_io_error;
-			put_ldev(mdev);
-		}
-		if (eh == EP_CALL_HELPER)
+		/* In case we want to get something to stable storage still,
+		 * this may be the last chance.
+		 * Following put_ldev may transition to D_DISKLESS. */
+		drbd_md_sync(mdev);
+		put_ldev(mdev);
+
+		if (was_io_error && eh == EP_CALL_HELPER)
 			drbd_khelper(mdev, "local-io-error");
 	}
 
+        /* second half of local IO error, failure to attach,
+         * or administrative detach,
+         * after local_cnt references have reached zero again */
+        if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
+                /* We must still be diskless,
+                 * re-attach has to be serialized with this! */
+                if (mdev->state.disk != D_DISKLESS)
+                        dev_err(DEV,
+                                "ASSERT FAILED: disk is %s while going diskless\n",
+                                drbd_disk_str(mdev->state.disk));
 
-	/* second half of local IO error handling,
-	 * after local_cnt references have reached zero: */
-	if (os.disk == D_FAILED && ns.disk == D_DISKLESS) {
-		mdev->rs_total = 0;
-		mdev->rs_failed = 0;
-		atomic_set(&mdev->rs_pending_cnt, 0);
-	}
+                mdev->rs_total = 0;
+                mdev->rs_failed = 0;
+                atomic_set(&mdev->rs_pending_cnt, 0);
 
-	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
-		/* We must still be diskless,
-		 * re-attach has to be serialized with this! */
-		if (mdev->state.disk != D_DISKLESS)
-			dev_err(DEV,
-				"ASSERT FAILED: disk is %s while going diskless\n",
-				drbd_disk_str(mdev->state.disk));
-
-		/* we cannot assert local_cnt == 0 here, as get_ldev_if_state
-		 * will inc/dec it frequently. Since we became D_DISKLESS, no
-		 * one has touched the protected members anymore, though, so we
-		 * are safe to free them here. */
 		if (drbd_send_state(mdev))
-			dev_warn(DEV, "Notified peer that I detached my disk.\n");
+			dev_warn(DEV, "Notified peer that I'm now diskless.\n");
 		else
-			dev_err(DEV, "Sending state for detach failed\n");
-
-		lc_destroy(mdev->resync);
-		mdev->resync = NULL;
-		lc_destroy(mdev->act_log);
-		mdev->act_log = NULL;
-		__no_warn(local,
-			drbd_free_bc(mdev->ldev);
-			mdev->ldev = NULL;);
-
-		if (mdev->md_io_tmpp) {
-			__free_page(mdev->md_io_tmpp);
-			mdev->md_io_tmpp = NULL;
-		}
+			dev_err(DEV, "Sending state for being diskless failed\n");
+		/* corresponding get_ldev in __drbd_set_state
+		 * this may finaly trigger drbd_ldev_destroy. */
+		put_ldev(mdev);
 	}
 
 	/* Disks got bigger while they were detached */
@@ -2897,7 +2915,6 @@
 	D_ASSERT(list_empty(&mdev->resync_work.list));
 	D_ASSERT(list_empty(&mdev->unplug_work.list));
 	D_ASSERT(list_empty(&mdev->go_diskless.list));
-
 }
 
 
@@ -3756,19 +3773,31 @@
 	return 1;
 }
 
+void drbd_ldev_destroy(struct drbd_conf *mdev)
+{
+	lc_destroy(mdev->resync);
+	mdev->resync = NULL;
+	lc_destroy(mdev->act_log);
+	mdev->act_log = NULL;
+	__no_warn(local,
+		drbd_free_bc(mdev->ldev);
+		mdev->ldev = NULL;);
+
+	if (mdev->md_io_tmpp) {
+		__free_page(mdev->md_io_tmpp);
+		mdev->md_io_tmpp = NULL;
+	}
+	clear_bit(GO_DISKLESS, &mdev->flags);
+}
+
 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
 {
 	D_ASSERT(mdev->state.disk == D_FAILED);
 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
-	 * the protected members anymore, though, so in the after_state_ch work
-	 * it will be safe to free them. */
+	 * the protected members anymore, though, so once put_ldev reaches zero
+	 * again, it will be safe to free them. */
 	drbd_force_state(mdev, NS(disk, D_DISKLESS));
-	/* We need to wait for return of references checked out while we still
-	 * have been D_FAILED, though (drbd_md_sync, bitmap io). */
-	wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
-
-	clear_bit(GO_DISKLESS, &mdev->flags);
 	return 1;
 }
 
@@ -3777,9 +3806,6 @@
 	D_ASSERT(mdev->state.disk == D_FAILED);
 	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
 		drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
-		/* don't drbd_queue_work_front,
-		 * we need to serialize with the after_state_ch work
-		 * of the -> D_FAILED transition. */
 }
 
 /**
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index c498c48..0cba7d3 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -870,6 +870,11 @@
 		retcode = ERR_DISK_CONFIGURED;
 		goto fail;
 	}
+	/* It may just now have detached because of IO error.  Make sure
+	 * drbd_ldev_destroy is done already, we may end up here very fast,
+	 * e.g. if someone calls attach from the on-io-error handler,
+	 * to realize a "hot spare" feature (not that I'd recommend that) */
+	wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
 
 	/* allocation not in the IO path, cqueue thread context */
 	nbc = kzalloc(sizeof(struct drbd_backing_dev), GFP_KERNEL);
@@ -1262,7 +1267,7 @@
  force_diskless_dec:
 	put_ldev(mdev);
  force_diskless:
-	drbd_force_state(mdev, NS(disk, D_DISKLESS));
+	drbd_force_state(mdev, NS(disk, D_FAILED));
 	drbd_md_sync(mdev);
  release_bdev2_fail:
 	if (nbc)
@@ -1285,10 +1290,19 @@
 	return 0;
 }
 
+/* Detaching the disk is a process in multiple stages.  First we need to lock
+ * out application IO, in-flight IO, IO stuck in drbd_al_begin_io.
+ * Then we transition to D_DISKLESS, and wait for put_ldev() to return all
+ * internal references as well.
+ * Only then we have finally detached. */
 static int drbd_nl_detach(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
 			  struct drbd_nl_cfg_reply *reply)
 {
+	drbd_suspend_io(mdev); /* so no-one is stuck in drbd_al_begin_io */
 	reply->ret_code = drbd_request_state(mdev, NS(disk, D_DISKLESS));
+	if (mdev->state.disk == D_DISKLESS)
+		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+	drbd_resume_io(mdev);
 	return 0;
 }
 
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 6ec922c..04a823b 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -3363,7 +3363,7 @@
 		if (ns.conn == C_MASK) {
 			ns.conn = C_CONNECTED;
 			if (mdev->state.disk == D_NEGOTIATING) {
-				drbd_force_state(mdev, NS(disk, D_DISKLESS));
+				drbd_force_state(mdev, NS(disk, D_FAILED));
 			} else if (peer_state.disk == D_NEGOTIATING) {
 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
 				peer_state.disk = D_DISKLESS;