ocfs2_dlm: fix cluster-wide refcounting of lock resources

This was previously broken and migration of some locks had to be temporarily
disabled. We use a new (and backward-incompatible) set of network messages
to account for all references to a lock resource held across the cluster.
Once these are all freed, the master node may free the lock resource
memory as soon as its local references are dropped.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index f0b25f2..3995de3 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -125,10 +125,10 @@
 	hlist_add_head(&res->hash_node, bucket);
 }
 
-struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
-						const char *name,
-						unsigned int len,
-						unsigned int hash)
+struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
+						     const char *name,
+						     unsigned int len,
+						     unsigned int hash)
 {
 	struct hlist_head *bucket;
 	struct hlist_node *list;
@@ -154,6 +154,37 @@
 	return NULL;
 }
 
+/* intended to be called by functions which do not care about lock
+ * resources which are being purged (most net _handler functions).
+ * this will return NULL for any lock resource which is found but
+ * currently in the process of dropping its mastery reference.
+ * use __dlm_lookup_lockres_full when you need the lock resource
+ * regardless (e.g. dlm_get_lock_resource) */
+struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
+						const char *name,
+						unsigned int len,
+						unsigned int hash)
+{
+	struct dlm_lock_resource *res = NULL;
+
+	mlog_entry("%.*s\n", len, name);
+
+	assert_spin_locked(&dlm->spinlock);
+
+	res = __dlm_lookup_lockres_full(dlm, name, len, hash);
+	if (res) {
+		spin_lock(&res->spinlock);
+		if (res->state & DLM_LOCK_RES_DROPPING_REF) {
+			spin_unlock(&res->spinlock);
+			dlm_lockres_put(res);
+			return NULL;
+		}
+		spin_unlock(&res->spinlock);
+	}
+
+	return res;
+}
+
 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
 				    const char *name,
 				    unsigned int len)
@@ -330,43 +361,60 @@
 	wake_up(&dlm_domain_events);
 }
 
-static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
+static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 {
-	int i;
+	int i, num, n, ret = 0;
 	struct dlm_lock_resource *res;
+	struct hlist_node *iter;
+	struct hlist_head *bucket;
+	int dropped;
 
 	mlog(0, "Migrating locks from domain %s\n", dlm->name);
-restart:
+
+	num = 0;
 	spin_lock(&dlm->spinlock);
 	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
-		while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
-			res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
-					  struct dlm_lock_resource, hash_node);
-			/* need reference when manually grabbing lockres */
+redo_bucket:
+		n = 0;
+		bucket = dlm_lockres_hash(dlm, i);
+		iter = bucket->first;
+		while (iter) {
+			n++;
+			res = hlist_entry(iter, struct dlm_lock_resource,
+					  hash_node);
 			dlm_lockres_get(res);
-			/* this should unhash the lockres
-			 * and exit with dlm->spinlock */
-			mlog(0, "purging res=%p\n", res);
-			if (dlm_lockres_is_dirty(dlm, res)) {
-				/* HACK!  this should absolutely go.
-				 * need to figure out why some empty
-				 * lockreses are still marked dirty */
-				mlog(ML_ERROR, "lockres %.*s dirty!\n",
-				     res->lockname.len, res->lockname.name);
+			/* migrate, if necessary.  this will drop the dlm
+			 * spinlock and retake it if it does migration. */
+			dropped = dlm_empty_lockres(dlm, res);
 
-				spin_unlock(&dlm->spinlock);
-				dlm_kick_thread(dlm, res);
-				wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
-				dlm_lockres_put(res);
-				goto restart;
-			}
-			dlm_purge_lockres(dlm, res);
+			spin_lock(&res->spinlock);
+			__dlm_lockres_calc_usage(dlm, res);
+			iter = res->hash_node.next;
+			spin_unlock(&res->spinlock);
+
 			dlm_lockres_put(res);
+
+			cond_resched_lock(&dlm->spinlock);
+
+			if (dropped)
+				goto redo_bucket;
 		}
+		num += n;
+		mlog(0, "%s: touched %d lockreses in bucket %d "
+		     "(tot=%d)\n", dlm->name, n, i, num);
 	}
 	spin_unlock(&dlm->spinlock);
+	wake_up(&dlm->dlm_thread_wq);
 
+	/* let the dlm thread take care of purging, keep scanning until
+	 * nothing remains in the hash */
+	if (num) {
+		mlog(0, "%s: %d lock resources in hash last pass\n",
+		     dlm->name, num);
+		ret = -EAGAIN;
+	}
 	mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
+	return ret;
 }
 
 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
@@ -571,7 +619,9 @@
 		/* We changed dlm state, notify the thread */
 		dlm_kick_thread(dlm, NULL);
 
-		dlm_migrate_all_locks(dlm);
+		while (dlm_migrate_all_locks(dlm)) {
+			mlog(0, "%s: more migration to do\n", dlm->name);
+		}
 		dlm_mark_domain_leaving(dlm);
 		dlm_leave_domain(dlm);
 		dlm_complete_dlm_shutdown(dlm);
@@ -1082,6 +1132,13 @@
 	if (status)
 		goto bail;
 
+	status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
+					sizeof(struct dlm_deref_lockres),
+					dlm_deref_lockres_handler,
+					dlm, &dlm->dlm_domain_handlers);
+	if (status)
+		goto bail;
+
 	status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
 					sizeof(struct dlm_migrate_request),
 					dlm_migrate_request_handler,