[PATCH] ocfs2: dlm recovery fixes

When starting lock mastery (except for the recovery lock), wait on any nodes
needing recovery.  Fix one instance where lock resources were left attached
to the recovery list after recovery completed.  Ensure that the node_down
code is run uniformly regardless of which node found the dead node first.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 78ac3a0..940be4c 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -239,6 +239,8 @@
 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
 				       struct dlm_lock_resource *res,
 				       u8 target);
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res);
 
 
 int dlm_is_host_down(int errno)
@@ -677,6 +679,7 @@
 	struct dlm_node_iter iter;
 	unsigned int namelen;
 	int tries = 0;
+	int bit, wait_on_recovery = 0;
 
 	BUG_ON(!lockid);
 
@@ -762,6 +765,18 @@
 		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
 		set_bit(dlm->node_num, mle->maybe_map);
 		list_add(&mle->list, &dlm->master_list);
+
+		/* still holding the dlm spinlock, check the recovery map
+		 * to see if there are any nodes that still need to be 
+		 * considered.  these will not appear in the mle nodemap
+		 * but they might own this lockres.  wait on them. */
+		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+		if (bit < O2NM_MAX_NODES) {
+			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+			     "recover before lock mastery can begin\n",
+			     dlm->name, namelen, (char *)lockid, bit);
+			wait_on_recovery = 1;
+		}
 	}
 
 	/* at this point there is either a DLM_MLE_BLOCK or a
@@ -779,6 +794,39 @@
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
+	while (wait_on_recovery) {
+		/* any cluster changes that occurred after dropping the
+		 * dlm spinlock would be detectable by a change on the mle,
+		 * so we only need to clear out the recovery map once. */
+		if (dlm_is_recovery_lock(lockid, namelen)) {
+			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
+			     "must master $RECOVERY lock now\n", dlm->name);
+			if (!dlm_pre_master_reco_lockres(dlm, res))
+				wait_on_recovery = 0;
+			else {
+				mlog(0, "%s: waiting 500ms for heartbeat state "
+				    "change\n", dlm->name);
+				msleep(500);
+			}
+			continue;
+		} 
+
+		dlm_kick_recovery_thread(dlm);
+		msleep(100);
+		dlm_wait_for_recovery(dlm);
+
+		spin_lock(&dlm->spinlock);
+		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+		if (bit < O2NM_MAX_NODES) {
+			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+			     "recover before lock mastery can begin\n",
+			     dlm->name, namelen, (char *)lockid, bit);
+			wait_on_recovery = 1;
+		} else
+			wait_on_recovery = 0;
+		spin_unlock(&dlm->spinlock);
+	}
+
 	/* must wait for lock to be mastered elsewhere */
 	if (blocked)
 		goto wait;
@@ -1835,6 +1883,61 @@
 	mlog(0, "finished with dlm_assert_master_worker\n");
 }
 
+/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
+ * We cannot wait for node recovery to complete before mastering this
+ * lockres, because this lockres is what kicks off recovery! ;-)
+ * So pre-check all living nodes to see whether any of them thinks
+ * $RECOVERY is currently mastered by a dead node.  If so, the caller
+ * waits a short time to let that node be notified by its own heartbeat
+ * stack, then checks again.  All $RECOVERY lock resources mastered by
+ * dead nodes are purged when the heartbeat callback fires, so it is
+ * safe to continue once a node returns a live master or no master.
+ *
+ * Returns 0 if mastery may begin, -EAGAIN if the caller must retry. */
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res)
+{
+	struct dlm_node_iter iter;
+	int nodenum, ret = 0;
+	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+	spin_lock(&dlm->spinlock);
+	dlm_node_iter_init(dlm->domain_map, &iter);
+	spin_unlock(&dlm->spinlock);
+
+	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+		/* do not send to self */
+		if (nodenum == dlm->node_num)
+			continue;
+		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
+		if (ret < 0) {
+			mlog_errno(ret);
+			BUG_ON(!dlm_is_host_down(ret));
+			/* host is down: treat its answer as OWNER_UNKNOWN and
+			 * drop the error so it is not returned to the caller. */
+			ret = 0;
+		}
+
+		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
+			/* check to see if this master is in the recovery map */
+			spin_lock(&dlm->spinlock);
+			if (test_bit(master, dlm->recovery_map)) {
+				mlog(ML_NOTICE, "%s: node %u has not seen "
+				     "node %u go down yet, and thinks the "
+				     "dead node is mastering the recovery "
+				     "lock.  must wait.\n", dlm->name,
+				     nodenum, master);
+				ret = -EAGAIN;
+			}
+			spin_unlock(&dlm->spinlock);
+			mlog(0, "%s: reco lock master is %u\n", dlm->name,
+			     master);
+			break;
+		}
+	}
+	return ret;
+}
+
 
 /*
  * DLM_MIGRATE_LOCKRES