Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2

* 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mfasheh/ocfs2: (28 commits)
  ocfs2: Teach ocfs2_drop_lock() to use ->set_lvb() callback
  ocfs2: Remove ->unblock lockres operation
  ocfs2: move downconvert worker to lockres ops
  ocfs2: Remove unused dlmglue functions
  ocfs2: Have the metadata lock use generic dlmglue functions
  ocfs2: Add ->set_lvb callback in dlmglue
  ocfs2: Add ->check_downconvert callback in dlmglue
  ocfs2: Check for refreshing locks in generic unblock function
  ocfs2: don't unconditionally pass LVB flags
  ocfs2: combine inode and generic blocking AST functions
  ocfs2: Add ->get_osb() dlmglue locking operation
  ocfs2: remove ->unlock_ast() callback from ocfs2_lock_res_ops
  ocfs2: combine inode and generic AST functions
  ocfs2: Clean up lock resource refresh flags
  ocfs2: Remove i_generation from inode lock names
  ocfs2: Encode i_generation in the meta data lvb
  ocfs2: Free up some space in the lvb
  ocfs2: Remove special casing for inode creation in ocfs2_dentry_attach_lock()
  ocfs2: manually d_move() during ocfs2_rename()
  [PATCH] Allow file systems to manually d_move() inside of ->rename()
  ...
diff --git a/fs/namei.c b/fs/namei.c
index 432d6bc..6b591c0 100644
--- a/fs/namei.c
+++ b/fs/namei.c
@@ -2370,7 +2370,8 @@
 		dput(new_dentry);
 	}
 	if (!error)
-		d_move(old_dentry,new_dentry);
+		if (!(old_dir->i_sb->s_type->fs_flags & FS_RENAME_DOES_D_MOVE))
+			d_move(old_dentry,new_dentry);
 	return error;
 }
 
@@ -2393,8 +2394,7 @@
 	else
 		error = old_dir->i_op->rename(old_dir, old_dentry, new_dir, new_dentry);
 	if (!error) {
-		/* The following d_move() should become unconditional */
-		if (!(old_dir->i_sb->s_type->fs_flags & FS_ODD_RENAME))
+		if (!(old_dir->i_sb->s_type->fs_flags & FS_RENAME_DOES_D_MOVE))
 			d_move(old_dentry, new_dentry);
 	}
 	if (target)
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index 3419c2d..7432f1a 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -1669,8 +1669,7 @@
 	if (rehash)
 		d_rehash(rehash);
 	if (!error) {
-		if (!S_ISDIR(old_inode->i_mode))
-			d_move(old_dentry, new_dentry);
+		d_move(old_dentry, new_dentry);
 		nfs_renew_times(new_dentry);
 		nfs_set_verifier(new_dentry, nfs_save_change_attribute(new_dir));
 	}
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index b99113b..e8d4003 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -71,7 +71,7 @@
 	.name		= "nfs",
 	.get_sb		= nfs_get_sb,
 	.kill_sb	= nfs_kill_super,
-	.fs_flags	= FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+	.fs_flags	= FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 struct file_system_type nfs_xdev_fs_type = {
@@ -79,7 +79,7 @@
 	.name		= "nfs",
 	.get_sb		= nfs_xdev_get_sb,
 	.kill_sb	= nfs_kill_super,
-	.fs_flags	= FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+	.fs_flags	= FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 static struct super_operations nfs_sops = {
@@ -107,7 +107,7 @@
 	.name		= "nfs4",
 	.get_sb		= nfs4_get_sb,
 	.kill_sb	= nfs4_kill_super,
-	.fs_flags	= FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+	.fs_flags	= FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 struct file_system_type nfs4_xdev_fs_type = {
@@ -115,7 +115,7 @@
 	.name		= "nfs4",
 	.get_sb		= nfs4_xdev_get_sb,
 	.kill_sb	= nfs4_kill_super,
-	.fs_flags	= FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+	.fs_flags	= FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 struct file_system_type nfs4_referral_fs_type = {
@@ -123,7 +123,7 @@
 	.name		= "nfs4",
 	.get_sb		= nfs4_referral_get_sb,
 	.kill_sb	= nfs4_kill_super,
-	.fs_flags	= FS_ODD_RENAME|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
+	.fs_flags	= FS_RENAME_DOES_D_MOVE|FS_REVAL_DOT|FS_BINARY_MOUNTDATA,
 };
 
 static struct super_operations nfs4_sops = {
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index ff9e2e2..4b46aac 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -44,11 +44,17 @@
  * locking semantics of the file system using the protocol.  It should 
  * be somewhere else, I'm sure, but right now it isn't.
  *
+ * New in version 4:
+ * 	- Remove i_generation from lock names for better stat performance.
+ *
+ * New in version 3:
+ * 	- Replace dentry votes with a cluster lock
+ *
  * New in version 2:
  * 	- full 64 bit i_size in the metadata lock lvbs
  * 	- introduction of "rw" lock and pushing meta/data locking down
  */
-#define O2NET_PROTOCOL_VERSION 2ULL
+#define O2NET_PROTOCOL_VERSION 4ULL
 struct o2net_handshake {
 	__be64	protocol_version;
 	__be64	connector_id;
diff --git a/fs/ocfs2/dcache.c b/fs/ocfs2/dcache.c
index 1a01380..014e739 100644
--- a/fs/ocfs2/dcache.c
+++ b/fs/ocfs2/dcache.c
@@ -35,15 +35,17 @@
 
 #include "alloc.h"
 #include "dcache.h"
+#include "dlmglue.h"
 #include "file.h"
 #include "inode.h"
 
+
 static int ocfs2_dentry_revalidate(struct dentry *dentry,
 				   struct nameidata *nd)
 {
 	struct inode *inode = dentry->d_inode;
 	int ret = 0;    /* if all else fails, just return false */
-	struct ocfs2_super *osb;
+	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
 
 	mlog_entry("(0x%p, '%.*s')\n", dentry,
 		   dentry->d_name.len, dentry->d_name.name);
@@ -55,28 +57,31 @@
 		goto bail;
 	}
 
-	osb = OCFS2_SB(inode->i_sb);
-
 	BUG_ON(!osb);
 
-	if (inode != osb->root_inode) {
-		spin_lock(&OCFS2_I(inode)->ip_lock);
-		/* did we or someone else delete this inode? */
-		if (OCFS2_I(inode)->ip_flags & OCFS2_INODE_DELETED) {
-			spin_unlock(&OCFS2_I(inode)->ip_lock);
-			mlog(0, "inode (%llu) deleted, returning false\n",
-			     (unsigned long long)OCFS2_I(inode)->ip_blkno);
-			goto bail;
-		}
-		spin_unlock(&OCFS2_I(inode)->ip_lock);
+	if (inode == osb->root_inode || is_bad_inode(inode))
+		goto bail;
 
-		if (!inode->i_nlink) {
-			mlog(0, "Inode %llu orphaned, returning false "
-			     "dir = %d\n",
-			     (unsigned long long)OCFS2_I(inode)->ip_blkno,
-			     S_ISDIR(inode->i_mode));
-			goto bail;
-		}
+	spin_lock(&OCFS2_I(inode)->ip_lock);
+	/* did we or someone else delete this inode? */
+	if (OCFS2_I(inode)->ip_flags & OCFS2_INODE_DELETED) {
+		spin_unlock(&OCFS2_I(inode)->ip_lock);
+		mlog(0, "inode (%llu) deleted, returning false\n",
+		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
+		goto bail;
+	}
+	spin_unlock(&OCFS2_I(inode)->ip_lock);
+
+	/*
+	 * We don't need a cluster lock to test this because once an
+	 * inode nlink hits zero, it never goes back.
+	 */
+	if (inode->i_nlink == 0) {
+		mlog(0, "Inode %llu orphaned, returning false "
+		     "dir = %d\n",
+		     (unsigned long long)OCFS2_I(inode)->ip_blkno,
+		     S_ISDIR(inode->i_mode));
+		goto bail;
 	}
 
 	ret = 1;
@@ -87,6 +92,322 @@
 	return ret;
 }
 
+static int ocfs2_match_dentry(struct dentry *dentry,
+			      u64 parent_blkno,
+			      int skip_unhashed)
+{
+	struct inode *parent;
+
+	/*
+	 * ocfs2_lookup() does a d_splice_alias() _before_ attaching
+	 * to the lock data, so we skip those here, otherwise
+	 * ocfs2_dentry_attach_lock() will get its original dentry
+	 * back.
+	 */
+	if (!dentry->d_fsdata)
+		return 0;
+
+	if (!dentry->d_parent)
+		return 0;
+
+	if (skip_unhashed && d_unhashed(dentry))
+		return 0;
+
+	parent = dentry->d_parent->d_inode;
+	/* Negative parent dentry? */
+	if (!parent)
+		return 0;
+
+	/* Name is in a different directory. */
+	if (OCFS2_I(parent)->ip_blkno != parent_blkno)
+		return 0;
+
+	return 1;
+}
+
+/*
+ * Walk the inode alias list, and find a dentry which has a given
+ * parent. ocfs2_dentry_attach_lock() wants to find _any_ alias as it
+ * is looking for a dentry_lock reference. The vote thread is looking
+ * to unhash aliases, so we allow it to skip any that already have
+ * that property.
+ */
+struct dentry *ocfs2_find_local_alias(struct inode *inode,
+				      u64 parent_blkno,
+				      int skip_unhashed)
+{
+	struct list_head *p;
+	struct dentry *dentry = NULL;
+
+	spin_lock(&dcache_lock);
+
+	list_for_each(p, &inode->i_dentry) {
+		dentry = list_entry(p, struct dentry, d_alias);
+
+		if (ocfs2_match_dentry(dentry, parent_blkno, skip_unhashed)) {
+			mlog(0, "dentry found: %.*s\n",
+			     dentry->d_name.len, dentry->d_name.name);
+
+			dget_locked(dentry);
+			break;
+		}
+
+		dentry = NULL;
+	}
+
+	spin_unlock(&dcache_lock);
+
+	return dentry;
+}
+
+DEFINE_SPINLOCK(dentry_attach_lock);
+
+/*
+ * Attach this dentry to a cluster lock.
+ *
+ * Dentry locks cover all links in a given directory to a particular
+ * inode. We do this so that ocfs2 can build a lock name which all
+ * nodes in the cluster can agree on at all times. Shoving full names
+ * in the cluster lock won't work due to size restrictions. Covering
+ * links inside of a directory is a good compromise because it still
+ * allows us to use the parent directory lock to synchronize
+ * operations.
+ *
+ * Call this function with the parent dir semaphore and the parent dir
+ * cluster lock held.
+ *
+ * The dir semaphore will protect us from having to worry about
+ * concurrent processes on our node trying to attach a lock at the
+ * same time.
+ *
+ * The dir cluster lock (held at either PR or EX mode) protects us
+ * from unlink and rename on other nodes.
+ *
+ * A dput() can happen asynchronously due to pruning, so we cover
+ * attaching and detaching the dentry lock with a
+ * dentry_attach_lock.
+ *
+ * A node which has done lookup on a name retains a protected read
+ * lock until final dput. If the user requests and unlink or rename,
+ * the protected read is upgraded to an exclusive lock. Other nodes
+ * who have seen the dentry will then be informed that they need to
+ * downgrade their lock, which will involve d_delete on the
+ * dentry. This happens in ocfs2_dentry_convert_worker().
+ */
+int ocfs2_dentry_attach_lock(struct dentry *dentry,
+			     struct inode *inode,
+			     u64 parent_blkno)
+{
+	int ret;
+	struct dentry *alias;
+	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+
+	mlog(0, "Attach \"%.*s\", parent %llu, fsdata: %p\n",
+	     dentry->d_name.len, dentry->d_name.name,
+	     (unsigned long long)parent_blkno, dl);
+
+	/*
+	 * Negative dentry. We ignore these for now.
+	 *
+	 * XXX: Could we can improve ocfs2_dentry_revalidate() by
+	 * tracking these?
+	 */
+	if (!inode)
+		return 0;
+
+	if (dl) {
+		mlog_bug_on_msg(dl->dl_parent_blkno != parent_blkno,
+				" \"%.*s\": old parent: %llu, new: %llu\n",
+				dentry->d_name.len, dentry->d_name.name,
+				(unsigned long long)parent_blkno,
+				(unsigned long long)dl->dl_parent_blkno);
+		return 0;
+	}
+
+	alias = ocfs2_find_local_alias(inode, parent_blkno, 0);
+	if (alias) {
+		/*
+		 * Great, an alias exists, which means we must have a
+		 * dentry lock already. We can just grab the lock off
+		 * the alias and add it to the list.
+		 *
+		 * We're depending here on the fact that this dentry
+		 * was found and exists in the dcache and so must have
+		 * a reference to the dentry_lock because we can't
+		 * race creates. Final dput() cannot happen on it
+		 * since we have it pinned, so our reference is safe.
+		 */
+		dl = alias->d_fsdata;
+		mlog_bug_on_msg(!dl, "parent %llu, ino %llu\n",
+				(unsigned long long)parent_blkno,
+				(unsigned long long)OCFS2_I(inode)->ip_blkno);
+
+		mlog_bug_on_msg(dl->dl_parent_blkno != parent_blkno,
+				" \"%.*s\": old parent: %llu, new: %llu\n",
+				dentry->d_name.len, dentry->d_name.name,
+				(unsigned long long)parent_blkno,
+				(unsigned long long)dl->dl_parent_blkno);
+
+		mlog(0, "Found: %s\n", dl->dl_lockres.l_name);
+
+		goto out_attach;
+	}
+
+	/*
+	 * There are no other aliases
+	 */
+	dl = kmalloc(sizeof(*dl), GFP_NOFS);
+	if (!dl) {
+		ret = -ENOMEM;
+		mlog_errno(ret);
+		return ret;
+	}
+
+	dl->dl_count = 0;
+	/*
+	 * Does this have to happen below, for all attaches, in case
+	 * the struct inode gets blown away by votes?
+	 */
+	dl->dl_inode = igrab(inode);
+	dl->dl_parent_blkno = parent_blkno;
+	ocfs2_dentry_lock_res_init(dl, parent_blkno, inode);
+
+out_attach:
+	spin_lock(&dentry_attach_lock);
+	dentry->d_fsdata = dl;
+	dl->dl_count++;
+	spin_unlock(&dentry_attach_lock);
+
+	/*
+	 * This actually gets us our PRMODE level lock. From now on,
+	 * we'll have a notification if one of these names is
+	 * destroyed on another node.
+	 */
+	ret = ocfs2_dentry_lock(dentry, 0);
+	if (!ret)
+		ocfs2_dentry_unlock(dentry, 0);
+	else
+		mlog_errno(ret);
+
+	dput(alias);
+
+	return ret;
+}
+
+/*
+ * ocfs2_dentry_iput() and friends.
+ *
+ * At this point, our particular dentry is detached from the inodes
+ * alias list, so there's no way that the locking code can find it.
+ *
+ * The interesting stuff happens when we determine that our lock needs
+ * to go away because this is the last subdir alias in the
+ * system. This function needs to handle a couple things:
+ *
+ * 1) Synchronizing lock shutdown with the downconvert threads. This
+ *    is already handled for us via the lockres release drop function
+ *    called in ocfs2_release_dentry_lock()
+ *
+ * 2) A race may occur when we're doing our lock shutdown and
+ *    another process wants to create a new dentry lock. Right now we
+ *    let them race, which means that for a very short while, this
+ *    node might have two locks on a lock resource. This should be a
+ *    problem though because one of them is in the process of being
+ *    thrown out.
+ */
+static void ocfs2_drop_dentry_lock(struct ocfs2_super *osb,
+				   struct ocfs2_dentry_lock *dl)
+{
+	ocfs2_simple_drop_lockres(osb, &dl->dl_lockres);
+	ocfs2_lock_res_free(&dl->dl_lockres);
+	iput(dl->dl_inode);
+	kfree(dl);
+}
+
+void ocfs2_dentry_lock_put(struct ocfs2_super *osb,
+			   struct ocfs2_dentry_lock *dl)
+{
+	int unlock = 0;
+
+	BUG_ON(dl->dl_count == 0);
+
+	spin_lock(&dentry_attach_lock);
+	dl->dl_count--;
+	unlock = !dl->dl_count;
+	spin_unlock(&dentry_attach_lock);
+
+	if (unlock)
+		ocfs2_drop_dentry_lock(osb, dl);
+}
+
+static void ocfs2_dentry_iput(struct dentry *dentry, struct inode *inode)
+{
+	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+
+	mlog_bug_on_msg(!dl && !(dentry->d_flags & DCACHE_DISCONNECTED),
+			"dentry: %.*s\n", dentry->d_name.len,
+			dentry->d_name.name);
+
+	if (!dl)
+		goto out;
+
+	mlog_bug_on_msg(dl->dl_count == 0, "dentry: %.*s, count: %u\n",
+			dentry->d_name.len, dentry->d_name.name,
+			dl->dl_count);
+
+	ocfs2_dentry_lock_put(OCFS2_SB(dentry->d_sb), dl);
+
+out:
+	iput(inode);
+}
+
+/*
+ * d_move(), but keep the locks in sync.
+ *
+ * When we are done, "dentry" will have the parent dir and name of
+ * "target", which will be thrown away.
+ *
+ * We manually update the lock of "dentry" if need be.
+ *
+ * "target" doesn't have it's dentry lock touched - we allow the later
+ * dput() to handle this for us.
+ *
+ * This is called during ocfs2_rename(), while holding parent
+ * directory locks. The dentries have already been deleted on other
+ * nodes via ocfs2_remote_dentry_delete().
+ *
+ * Normally, the VFS handles the d_move() for the file sytem, after
+ * the ->rename() callback. OCFS2 wants to handle this internally, so
+ * the new lock can be created atomically with respect to the cluster.
+ */
+void ocfs2_dentry_move(struct dentry *dentry, struct dentry *target,
+		       struct inode *old_dir, struct inode *new_dir)
+{
+	int ret;
+	struct ocfs2_super *osb = OCFS2_SB(old_dir->i_sb);
+	struct inode *inode = dentry->d_inode;
+
+	/*
+	 * Move within the same directory, so the actual lock info won't
+	 * change.
+	 *
+	 * XXX: Is there any advantage to dropping the lock here?
+	 */
+	if (old_dir == new_dir)
+		goto out_move;
+
+	ocfs2_dentry_lock_put(osb, dentry->d_fsdata);
+
+	dentry->d_fsdata = NULL;
+	ret = ocfs2_dentry_attach_lock(dentry, inode, OCFS2_I(new_dir)->ip_blkno);
+	if (ret)
+		mlog_errno(ret);
+
+out_move:
+	d_move(dentry, target);
+}
+
 struct dentry_operations ocfs2_dentry_ops = {
 	.d_revalidate		= ocfs2_dentry_revalidate,
+	.d_iput			= ocfs2_dentry_iput,
 };
diff --git a/fs/ocfs2/dcache.h b/fs/ocfs2/dcache.h
index 9007277..c091c34 100644
--- a/fs/ocfs2/dcache.h
+++ b/fs/ocfs2/dcache.h
@@ -28,4 +28,31 @@
 
 extern struct dentry_operations ocfs2_dentry_ops;
 
+struct ocfs2_dentry_lock {
+	unsigned int		dl_count;
+	u64			dl_parent_blkno;
+
+	/*
+	 * The ocfs2_dentry_lock keeps an inode reference until
+	 * dl_lockres has been destroyed. This is usually done in
+	 * ->d_iput() anyway, so there should be minimal impact.
+	 */
+	struct inode		*dl_inode;
+	struct ocfs2_lock_res	dl_lockres;
+};
+
+int ocfs2_dentry_attach_lock(struct dentry *dentry, struct inode *inode,
+			     u64 parent_blkno);
+
+void ocfs2_dentry_lock_put(struct ocfs2_super *osb,
+			   struct ocfs2_dentry_lock *dl);
+
+struct dentry *ocfs2_find_local_alias(struct inode *inode, u64 parent_blkno,
+				      int skip_unhashed);
+
+void ocfs2_dentry_move(struct dentry *dentry, struct dentry *target,
+		       struct inode *old_dir, struct inode *new_dir);
+
+extern spinlock_t dentry_attach_lock;
+
 #endif /* OCFS2_DCACHE_H */
diff --git a/fs/ocfs2/dlm/dlmapi.h b/fs/ocfs2/dlm/dlmapi.h
index 53652f5..cfd5cb65 100644
--- a/fs/ocfs2/dlm/dlmapi.h
+++ b/fs/ocfs2/dlm/dlmapi.h
@@ -182,6 +182,7 @@
 			struct dlm_lockstatus *lksb,
 			int flags,
 			const char *name,
+			int namelen,
 			dlm_astlockfunc_t *ast,
 			void *data,
 			dlm_bastlockfunc_t *bast);
diff --git a/fs/ocfs2/dlm/dlmast.c b/fs/ocfs2/dlm/dlmast.c
index f13a4ba..681046d 100644
--- a/fs/ocfs2/dlm/dlmast.c
+++ b/fs/ocfs2/dlm/dlmast.c
@@ -320,8 +320,8 @@
 
 	res = dlm_lookup_lockres(dlm, name, locklen);
 	if (!res) {
-		mlog(ML_ERROR, "got %sast for unknown lockres! "
-			       "cookie=%u:%llu, name=%.*s, namelen=%u\n",
+		mlog(0, "got %sast for unknown lockres! "
+		     "cookie=%u:%llu, name=%.*s, namelen=%u\n",
 		     past->type == DLM_AST ? "" : "b",
 		     dlm_get_lock_cookie_node(cookie),
 		     dlm_get_lock_cookie_seq(cookie),
@@ -462,7 +462,7 @@
 			mlog(ML_ERROR, "sent AST to node %u, it returned "
 			     "DLM_MIGRATING!\n", lock->ml.node);
 			BUG();
-		} else if (status != DLM_NORMAL) {
+		} else if (status != DLM_NORMAL && status != DLM_IVLOCKID) {
 			mlog(ML_ERROR, "AST to node %u returned %d!\n",
 			     lock->ml.node, status);
 			/* ignore it */
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 14530ee..fa96818 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -747,6 +747,7 @@
 			      u8 owner);
 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
 						 const char *lockid,
+						 int namelen,
 						 int flags);
 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
 					  const char *name,
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 5ca57ec..42a1b91 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -540,8 +540,8 @@
 
 enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
 			struct dlm_lockstatus *lksb, int flags,
-			const char *name, dlm_astlockfunc_t *ast, void *data,
-			dlm_bastlockfunc_t *bast)
+			const char *name, int namelen, dlm_astlockfunc_t *ast,
+			void *data, dlm_bastlockfunc_t *bast)
 {
 	enum dlm_status status;
 	struct dlm_lock_resource *res = NULL;
@@ -571,7 +571,7 @@
 	recovery = (flags & LKM_RECOVERY);
 
 	if (recovery &&
-	    (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) {
+	    (!dlm_is_recovery_lock(name, namelen) || convert) ) {
 		dlm_error(status);
 		goto error;
 	}
@@ -643,7 +643,7 @@
 		}
 
 		status = DLM_IVBUFLEN;
-		if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) {
+		if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
 			dlm_error(status);
 			goto error;
 		}
@@ -659,7 +659,7 @@
 			dlm_wait_for_recovery(dlm);
 
 		/* find or create the lock resource */
-		res = dlm_get_lock_resource(dlm, name, flags);
+		res = dlm_get_lock_resource(dlm, name, namelen, flags);
 		if (!res) {
 			status = DLM_IVLOCKID;
 			dlm_error(status);
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 9503240..f784177 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -740,6 +740,7 @@
  */
 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
 					  const char *lockid,
+					  int namelen,
 					  int flags)
 {
 	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
@@ -748,13 +749,12 @@
 	int blocked = 0;
 	int ret, nodenum;
 	struct dlm_node_iter iter;
-	unsigned int namelen, hash;
+	unsigned int hash;
 	int tries = 0;
 	int bit, wait_on_recovery = 0;
 
 	BUG_ON(!lockid);
 
-	namelen = strlen(lockid);
 	hash = dlm_lockid_hash(lockid, namelen);
 
 	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 594745f..9d950d7 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -2285,7 +2285,8 @@
 	memset(&lksb, 0, sizeof(lksb));
 
 	ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
-		      DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
+		      DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
+		      dlm_reco_ast, dlm, dlm_reco_bast);
 
 	mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
 	     dlm->name, ret, lksb.status);
diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlm/userdlm.c
index e641b08..eead48b 100644
--- a/fs/ocfs2/dlm/userdlm.c
+++ b/fs/ocfs2/dlm/userdlm.c
@@ -102,10 +102,10 @@
 	spin_unlock(&lockres->l_lock);
 }
 
-#define user_log_dlm_error(_func, _stat, _lockres) do {		\
-	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
-		"resource %s: %s\n", dlm_errname(_stat), _func,	\
-		_lockres->l_name, dlm_errmsg(_stat));		\
+#define user_log_dlm_error(_func, _stat, _lockres) do {			\
+	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "		\
+		"resource %.*s: %s\n", dlm_errname(_stat), _func,	\
+		_lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \
 } while (0)
 
 /* WARNING: This function lives in a world where the only three lock
@@ -127,21 +127,22 @@
 	struct user_lock_res *lockres = opaque;
 	struct dlm_lockstatus *lksb;
 
-	mlog(0, "AST fired for lockres %s\n", lockres->l_name);
+	mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen,
+	     lockres->l_name);
 
 	spin_lock(&lockres->l_lock);
 
 	lksb = &(lockres->l_lksb);
 	if (lksb->status != DLM_NORMAL) {
-		mlog(ML_ERROR, "lksb status value of %u on lockres %s\n",
-		     lksb->status, lockres->l_name);
+		mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
+		     lksb->status, lockres->l_namelen, lockres->l_name);
 		spin_unlock(&lockres->l_lock);
 		return;
 	}
 
 	mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE,
-			"Lockres %s, requested ivmode. flags 0x%x\n",
-			lockres->l_name, lockres->l_flags);
+			"Lockres %.*s, requested ivmode. flags 0x%x\n",
+			lockres->l_namelen, lockres->l_name, lockres->l_flags);
 
 	/* we're downconverting. */
 	if (lockres->l_requested < lockres->l_level) {
@@ -213,8 +214,8 @@
 {
 	struct user_lock_res *lockres = opaque;
 
-	mlog(0, "Blocking AST fired for lockres %s. Blocking level %d\n",
-		lockres->l_name, level);
+	mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n",
+	     lockres->l_namelen, lockres->l_name, level);
 
 	spin_lock(&lockres->l_lock);
 	lockres->l_flags |= USER_LOCK_BLOCKED;
@@ -231,7 +232,8 @@
 {
 	struct user_lock_res *lockres = opaque;
 
-	mlog(0, "UNLOCK AST called on lock %s\n", lockres->l_name);
+	mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen,
+	     lockres->l_name);
 
 	if (status != DLM_NORMAL && status != DLM_CANCELGRANT)
 		mlog(ML_ERROR, "Dlm returns status %d\n", status);
@@ -244,8 +246,6 @@
 	    && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
 		lockres->l_level = LKM_IVMODE;
 	} else if (status == DLM_CANCELGRANT) {
-		mlog(0, "Lock %s, cancel fails, flags 0x%x\n",
-		     lockres->l_name, lockres->l_flags);
 		/* We tried to cancel a convert request, but it was
 		 * already granted. Don't clear the busy flag - the
 		 * ast should've done this already. */
@@ -255,8 +255,6 @@
 	} else {
 		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
 		/* Cancel succeeded, we want to re-queue */
-		mlog(0, "Lock %s, cancel succeeds, flags 0x%x\n",
-		     lockres->l_name, lockres->l_flags);
 		lockres->l_requested = LKM_IVMODE; /* cancel an
 						    * upconvert
 						    * request. */
@@ -287,13 +285,14 @@
 	struct user_lock_res *lockres = (struct user_lock_res *) opaque;
 	struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
 
-	mlog(0, "processing lockres %s\n", lockres->l_name);
+	mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
+	     lockres->l_name);
 
 	spin_lock(&lockres->l_lock);
 
 	mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
-			"Lockres %s, flags 0x%x\n",
-			lockres->l_name, lockres->l_flags);
+			"Lockres %.*s, flags 0x%x\n",
+			lockres->l_namelen, lockres->l_name, lockres->l_flags);
 
 	/* notice that we don't clear USER_LOCK_BLOCKED here. If it's
 	 * set, we want user_ast clear it. */
@@ -305,22 +304,16 @@
 	 * flag, and finally we might get another bast which re-queues
 	 * us before our ast for the downconvert is called. */
 	if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
-		mlog(0, "Lockres %s, flags 0x%x: queued but not blocking\n",
-			lockres->l_name, lockres->l_flags);
 		spin_unlock(&lockres->l_lock);
 		goto drop_ref;
 	}
 
 	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
-		mlog(0, "lock is in teardown so we do nothing\n");
 		spin_unlock(&lockres->l_lock);
 		goto drop_ref;
 	}
 
 	if (lockres->l_flags & USER_LOCK_BUSY) {
-		mlog(0, "Cancel lock %s, flags 0x%x\n",
-		     lockres->l_name, lockres->l_flags);
-
 		if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
 			spin_unlock(&lockres->l_lock);
 			goto drop_ref;
@@ -372,6 +365,7 @@
 			 &lockres->l_lksb,
 			 LKM_CONVERT|LKM_VALBLK,
 			 lockres->l_name,
+			 lockres->l_namelen,
 			 user_ast,
 			 lockres,
 			 user_bast);
@@ -420,16 +414,16 @@
 
 	if (level != LKM_EXMODE &&
 	    level != LKM_PRMODE) {
-		mlog(ML_ERROR, "lockres %s: invalid request!\n",
-		     lockres->l_name);
+		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
+		     lockres->l_namelen, lockres->l_name);
 		status = -EINVAL;
 		goto bail;
 	}
 
-	mlog(0, "lockres %s: asking for %s lock, passed flags = 0x%x\n",
-		lockres->l_name,
-		(level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
-		lkm_flags);
+	mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n",
+	     lockres->l_namelen, lockres->l_name,
+	     (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE",
+	     lkm_flags);
 
 again:
 	if (signal_pending(current)) {
@@ -474,15 +468,13 @@
 		BUG_ON(level == LKM_IVMODE);
 		BUG_ON(level == LKM_NLMODE);
 
-		mlog(0, "lock %s, get lock from %d to level = %d\n",
-			lockres->l_name, lockres->l_level, level);
-
 		/* call dlm_lock to upgrade lock now */
 		status = dlmlock(dlm,
 				 level,
 				 &lockres->l_lksb,
 				 local_flags,
 				 lockres->l_name,
+				 lockres->l_namelen,
 				 user_ast,
 				 lockres,
 				 user_bast);
@@ -498,9 +490,6 @@
 			goto bail;
 		}
 
-		mlog(0, "lock %s, successfull return from dlmlock\n",
-			lockres->l_name);
-
 		user_wait_on_busy_lock(lockres);
 		goto again;
 	}
@@ -508,9 +497,6 @@
 	user_dlm_inc_holders(lockres, level);
 	spin_unlock(&lockres->l_lock);
 
-	mlog(0, "lockres %s: Got %s lock!\n", lockres->l_name,
-		(level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE");
-
 	status = 0;
 bail:
 	return status;
@@ -538,13 +524,11 @@
 {
 	if (level != LKM_EXMODE &&
 	    level != LKM_PRMODE) {
-		mlog(ML_ERROR, "lockres %s: invalid request!\n", lockres->l_name);
+		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
+		     lockres->l_namelen, lockres->l_name);
 		return;
 	}
 
-	mlog(0, "lockres %s: dropping %s lock\n", lockres->l_name,
-		(level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE");
-
 	spin_lock(&lockres->l_lock);
 	user_dlm_dec_holders(lockres, level);
 	__user_dlm_cond_queue_lockres(lockres);
@@ -602,6 +586,7 @@
 	memcpy(lockres->l_name,
 	       dentry->d_name.name,
 	       dentry->d_name.len);
+	lockres->l_namelen = dentry->d_name.len;
 }
 
 int user_dlm_destroy_lock(struct user_lock_res *lockres)
@@ -609,11 +594,10 @@
 	int status = -EBUSY;
 	struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
 
-	mlog(0, "asked to destroy %s\n", lockres->l_name);
+	mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name);
 
 	spin_lock(&lockres->l_lock);
 	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
-		mlog(0, "Lock is already torn down\n");
 		spin_unlock(&lockres->l_lock);
 		return 0;
 	}
@@ -623,8 +607,6 @@
 	while (lockres->l_flags & USER_LOCK_BUSY) {
 		spin_unlock(&lockres->l_lock);
 
-		mlog(0, "lock %s is busy\n", lockres->l_name);
-
 		user_wait_on_busy_lock(lockres);
 
 		spin_lock(&lockres->l_lock);
@@ -632,14 +614,12 @@
 
 	if (lockres->l_ro_holders || lockres->l_ex_holders) {
 		spin_unlock(&lockres->l_lock);
-		mlog(0, "lock %s has holders\n", lockres->l_name);
 		goto bail;
 	}
 
 	status = 0;
 	if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
 		spin_unlock(&lockres->l_lock);
-		mlog(0, "lock %s is not attached\n", lockres->l_name);
 		goto bail;
 	}
 
@@ -647,7 +627,6 @@
 	lockres->l_flags |= USER_LOCK_BUSY;
 	spin_unlock(&lockres->l_lock);
 
-	mlog(0, "unlocking lockres %s\n", lockres->l_name);
 	status = dlmunlock(dlm,
 			   &lockres->l_lksb,
 			   LKM_VALBLK,
diff --git a/fs/ocfs2/dlm/userdlm.h b/fs/ocfs2/dlm/userdlm.h
index 04178bc..c400e93 100644
--- a/fs/ocfs2/dlm/userdlm.h
+++ b/fs/ocfs2/dlm/userdlm.h
@@ -53,6 +53,7 @@
 
 #define USER_DLM_LOCK_ID_MAX_LEN  32
 	char                     l_name[USER_DLM_LOCK_ID_MAX_LEN];
+	int                      l_namelen;
 	int                      l_level;
 	unsigned int             l_ro_holders;
 	unsigned int             l_ex_holders;
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 151b417..de88706 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -46,6 +46,7 @@
 #include "ocfs2.h"
 
 #include "alloc.h"
+#include "dcache.h"
 #include "dlmglue.h"
 #include "extent_map.h"
 #include "heartbeat.h"
@@ -66,78 +67,161 @@
 	unsigned long		mw_goal;
 };
 
-static void ocfs2_inode_ast_func(void *opaque);
-static void ocfs2_inode_bast_func(void *opaque,
-				  int level);
-static void ocfs2_super_ast_func(void *opaque);
-static void ocfs2_super_bast_func(void *opaque,
-				  int level);
-static void ocfs2_rename_ast_func(void *opaque);
-static void ocfs2_rename_bast_func(void *opaque,
-				   int level);
+static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
+static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
 
-/* so far, all locks have gotten along with the same unlock ast */
-static void ocfs2_unlock_ast_func(void *opaque,
-				  enum dlm_status status);
-static int ocfs2_do_unblock_meta(struct inode *inode,
-				 int *requeue);
-static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
-			      int *requeue);
-static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
-			      int *requeue);
-static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
-			      int *requeue);
-static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
-				  int *requeue);
-typedef void (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
-static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
-				      struct ocfs2_lock_res *lockres,
-				      int *requeue,
-				      ocfs2_convert_worker_t *worker);
-
-struct ocfs2_lock_res_ops {
-	void (*ast)(void *);
-	void (*bast)(void *, int);
-	void (*unlock_ast)(void *, enum dlm_status);
-	int  (*unblock)(struct ocfs2_lock_res *, int *);
+/*
+ * Return value from ->downconvert_worker functions.
+ *
+ * These control the precise actions of ocfs2_unblock_lock()
+ * and ocfs2_process_blocked_lock()
+ *
+ */
+enum ocfs2_unblock_action {
+	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
+	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
+				      * ->post_unlock callback */
+	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
+				      * ->post_unlock() callback. */
 };
 
+struct ocfs2_unblock_ctl {
+	int requeue;
+	enum ocfs2_unblock_action unblock_action;
+};
+
+static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
+					int new_level);
+static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
+
+static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
+				     int blocking);
+
+static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
+				       int blocking);
+
+static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
+				     struct ocfs2_lock_res *lockres);
+
+/*
+ * OCFS2 Lock Resource Operations
+ *
+ * These fine tune the behavior of the generic dlmglue locking infrastructure.
+ *
+ * The most basic of lock types can point ->l_priv to their respective
+ * struct ocfs2_super and allow the default actions to manage things.
+ *
+ * Right now, each lock type also needs to implement an init function,
+ * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
+ * should be called when the lock is no longer needed (i.e., object
+ * destruction time).
+ */
+struct ocfs2_lock_res_ops {
+	/*
+	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
+	 * this callback if ->l_priv is not an ocfs2_super pointer
+	 */
+	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
+
+	/*
+	 * Optionally called in the downconvert (or "vote") thread
+	 * after a successful downconvert. The lockres will not be
+	 * referenced after this callback is called, so it is safe to
+	 * free memory, etc.
+	 *
+	 * The exact semantics of when this is called are controlled
+	 * by ->downconvert_worker()
+	 */
+	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
+
+	/*
+	 * Allow a lock type to add checks to determine whether it is
+	 * safe to downconvert a lock. Return 0 to re-queue the
+	 * downconvert at a later time, nonzero to continue.
+	 *
+	 * For most locks, the default checks that there are no
+	 * incompatible holders are sufficient.
+	 *
+	 * Called with the lockres spinlock held.
+	 */
+	int (*check_downconvert)(struct ocfs2_lock_res *, int);
+
+	/*
+	 * Allows a lock type to populate the lock value block. This
+	 * is called on downconvert, and when we drop a lock.
+	 *
+	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
+	 * in the flags field.
+	 *
+	 * Called with the lockres spinlock held.
+	 */
+	void (*set_lvb)(struct ocfs2_lock_res *);
+
+	/*
+	 * Called from the downconvert thread when it is determined
+	 * that a lock will be downconverted. This is called without
+	 * any locks held so the function can do work that might
+	 * schedule (syncing out data, etc).
+	 *
+	 * This should return any one of the ocfs2_unblock_action
+	 * values, depending on what it wants the thread to do.
+	 */
+	int (*downconvert_worker)(struct ocfs2_lock_res *, int);
+
+	/*
+	 * LOCK_TYPE_* flags which describe the specific requirements
+	 * of a lock type. Descriptions of each individual flag follow.
+	 */
+	int flags;
+};
+
+/*
+ * Some locks want to "refresh" potentially stale data when a
+ * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
+ * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
+ * individual lockres l_flags member from the ast function. It is
+ * expected that the locking wrapper will clear the
+ * OCFS2_LOCK_NEEDS_REFRESH flag when done.
+ */
+#define LOCK_TYPE_REQUIRES_REFRESH 0x1
+
+/*
+ * Indicate that a lock type makes use of the lock value block. The
+ * ->set_lvb lock type callback must be defined.
+ */
+#define LOCK_TYPE_USES_LVB		0x2
+
 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
-	.ast		= ocfs2_inode_ast_func,
-	.bast		= ocfs2_inode_bast_func,
-	.unlock_ast	= ocfs2_unlock_ast_func,
-	.unblock	= ocfs2_unblock_inode_lock,
+	.get_osb	= ocfs2_get_inode_osb,
+	.flags		= 0,
 };
 
 static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
-	.ast		= ocfs2_inode_ast_func,
-	.bast		= ocfs2_inode_bast_func,
-	.unlock_ast	= ocfs2_unlock_ast_func,
-	.unblock	= ocfs2_unblock_meta,
+	.get_osb	= ocfs2_get_inode_osb,
+	.check_downconvert = ocfs2_check_meta_downconvert,
+	.set_lvb	= ocfs2_set_meta_lvb,
+	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
 };
 
-static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
-				      int blocking);
-
 static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
-	.ast		= ocfs2_inode_ast_func,
-	.bast		= ocfs2_inode_bast_func,
-	.unlock_ast	= ocfs2_unlock_ast_func,
-	.unblock	= ocfs2_unblock_data,
+	.get_osb	= ocfs2_get_inode_osb,
+	.downconvert_worker = ocfs2_data_convert_worker,
+	.flags		= 0,
 };
 
 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
-	.ast		= ocfs2_super_ast_func,
-	.bast		= ocfs2_super_bast_func,
-	.unlock_ast	= ocfs2_unlock_ast_func,
-	.unblock	= ocfs2_unblock_osb_lock,
+	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
 };
 
 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
-	.ast		= ocfs2_rename_ast_func,
-	.bast		= ocfs2_rename_bast_func,
-	.unlock_ast	= ocfs2_unlock_ast_func,
-	.unblock	= ocfs2_unblock_osb_lock,
+	.flags		= 0,
+};
+
+static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
+	.get_osb	= ocfs2_get_dentry_osb,
+	.post_unlock	= ocfs2_dentry_post_unlock,
+	.downconvert_worker = ocfs2_dentry_convert_worker,
+	.flags		= 0,
 };
 
 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
@@ -147,24 +231,6 @@
 		lockres->l_type == OCFS2_LOCK_TYPE_RW;
 }
 
-static inline int ocfs2_is_super_lock(struct ocfs2_lock_res *lockres)
-{
-	return lockres->l_type == OCFS2_LOCK_TYPE_SUPER;
-}
-
-static inline int ocfs2_is_rename_lock(struct ocfs2_lock_res *lockres)
-{
-	return lockres->l_type == OCFS2_LOCK_TYPE_RENAME;
-}
-
-static inline struct ocfs2_super *ocfs2_lock_res_super(struct ocfs2_lock_res *lockres)
-{
-	BUG_ON(!ocfs2_is_super_lock(lockres)
-	       && !ocfs2_is_rename_lock(lockres));
-
-	return (struct ocfs2_super *) lockres->l_priv;
-}
-
 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
 {
 	BUG_ON(!ocfs2_is_inode_lock(lockres));
@@ -172,6 +238,21 @@
 	return (struct inode *) lockres->l_priv;
 }
 
+static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
+{
+	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
+
+	return (struct ocfs2_dentry_lock *)lockres->l_priv;
+}
+
+static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
+{
+	if (lockres->l_ops->get_osb)
+		return lockres->l_ops->get_osb(lockres);
+
+	return (struct ocfs2_super *)lockres->l_priv;
+}
+
 static int ocfs2_lock_create(struct ocfs2_super *osb,
 			     struct ocfs2_lock_res *lockres,
 			     int level,
@@ -200,25 +281,6 @@
 				  struct buffer_head **bh);
 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
 static inline int ocfs2_highest_compat_lock_level(int level);
-static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
-						  struct ocfs2_lock_res *lockres,
-						  int new_level);
-
-static char *ocfs2_lock_type_strings[] = {
-	[OCFS2_LOCK_TYPE_META] = "Meta",
-	[OCFS2_LOCK_TYPE_DATA] = "Data",
-	[OCFS2_LOCK_TYPE_SUPER] = "Super",
-	[OCFS2_LOCK_TYPE_RENAME] = "Rename",
-	/* Need to differntiate from [R]ename.. serializing writes is the
-	 * important job it does, anyway. */
-	[OCFS2_LOCK_TYPE_RW] = "Write/Read",
-};
-
-static char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
-{
-	mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
-	return ocfs2_lock_type_strings[type];
-}
 
 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
 				  u64 blkno,
@@ -265,13 +327,9 @@
 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
 				       struct ocfs2_lock_res *res,
 				       enum ocfs2_lock_type type,
-				       u64 blkno,
-				       u32 generation,
 				       struct ocfs2_lock_res_ops *ops,
 				       void *priv)
 {
-	ocfs2_build_lock_name(type, blkno, generation, res->l_name);
-
 	res->l_type          = type;
 	res->l_ops           = ops;
 	res->l_priv          = priv;
@@ -299,6 +357,7 @@
 
 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
 			       enum ocfs2_lock_type type,
+			       unsigned int generation,
 			       struct inode *inode)
 {
 	struct ocfs2_lock_res_ops *ops;
@@ -319,9 +378,73 @@
 			break;
 	};
 
-	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type,
-				   OCFS2_I(inode)->ip_blkno,
-				   inode->i_generation, ops, inode);
+	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
+			      generation, res->l_name);
+	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
+}
+
+static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
+{
+	struct inode *inode = ocfs2_lock_res_inode(lockres);
+
+	return OCFS2_SB(inode->i_sb);
+}
+
+static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
+{
+	__be64 inode_blkno_be;
+
+	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
+	       sizeof(__be64));
+
+	return be64_to_cpu(inode_blkno_be);
+}
+
+static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
+{
+	struct ocfs2_dentry_lock *dl = lockres->l_priv;
+
+	return OCFS2_SB(dl->dl_inode->i_sb);
+}
+
+void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
+				u64 parent, struct inode *inode)
+{
+	int len;
+	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
+	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
+	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
+
+	ocfs2_lock_res_init_once(lockres);
+
+	/*
+	 * Unfortunately, the standard lock naming scheme won't work
+	 * here because we have two 16 byte values to use. Instead,
+	 * we'll stuff the inode number as a binary value. We still
+	 * want error prints to show something without garbling the
+	 * display, so drop a null byte in there before the inode
+	 * number. A future version of OCFS2 will likely use all
+	 * binary lock names. The stringified names have been a
+	 * tremendous aid in debugging, but now that the debugfs
+	 * interface exists, we can mangle things there if need be.
+	 *
+	 * NOTE: We also drop the standard "pad" value (the total lock
+	 * name size stays the same though - the last part is all
+	 * zeros due to the memset in ocfs2_lock_res_init_once()
+	 */
+	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
+		       "%c%016llx",
+		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
+		       (long long)parent);
+
+	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
+
+	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
+	       sizeof(__be64));
+
+	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
+				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
+				   dl);
 }
 
 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
@@ -330,8 +453,9 @@
 	/* Superblock lockres doesn't come from a slab so we call init
 	 * once on it manually.  */
 	ocfs2_lock_res_init_once(res);
+	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
+			      0, res->l_name);
 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
-				   OCFS2_SUPER_BLOCK_BLKNO, 0,
 				   &ocfs2_super_lops, osb);
 }
 
@@ -341,7 +465,8 @@
 	/* Rename lockres doesn't come from a slab so we call init
 	 * once on it manually.  */
 	ocfs2_lock_res_init_once(res);
-	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 0, 0,
+	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
+	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
 				   &ocfs2_rename_lops, osb);
 }
 
@@ -495,7 +620,8 @@
 	 * information is already up to data. Convert from NL to
 	 * *anything* however should mark ourselves as needing an
 	 * update */
-	if (lockres->l_level == LKM_NLMODE)
+	if (lockres->l_level == LKM_NLMODE &&
+	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
 	lockres->l_level = lockres->l_requested;
@@ -512,7 +638,8 @@
 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
 
 	if (lockres->l_requested > LKM_NLMODE &&
-	    !(lockres->l_flags & OCFS2_LOCK_LOCAL))
+	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
+	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
 
 	lockres->l_level = lockres->l_requested;
@@ -522,68 +649,6 @@
 	mlog_exit_void();
 }
 
-static void ocfs2_inode_ast_func(void *opaque)
-{
-	struct ocfs2_lock_res *lockres = opaque;
-	struct inode *inode;
-	struct dlm_lockstatus *lksb;
-	unsigned long flags;
-
-	mlog_entry_void();
-
-	inode = ocfs2_lock_res_inode(lockres);
-
-	mlog(0, "AST fired for inode %llu, l_action = %u, type = %s\n",
-	     (unsigned long long)OCFS2_I(inode)->ip_blkno, lockres->l_action,
-	     ocfs2_lock_type_string(lockres->l_type));
-
-	BUG_ON(!ocfs2_is_inode_lock(lockres));
-
-	spin_lock_irqsave(&lockres->l_lock, flags);
-
-	lksb = &(lockres->l_lksb);
-	if (lksb->status != DLM_NORMAL) {
-		mlog(ML_ERROR, "ocfs2_inode_ast_func: lksb status value of %u "
-		     "on inode %llu\n", lksb->status,
-		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
-		spin_unlock_irqrestore(&lockres->l_lock, flags);
-		mlog_exit_void();
-		return;
-	}
-
-	switch(lockres->l_action) {
-	case OCFS2_AST_ATTACH:
-		ocfs2_generic_handle_attach_action(lockres);
-		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
-		break;
-	case OCFS2_AST_CONVERT:
-		ocfs2_generic_handle_convert_action(lockres);
-		break;
-	case OCFS2_AST_DOWNCONVERT:
-		ocfs2_generic_handle_downconvert_action(lockres);
-		break;
-	default:
-		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
-		     "lockres flags = 0x%lx, unlock action: %u\n",
-		     lockres->l_name, lockres->l_action, lockres->l_flags,
-		     lockres->l_unlock_action);
-
-		BUG();
-	}
-
-	/* data and rw locking ignores refresh flag for now. */
-	if (lockres->l_type != OCFS2_LOCK_TYPE_META)
-		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
-
-	/* set it to something invalid so if we get called again we
-	 * can catch it. */
-	lockres->l_action = OCFS2_AST_INVALID;
-	spin_unlock_irqrestore(&lockres->l_lock, flags);
-	wake_up(&lockres->l_event);
-
-	mlog_exit_void();
-}
-
 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
 				     int level)
 {
@@ -610,54 +675,33 @@
 	return needs_downconvert;
 }
 
-static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
-				    struct ocfs2_lock_res *lockres,
-				    int level)
+static void ocfs2_blocking_ast(void *opaque, int level)
 {
+	struct ocfs2_lock_res *lockres = opaque;
+	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
 	int needs_downconvert;
 	unsigned long flags;
 
-	mlog_entry_void();
-
 	BUG_ON(level <= LKM_NLMODE);
 
+	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
+	     lockres->l_name, level, lockres->l_level,
+	     ocfs2_lock_type_string(lockres->l_type));
+
 	spin_lock_irqsave(&lockres->l_lock, flags);
 	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
 	if (needs_downconvert)
 		ocfs2_schedule_blocked_lock(osb, lockres);
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-	ocfs2_kick_vote_thread(osb);
-
 	wake_up(&lockres->l_event);
-	mlog_exit_void();
+
+	ocfs2_kick_vote_thread(osb);
 }
 
-static void ocfs2_inode_bast_func(void *opaque, int level)
+static void ocfs2_locking_ast(void *opaque)
 {
 	struct ocfs2_lock_res *lockres = opaque;
-	struct inode *inode;
-	struct ocfs2_super *osb;
-
-	mlog_entry_void();
-
-	BUG_ON(!ocfs2_is_inode_lock(lockres));
-
-	inode = ocfs2_lock_res_inode(lockres);
-	osb = OCFS2_SB(inode->i_sb);
-
-	mlog(0, "BAST fired for inode %llu, blocking %d, level %d type %s\n",
-	     (unsigned long long)OCFS2_I(inode)->ip_blkno, level,
-	     lockres->l_level, ocfs2_lock_type_string(lockres->l_type));
-
-	ocfs2_generic_bast_func(osb, lockres, level);
-
-	mlog_exit_void();
-}
-
-static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres,
-				   int ignore_refresh)
-{
 	struct dlm_lockstatus *lksb = &lockres->l_lksb;
 	unsigned long flags;
 
@@ -673,6 +717,7 @@
 	switch(lockres->l_action) {
 	case OCFS2_AST_ATTACH:
 		ocfs2_generic_handle_attach_action(lockres);
+		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
 		break;
 	case OCFS2_AST_CONVERT:
 		ocfs2_generic_handle_convert_action(lockres);
@@ -681,80 +726,19 @@
 		ocfs2_generic_handle_downconvert_action(lockres);
 		break;
 	default:
+		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
+		     "lockres flags = 0x%lx, unlock action: %u\n",
+		     lockres->l_name, lockres->l_action, lockres->l_flags,
+		     lockres->l_unlock_action);
 		BUG();
 	}
 
-	if (ignore_refresh)
-		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
-
 	/* set it to something invalid so if we get called again we
 	 * can catch it. */
 	lockres->l_action = OCFS2_AST_INVALID;
-	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
 	wake_up(&lockres->l_event);
-}
-
-static void ocfs2_super_ast_func(void *opaque)
-{
-	struct ocfs2_lock_res *lockres = opaque;
-
-	mlog_entry_void();
-	mlog(0, "Superblock AST fired\n");
-
-	BUG_ON(!ocfs2_is_super_lock(lockres));
-	ocfs2_generic_ast_func(lockres, 0);
-
-	mlog_exit_void();
-}
-
-static void ocfs2_super_bast_func(void *opaque,
-				  int level)
-{
-	struct ocfs2_lock_res *lockres = opaque;
-	struct ocfs2_super *osb;
-
-	mlog_entry_void();
-	mlog(0, "Superblock BAST fired\n");
-
-	BUG_ON(!ocfs2_is_super_lock(lockres));
-       	osb = ocfs2_lock_res_super(lockres);
-	ocfs2_generic_bast_func(osb, lockres, level);
-
-	mlog_exit_void();
-}
-
-static void ocfs2_rename_ast_func(void *opaque)
-{
-	struct ocfs2_lock_res *lockres = opaque;
-
-	mlog_entry_void();
-
-	mlog(0, "Rename AST fired\n");
-
-	BUG_ON(!ocfs2_is_rename_lock(lockres));
-
-	ocfs2_generic_ast_func(lockres, 1);
-
-	mlog_exit_void();
-}
-
-static void ocfs2_rename_bast_func(void *opaque,
-				   int level)
-{
-	struct ocfs2_lock_res *lockres = opaque;
-	struct ocfs2_super *osb;
-
-	mlog_entry_void();
-
-	mlog(0, "Rename BAST fired\n");
-
-	BUG_ON(!ocfs2_is_rename_lock(lockres));
-
-	osb = ocfs2_lock_res_super(lockres);
-	ocfs2_generic_bast_func(osb, lockres, level);
-
-	mlog_exit_void();
+	spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
 
 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
@@ -810,9 +794,10 @@
 			 &lockres->l_lksb,
 			 dlm_flags,
 			 lockres->l_name,
-			 lockres->l_ops->ast,
+			 OCFS2_LOCK_ID_MAX_LEN - 1,
+			 ocfs2_locking_ast,
 			 lockres,
-			 lockres->l_ops->bast);
+			 ocfs2_blocking_ast);
 	if (status != DLM_NORMAL) {
 		ocfs2_log_dlm_error("dlmlock", status, lockres);
 		ret = -EINVAL;
@@ -930,6 +915,9 @@
 
 	ocfs2_init_mask_waiter(&mw);
 
+	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
+		lkm_flags |= LKM_VALBLK;
+
 again:
 	wait = 0;
 
@@ -997,11 +985,12 @@
 		status = dlmlock(osb->dlm,
 				 level,
 				 &lockres->l_lksb,
-				 lkm_flags|LKM_CONVERT|LKM_VALBLK,
+				 lkm_flags|LKM_CONVERT,
 				 lockres->l_name,
-				 lockres->l_ops->ast,
+				 OCFS2_LOCK_ID_MAX_LEN - 1,
+				 ocfs2_locking_ast,
 				 lockres,
-				 lockres->l_ops->bast);
+				 ocfs2_blocking_ast);
 		if (status != DLM_NORMAL) {
 			if ((lkm_flags & LKM_NOQUEUE) &&
 			    (status == DLM_NOTQUEUED))
@@ -1074,18 +1063,21 @@
 	mlog_exit_void();
 }
 
-static int ocfs2_create_new_inode_lock(struct inode *inode,
-				       struct ocfs2_lock_res *lockres)
+int ocfs2_create_new_lock(struct ocfs2_super *osb,
+			  struct ocfs2_lock_res *lockres,
+			  int ex,
+			  int local)
 {
-	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
 	unsigned long flags;
+	int lkm_flags = local ? LKM_LOCAL : 0;
 
 	spin_lock_irqsave(&lockres->l_lock, flags);
 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
 	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-	return ocfs2_lock_create(osb, lockres, LKM_EXMODE, LKM_LOCAL);
+	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
 }
 
 /* Grants us an EX lock on the data and metadata resources, skipping
@@ -1097,6 +1089,7 @@
 int ocfs2_create_new_inode_locks(struct inode *inode)
 {
 	int ret;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
 	BUG_ON(!inode);
 	BUG_ON(!ocfs2_inode_is_new(inode));
@@ -1113,22 +1106,23 @@
 	 * on a resource which has an invalid one -- we'll set it
 	 * valid when we release the EX. */
 
-	ret = ocfs2_create_new_inode_lock(inode,
-					  &OCFS2_I(inode)->ip_rw_lockres);
+	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
 	if (ret) {
 		mlog_errno(ret);
 		goto bail;
 	}
 
-	ret = ocfs2_create_new_inode_lock(inode,
-					  &OCFS2_I(inode)->ip_meta_lockres);
+	/*
+	 * We don't want to use LKM_LOCAL on a meta data lock as they
+	 * don't use a generation in their lock names.
+	 */
+	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
 	if (ret) {
 		mlog_errno(ret);
 		goto bail;
 	}
 
-	ret = ocfs2_create_new_inode_lock(inode,
-					  &OCFS2_I(inode)->ip_data_lockres);
+	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
 	if (ret) {
 		mlog_errno(ret);
 		goto bail;
@@ -1317,7 +1311,17 @@
 
 	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
 
-	lvb->lvb_version   = cpu_to_be32(OCFS2_LVB_VERSION);
+	/*
+	 * Invalidate the LVB of a deleted inode - this way other
+	 * nodes are forced to go to disk and discover the new inode
+	 * status.
+	 */
+	if (oi->ip_flags & OCFS2_INODE_DELETED) {
+		lvb->lvb_version = 0;
+		goto out;
+	}
+
+	lvb->lvb_version   = OCFS2_LVB_VERSION;
 	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
 	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
 	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
@@ -1331,7 +1335,9 @@
 	lvb->lvb_imtime_packed =
 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
 	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
+	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
 
+out:
 	mlog_meta_lvb(0, lockres);
 
 	mlog_exit_void();
@@ -1386,11 +1392,13 @@
 	mlog_exit_void();
 }
 
-static inline int ocfs2_meta_lvb_is_trustable(struct ocfs2_lock_res *lockres)
+static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
+					      struct ocfs2_lock_res *lockres)
 {
 	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
 
-	if (be32_to_cpu(lvb->lvb_version) == OCFS2_LVB_VERSION)
+	if (lvb->lvb_version == OCFS2_LVB_VERSION
+	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
 		return 1;
 	return 0;
 }
@@ -1487,7 +1495,7 @@
 	 * map (directories, bitmap files, etc) */
 	ocfs2_extent_map_trunc(inode, 0);
 
-	if (ocfs2_meta_lvb_is_trustable(lockres)) {
+	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
 		mlog(0, "Trusting LVB on inode %llu\n",
 		     (unsigned long long)oi->ip_blkno);
 		ocfs2_refresh_inode_from_lvb(inode);
@@ -1628,6 +1636,18 @@
 		wait_event(osb->recovery_event,
 			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
 
+	/*
+	 * We only see this flag if we're being called from
+	 * ocfs2_read_locked_inode(). It means we're locking an inode
+	 * which hasn't been populated yet, so clear the refresh flag
+	 * and let the caller handle it.
+	 */
+	if (inode->i_state & I_NEW) {
+		status = 0;
+		ocfs2_complete_lock_res_refresh(lockres, 0);
+		goto bail;
+	}
+
 	/* This is fun. The caller may want a bh back, or it may
 	 * not. ocfs2_meta_lock_update definitely wants one in, but
 	 * may or may not read one, depending on what's in the
@@ -1807,6 +1827,34 @@
 	ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
 }
 
+int ocfs2_dentry_lock(struct dentry *dentry, int ex)
+{
+	int ret;
+	int level = ex ? LKM_EXMODE : LKM_PRMODE;
+	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
+
+	BUG_ON(!dl);
+
+	if (ocfs2_is_hard_readonly(osb))
+		return -EROFS;
+
+	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
+	if (ret < 0)
+		mlog_errno(ret);
+
+	return ret;
+}
+
+void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
+{
+	int level = ex ? LKM_EXMODE : LKM_PRMODE;
+	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
+	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
+
+	ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
+}
+
 /* Reference counting of the dlm debug structure. We want this because
  * open references on the debug inodes can live on after a mount, so
  * we can't rely on the ocfs2_super to always exist. */
@@ -1937,9 +1985,16 @@
 	if (!lockres)
 		return -EINVAL;
 
-	seq_printf(m, "0x%x\t"
-		   "%.*s\t"
-		   "%d\t"
+	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
+
+	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
+		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
+			   lockres->l_name,
+			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
+	else
+		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
+
+	seq_printf(m, "%d\t"
 		   "0x%lx\t"
 		   "0x%x\t"
 		   "0x%x\t"
@@ -1947,8 +2002,6 @@
 		   "%u\t"
 		   "%d\t"
 		   "%d\t",
-		   OCFS2_DLM_DEBUG_STR_VERSION,
-		   OCFS2_LOCK_ID_MAX_LEN, lockres->l_name,
 		   lockres->l_level,
 		   lockres->l_flags,
 		   lockres->l_action,
@@ -2138,7 +2191,7 @@
 	mlog_exit_void();
 }
 
-static void ocfs2_unlock_ast_func(void *opaque, enum dlm_status status)
+static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
 {
 	struct ocfs2_lock_res *lockres = opaque;
 	unsigned long flags;
@@ -2194,24 +2247,20 @@
 	mlog_exit_void();
 }
 
-typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);
-
-struct drop_lock_cb {
-	ocfs2_pre_drop_cb_t	*drop_func;
-	void			*drop_data;
-};
-
 static int ocfs2_drop_lock(struct ocfs2_super *osb,
-			   struct ocfs2_lock_res *lockres,
-			   struct drop_lock_cb *dcb)
+			   struct ocfs2_lock_res *lockres)
 {
 	enum dlm_status status;
 	unsigned long flags;
+	int lkm_flags = 0;
 
 	/* We didn't get anywhere near actually using this lockres. */
 	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
 		goto out;
 
+	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
+		lkm_flags |= LKM_VALBLK;
+
 	spin_lock_irqsave(&lockres->l_lock, flags);
 
 	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
@@ -2234,8 +2283,12 @@
 		spin_lock_irqsave(&lockres->l_lock, flags);
 	}
 
-	if (dcb)
-		dcb->drop_func(lockres, dcb->drop_data);
+	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
+		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
+		    lockres->l_level == LKM_EXMODE &&
+		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
+			lockres->l_ops->set_lvb(lockres);
+	}
 
 	if (lockres->l_flags & OCFS2_LOCK_BUSY)
 		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
@@ -2261,8 +2314,8 @@
 
 	mlog(0, "lock %s\n", lockres->l_name);
 
-	status = dlmunlock(osb->dlm, &lockres->l_lksb, LKM_VALBLK,
-			   lockres->l_ops->unlock_ast, lockres);
+	status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
+			   ocfs2_unlock_ast, lockres);
 	if (status != DLM_NORMAL) {
 		ocfs2_log_dlm_error("dlmunlock", status, lockres);
 		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
@@ -2309,43 +2362,26 @@
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 }
 
-static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
+void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
+			       struct ocfs2_lock_res *lockres)
 {
-	int status;
+	int ret;
 
-	mlog_entry_void();
-
-	ocfs2_mark_lockres_freeing(&osb->osb_super_lockres);
-
-	status = ocfs2_drop_lock(osb, &osb->osb_super_lockres, NULL);
-	if (status < 0)
-		mlog_errno(status);
-
-	ocfs2_mark_lockres_freeing(&osb->osb_rename_lockres);
-
-	status = ocfs2_drop_lock(osb, &osb->osb_rename_lockres, NULL);
-	if (status < 0)
-		mlog_errno(status);
-
-	mlog_exit(status);
+	ocfs2_mark_lockres_freeing(lockres);
+	ret = ocfs2_drop_lock(osb, lockres);
+	if (ret)
+		mlog_errno(ret);
 }
 
-static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
+static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
 {
-	struct inode *inode = data;
-
-	/* the metadata lock requires a bit more work as we have an
-	 * LVB to worry about. */
-	if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
-	    lockres->l_level == LKM_EXMODE &&
-	    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
-		__ocfs2_stuff_meta_lvb(inode);
+	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
+	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
 }
 
 int ocfs2_drop_inode_locks(struct inode *inode)
 {
 	int status, err;
-	struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
 
 	mlog_entry_void();
 
@@ -2353,24 +2389,21 @@
 	 * ocfs2_clear_inode has done it for us. */
 
 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
-			      &OCFS2_I(inode)->ip_data_lockres,
-			      NULL);
+			      &OCFS2_I(inode)->ip_data_lockres);
 	if (err < 0)
 		mlog_errno(err);
 
 	status = err;
 
 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
-			      &OCFS2_I(inode)->ip_meta_lockres,
-			      &meta_dcb);
+			      &OCFS2_I(inode)->ip_meta_lockres);
 	if (err < 0)
 		mlog_errno(err);
 	if (err < 0 && !status)
 		status = err;
 
 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
-			      &OCFS2_I(inode)->ip_rw_lockres,
-			      NULL);
+			      &OCFS2_I(inode)->ip_rw_lockres);
 	if (err < 0)
 		mlog_errno(err);
 	if (err < 0 && !status)
@@ -2419,9 +2452,10 @@
 			 &lockres->l_lksb,
 			 dlm_flags,
 			 lockres->l_name,
-			 lockres->l_ops->ast,
+			 OCFS2_LOCK_ID_MAX_LEN - 1,
+			 ocfs2_locking_ast,
 			 lockres,
-			 lockres->l_ops->bast);
+			 ocfs2_blocking_ast);
 	if (status != DLM_NORMAL) {
 		ocfs2_log_dlm_error("dlmlock", status, lockres);
 		ret = -EINVAL;
@@ -2480,7 +2514,7 @@
 	status = dlmunlock(osb->dlm,
 			   &lockres->l_lksb,
 			   LKM_CANCEL,
-			   lockres->l_ops->unlock_ast,
+			   ocfs2_unlock_ast,
 			   lockres);
 	if (status != DLM_NORMAL) {
 		ocfs2_log_dlm_error("dlmunlock", status, lockres);
@@ -2494,115 +2528,15 @@
 	return ret;
 }
 
-static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
-						  struct ocfs2_lock_res *lockres,
-						  int new_level)
-{
-	int ret;
-
-	mlog_entry_void();
-
-	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
-
-	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
-		ret = 0;
-		mlog(0, "lockres %s currently being refreshed -- backing "
-		     "off!\n", lockres->l_name);
-	} else if (new_level == LKM_PRMODE)
-		ret = !lockres->l_ex_holders &&
-			ocfs2_inode_fully_checkpointed(inode);
-	else /* Must be NLMODE we're converting to. */
-		ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
-			ocfs2_inode_fully_checkpointed(inode);
-
-	mlog_exit(ret);
-	return ret;
-}
-
-static int ocfs2_do_unblock_meta(struct inode *inode,
-				 int *requeue)
-{
-	int new_level;
-	int set_lvb = 0;
-	int ret = 0;
-	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
-	unsigned long flags;
-
-	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-
-	mlog_entry_void();
-
-	spin_lock_irqsave(&lockres->l_lock, flags);
-
-	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
-
-	mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
-	     lockres->l_blocking);
-
-	BUG_ON(lockres->l_level != LKM_EXMODE &&
-	       lockres->l_level != LKM_PRMODE);
-
-	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
-		*requeue = 1;
-		ret = ocfs2_prepare_cancel_convert(osb, lockres);
-		spin_unlock_irqrestore(&lockres->l_lock, flags);
-		if (ret) {
-			ret = ocfs2_cancel_convert(osb, lockres);
-			if (ret < 0)
-				mlog_errno(ret);
-		}
-		goto leave;
-	}
-
-	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
-
-	mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
-	     lockres->l_level, lockres->l_blocking, new_level);
-
-	if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
-		if (lockres->l_level == LKM_EXMODE)
-			set_lvb = 1;
-
-		/* If the lock hasn't been refreshed yet (rare), then
-		 * our memory inode values are old and we skip
-		 * stuffing the lvb. There's no need to actually clear
-		 * out the lvb here as it's value is still valid. */
-		if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
-			if (set_lvb)
-				__ocfs2_stuff_meta_lvb(inode);
-		} else
-			mlog(0, "lockres %s: downconverting stale lock!\n",
-			     lockres->l_name);
-
-		mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
-		     "l_blocking=%d, new_level=%d\n",
-		     lockres->l_level, lockres->l_blocking, new_level);
-
-		ocfs2_prepare_downconvert(lockres, new_level);
-		spin_unlock_irqrestore(&lockres->l_lock, flags);
-		ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
-		goto leave;
-	}
-	if (!ocfs2_inode_fully_checkpointed(inode))
-		ocfs2_start_checkpoint(osb);
-
-	*requeue = 1;
-	spin_unlock_irqrestore(&lockres->l_lock, flags);
-	ret = 0;
-leave:
-	mlog_exit(ret);
-	return ret;
-}
-
-static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
-				      struct ocfs2_lock_res *lockres,
-				      int *requeue,
-				      ocfs2_convert_worker_t *worker)
+static int ocfs2_unblock_lock(struct ocfs2_super *osb,
+			      struct ocfs2_lock_res *lockres,
+			      struct ocfs2_unblock_ctl *ctl)
 {
 	unsigned long flags;
 	int blocking;
 	int new_level;
 	int ret = 0;
+	int set_lvb = 0;
 
 	mlog_entry_void();
 
@@ -2612,7 +2546,7 @@
 
 recheck:
 	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
-		*requeue = 1;
+		ctl->requeue = 1;
 		ret = ocfs2_prepare_cancel_convert(osb, lockres);
 		spin_unlock_irqrestore(&lockres->l_lock, flags);
 		if (ret) {
@@ -2626,27 +2560,33 @@
 	/* if we're blocking an exclusive and we have *any* holders,
 	 * then requeue. */
 	if ((lockres->l_blocking == LKM_EXMODE)
-	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
-		spin_unlock_irqrestore(&lockres->l_lock, flags);
-		*requeue = 1;
-		ret = 0;
-		goto leave;
-	}
+	    && (lockres->l_ex_holders || lockres->l_ro_holders))
+		goto leave_requeue;
 
 	/* If it's a PR we're blocking, then only
 	 * requeue if we've got any EX holders */
 	if (lockres->l_blocking == LKM_PRMODE &&
-	    lockres->l_ex_holders) {
-		spin_unlock_irqrestore(&lockres->l_lock, flags);
-		*requeue = 1;
-		ret = 0;
-		goto leave;
-	}
+	    lockres->l_ex_holders)
+		goto leave_requeue;
+
+	/*
+	 * Can we get a lock in this state if the holder counts are
+	 * zero? The meta data unblock code used to check this.
+	 */
+	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
+	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
+		goto leave_requeue;
+
+	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
+
+	if (lockres->l_ops->check_downconvert
+	    && !lockres->l_ops->check_downconvert(lockres, new_level))
+		goto leave_requeue;
 
 	/* If we get here, then we know that there are no more
 	 * incompatible holders (and anyone asking for an incompatible
 	 * lock is blocked). We can now downconvert the lock */
-	if (!worker)
+	if (!lockres->l_ops->downconvert_worker)
 		goto downconvert;
 
 	/* Some lockres types want to do a bit of work before
@@ -2656,7 +2596,10 @@
 	blocking = lockres->l_blocking;
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-	worker(lockres, blocking);
+	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
+
+	if (ctl->unblock_action == UNBLOCK_STOP_POST)
+		goto leave;
 
 	spin_lock_irqsave(&lockres->l_lock, flags);
 	if (blocking != lockres->l_blocking) {
@@ -2666,25 +2609,43 @@
 	}
 
 downconvert:
-	*requeue = 0;
-	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
+	ctl->requeue = 0;
+
+	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
+		if (lockres->l_level == LKM_EXMODE)
+			set_lvb = 1;
+
+		/*
+		 * We only set the lvb if the lock has been fully
+		 * refreshed - otherwise we risk setting stale
+		 * data. Otherwise, there's no need to actually clear
+		 * out the lvb here as it's value is still valid.
+		 */
+		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
+			lockres->l_ops->set_lvb(lockres);
+	}
 
 	ocfs2_prepare_downconvert(lockres, new_level);
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
-	ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0);
+	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
 leave:
 	mlog_exit(ret);
 	return ret;
+
+leave_requeue:
+	spin_unlock_irqrestore(&lockres->l_lock, flags);
+	ctl->requeue = 1;
+
+	mlog_exit(0);
+	return 0;
 }
 
-static void ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
-				      int blocking)
+static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
+				     int blocking)
 {
 	struct inode *inode;
 	struct address_space *mapping;
 
-	mlog_entry_void();
-
        	inode = ocfs2_lock_res_inode(lockres);
 	mapping = inode->i_mapping;
 
@@ -2705,116 +2666,159 @@
 		filemap_fdatawait(mapping);
 	}
 
-	mlog_exit_void();
+	return UNBLOCK_CONTINUE;
 }
 
-int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
-		       int *requeue)
+static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
+					int new_level)
 {
-	int status;
-	struct inode *inode;
-	struct ocfs2_super *osb;
+	struct inode *inode = ocfs2_lock_res_inode(lockres);
+	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
 
-	mlog_entry_void();
+	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
+	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
 
-	inode = ocfs2_lock_res_inode(lockres);
-	osb = OCFS2_SB(inode->i_sb);
+	if (checkpointed)
+		return 1;
 
-	mlog(0, "unblock inode %llu\n",
-	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
-
-	status = ocfs2_generic_unblock_lock(osb,
-					    lockres,
-					    requeue,
-					    ocfs2_data_convert_worker);
-	if (status < 0)
-		mlog_errno(status);
-
-	mlog(0, "inode %llu, requeue = %d\n",
-	     (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);
-
-	mlog_exit(status);
-	return status;
+	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
+	return 0;
 }
 
-static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
-				    int *requeue)
+static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
 {
-	int status;
-	struct inode *inode;
+	struct inode *inode = ocfs2_lock_res_inode(lockres);
 
-	mlog_entry_void();
-
-	mlog(0, "Unblock lockres %s\n", lockres->l_name);
-
-	inode  = ocfs2_lock_res_inode(lockres);
-
-	status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
-					    lockres,
-					    requeue,
-					    NULL);
-	if (status < 0)
-		mlog_errno(status);
-
-	mlog_exit(status);
-	return status;
+	__ocfs2_stuff_meta_lvb(inode);
 }
 
-
-int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
-		       int *requeue)
+/*
+ * Does the final reference drop on our dentry lock. Right now this
+ * happens in the vote thread, but we could choose to simplify the
+ * dlmglue API and push these off to the ocfs2_wq in the future.
+ */
+static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
+				     struct ocfs2_lock_res *lockres)
 {
-	int status;
-	struct inode *inode;
-
-	mlog_entry_void();
-
-       	inode = ocfs2_lock_res_inode(lockres);
-
-	mlog(0, "unblock inode %llu\n",
-	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
-
-	status = ocfs2_do_unblock_meta(inode, requeue);
-	if (status < 0)
-		mlog_errno(status);
-
-	mlog(0, "inode %llu, requeue = %d\n",
-	     (unsigned long long)OCFS2_I(inode)->ip_blkno, *requeue);
-
-	mlog_exit(status);
-	return status;
+	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+	ocfs2_dentry_lock_put(osb, dl);
 }
 
-/* Generic unblock function for any lockres whose private data is an
- * ocfs2_super pointer. */
-static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
-				  int *requeue)
+/*
+ * d_delete() matching dentries before the lock downconvert.
+ *
+ * At this point, any process waiting to destroy the
+ * dentry_lock due to last ref count is stopped by the
+ * OCFS2_LOCK_QUEUED flag.
+ *
+ * We have two potential problems
+ *
+ * 1) If we do the last reference drop on our dentry_lock (via dput)
+ *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
+ *    the downconvert to finish. Instead we take an elevated
+ *    reference and push the drop until after we've completed our
+ *    unblock processing.
+ *
+ * 2) There might be another process with a final reference,
+ *    waiting on us to finish processing. If this is the case, we
+ *    detect it and exit out - there's no more dentries anyway.
+ */
+static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
+				       int blocking)
 {
-	int status;
-	struct ocfs2_super *osb;
+	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
+	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
+	struct dentry *dentry;
+	unsigned long flags;
+	int extra_ref = 0;
 
-	mlog_entry_void();
+	/*
+	 * This node is blocking another node from getting a read
+	 * lock. This happens when we've renamed within a
+	 * directory. We've forced the other nodes to d_delete(), but
+	 * we never actually dropped our lock because it's still
+	 * valid. The downconvert code will retain a PR for this node,
+	 * so there's no further work to do.
+	 */
+	if (blocking == LKM_PRMODE)
+		return UNBLOCK_CONTINUE;
 
-	mlog(0, "Unblock lockres %s\n", lockres->l_name);
+	/*
+	 * Mark this inode as potentially orphaned. The code in
+	 * ocfs2_delete_inode() will figure out whether it actually
+	 * needs to be freed or not.
+	 */
+	spin_lock(&oi->ip_lock);
+	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
+	spin_unlock(&oi->ip_lock);
 
-	osb = ocfs2_lock_res_super(lockres);
+	/*
+	 * Yuck. We need to make sure however that the check of
+	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
+	 * respect to a reference decrement or the setting of that
+	 * flag.
+	 */
+	spin_lock_irqsave(&lockres->l_lock, flags);
+	spin_lock(&dentry_attach_lock);
+	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
+	    && dl->dl_count) {
+		dl->dl_count++;
+		extra_ref = 1;
+	}
+	spin_unlock(&dentry_attach_lock);
+	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-	status = ocfs2_generic_unblock_lock(osb,
-					    lockres,
-					    requeue,
-					    NULL);
-	if (status < 0)
-		mlog_errno(status);
+	mlog(0, "extra_ref = %d\n", extra_ref);
 
-	mlog_exit(status);
-	return status;
+	/*
+	 * We have a process waiting on us in ocfs2_dentry_iput(),
+	 * which means we can't have any more outstanding
+	 * aliases. There's no need to do any more work.
+	 */
+	if (!extra_ref)
+		return UNBLOCK_CONTINUE;
+
+	spin_lock(&dentry_attach_lock);
+	while (1) {
+		dentry = ocfs2_find_local_alias(dl->dl_inode,
+						dl->dl_parent_blkno, 1);
+		if (!dentry)
+			break;
+		spin_unlock(&dentry_attach_lock);
+
+		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
+		     dentry->d_name.name);
+
+		/*
+		 * The following dcache calls may do an
+		 * iput(). Normally we don't want that from the
+		 * downconverting thread, but in this case it's ok
+		 * because the requesting node already has an
+		 * exclusive lock on the inode, so it can't be queued
+		 * for a downconvert.
+		 */
+		d_delete(dentry);
+		dput(dentry);
+
+		spin_lock(&dentry_attach_lock);
+	}
+	spin_unlock(&dentry_attach_lock);
+
+	/*
+	 * If we are the last holder of this dentry lock, there is no
+	 * reason to downconvert so skip straight to the unlock.
+	 */
+	if (dl->dl_count == 1)
+		return UNBLOCK_STOP_POST;
+
+	return UNBLOCK_CONTINUE_POST;
 }
 
 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
 				struct ocfs2_lock_res *lockres)
 {
 	int status;
-	int requeue = 0;
+	struct ocfs2_unblock_ctl ctl = {0, 0,};
 	unsigned long flags;
 
 	/* Our reference to the lockres in this function can be
@@ -2825,7 +2829,6 @@
 
 	BUG_ON(!lockres);
 	BUG_ON(!lockres->l_ops);
-	BUG_ON(!lockres->l_ops->unblock);
 
 	mlog(0, "lockres %s blocked.\n", lockres->l_name);
 
@@ -2839,21 +2842,25 @@
 		goto unqueue;
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
-	status = lockres->l_ops->unblock(lockres, &requeue);
+	status = ocfs2_unblock_lock(osb, lockres, &ctl);
 	if (status < 0)
 		mlog_errno(status);
 
 	spin_lock_irqsave(&lockres->l_lock, flags);
 unqueue:
-	if (lockres->l_flags & OCFS2_LOCK_FREEING || !requeue) {
+	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
 		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
 	} else
 		ocfs2_schedule_blocked_lock(osb, lockres);
 
 	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
-	     requeue ? "yes" : "no");
+	     ctl.requeue ? "yes" : "no");
 	spin_unlock_irqrestore(&lockres->l_lock, flags);
 
+	if (ctl.unblock_action != UNBLOCK_CONTINUE
+	    && lockres->l_ops->post_unlock)
+		lockres->l_ops->post_unlock(osb, lockres);
+
 	mlog_exit_void();
 }
 
@@ -2896,8 +2903,9 @@
 
 	mlog(level, "LVB information for %s (called from %s:%u):\n",
 	     lockres->l_name, function, line);
-	mlog(level, "version: %u, clusters: %u\n",
-	     be32_to_cpu(lvb->lvb_version), be32_to_cpu(lvb->lvb_iclusters));
+	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
+	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
+	     be32_to_cpu(lvb->lvb_igeneration));
 	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
 	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
 	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index 243ae86..4a27693 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -27,10 +27,14 @@
 #ifndef DLMGLUE_H
 #define DLMGLUE_H
 
-#define OCFS2_LVB_VERSION 3
+#include "dcache.h"
+
+#define OCFS2_LVB_VERSION 4
 
 struct ocfs2_meta_lvb {
-	__be32       lvb_version;
+	__u8         lvb_version;
+	__u8         lvb_reserved0;
+	__be16       lvb_reserved1;
 	__be32       lvb_iclusters;
 	__be32       lvb_iuid;
 	__be32       lvb_igid;
@@ -41,7 +45,8 @@
 	__be16       lvb_imode;
 	__be16       lvb_inlink;
 	__be32       lvb_iattr;
-	__be32       lvb_reserved[2];
+	__be32       lvb_igeneration;
+	__be32       lvb_reserved2;
 };
 
 /* ocfs2_meta_lock_full() and ocfs2_data_lock_full() 'arg_flags' flags */
@@ -57,9 +62,14 @@
 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res);
 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
 			       enum ocfs2_lock_type type,
+			       unsigned int generation,
 			       struct inode *inode);
+void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
+				u64 parent, struct inode *inode);
 void ocfs2_lock_res_free(struct ocfs2_lock_res *res);
 int ocfs2_create_new_inode_locks(struct inode *inode);
+int ocfs2_create_new_lock(struct ocfs2_super *osb,
+			  struct ocfs2_lock_res *lockres, int ex, int local);
 int ocfs2_drop_inode_locks(struct inode *inode);
 int ocfs2_data_lock_full(struct inode *inode,
 			 int write,
@@ -93,7 +103,12 @@
 			int ex);
 int ocfs2_rename_lock(struct ocfs2_super *osb);
 void ocfs2_rename_unlock(struct ocfs2_super *osb);
+int ocfs2_dentry_lock(struct dentry *dentry, int ex);
+void ocfs2_dentry_unlock(struct dentry *dentry, int ex);
+
 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres);
+void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
+			       struct ocfs2_lock_res *lockres);
 
 /* for the vote thread */
 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
diff --git a/fs/ocfs2/export.c b/fs/ocfs2/export.c
index ec55ab3..fb91089 100644
--- a/fs/ocfs2/export.c
+++ b/fs/ocfs2/export.c
@@ -33,6 +33,7 @@
 
 #include "dir.h"
 #include "dlmglue.h"
+#include "dcache.h"
 #include "export.h"
 #include "inode.h"
 
@@ -57,7 +58,7 @@
 		return ERR_PTR(-ESTALE);
 	}
 
-	inode = ocfs2_iget(OCFS2_SB(sb), handle->ih_blkno);
+	inode = ocfs2_iget(OCFS2_SB(sb), handle->ih_blkno, 0);
 
 	if (IS_ERR(inode)) {
 		mlog_errno(PTR_ERR(inode));
@@ -77,6 +78,7 @@
 		mlog_errno(-ENOMEM);
 		return ERR_PTR(-ENOMEM);
 	}
+	result->d_op = &ocfs2_dentry_ops;
 
 	mlog_exit_ptr(result);
 	return result;
@@ -113,7 +115,7 @@
 		goto bail_unlock;
 	}
 
-	inode = ocfs2_iget(OCFS2_SB(dir->i_sb), blkno);
+	inode = ocfs2_iget(OCFS2_SB(dir->i_sb), blkno, 0);
 	if (IS_ERR(inode)) {
 		mlog(ML_ERROR, "Unable to create inode %llu\n",
 		     (unsigned long long)blkno);
@@ -127,6 +129,8 @@
 		parent = ERR_PTR(-ENOMEM);
 	}
 
+	parent->d_op = &ocfs2_dentry_ops;
+
 bail_unlock:
 	ocfs2_meta_unlock(dir, 0);
 
diff --git a/fs/ocfs2/inode.c b/fs/ocfs2/inode.c
index 7bcf691..69d3db5 100644
--- a/fs/ocfs2/inode.c
+++ b/fs/ocfs2/inode.c
@@ -54,8 +54,6 @@
 
 #include "buffer_head_io.h"
 
-#define OCFS2_FI_FLAG_NOWAIT	0x1
-#define OCFS2_FI_FLAG_DELETE	0x2
 struct ocfs2_find_inode_args
 {
 	u64		fi_blkno;
@@ -109,7 +107,7 @@
 	return ilookup5(osb->sb, args.fi_ino, ocfs2_find_actor, &args);
 }
 
-struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 blkno)
+struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 blkno, int flags)
 {
 	struct inode *inode = NULL;
 	struct super_block *sb = osb->sb;
@@ -127,7 +125,7 @@
 	}
 
 	args.fi_blkno = blkno;
-	args.fi_flags = 0;
+	args.fi_flags = flags;
 	args.fi_ino = ino_from_blkno(sb, blkno);
 
 	inode = iget5_locked(sb, args.fi_ino, ocfs2_find_actor,
@@ -297,15 +295,11 @@
 	OCFS2_I(inode)->ip_orphaned_slot = OCFS2_INVALID_SLOT;
 	OCFS2_I(inode)->ip_attr = le32_to_cpu(fe->i_attr);
 
-	if (create_ino)
-		inode->i_ino = ino_from_blkno(inode->i_sb,
-			       le64_to_cpu(fe->i_blkno));
-
-	mlog(0, "blkno = %llu, ino = %lu, create_ino = %s\n",
-	     (unsigned long long)fe->i_blkno, inode->i_ino, create_ino ? "true" : "false");
-
 	inode->i_nlink = le16_to_cpu(fe->i_links_count);
 
+	if (fe->i_flags & cpu_to_le32(OCFS2_SYSTEM_FL))
+		OCFS2_I(inode)->ip_flags |= OCFS2_INODE_SYSTEM_FILE;
+
 	if (fe->i_flags & cpu_to_le32(OCFS2_LOCAL_ALLOC_FL)) {
 		OCFS2_I(inode)->ip_flags |= OCFS2_INODE_BITMAP;
 		mlog(0, "local alloc inode: i_ino=%lu\n", inode->i_ino);
@@ -343,12 +337,28 @@
 		    break;
 	}
 
+	if (create_ino) {
+		inode->i_ino = ino_from_blkno(inode->i_sb,
+			       le64_to_cpu(fe->i_blkno));
+
+		/*
+		 * If we ever want to create system files from kernel,
+		 * the generation argument to
+		 * ocfs2_inode_lock_res_init() will have to change.
+		 */
+		BUG_ON(fe->i_flags & cpu_to_le32(OCFS2_SYSTEM_FL));
+
+		ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_meta_lockres,
+					  OCFS2_LOCK_TYPE_META, 0, inode);
+	}
+
 	ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_rw_lockres,
-				  OCFS2_LOCK_TYPE_RW, inode);
-	ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_meta_lockres,
-				  OCFS2_LOCK_TYPE_META, inode);
+				  OCFS2_LOCK_TYPE_RW, inode->i_generation,
+				  inode);
+
 	ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_data_lockres,
-				  OCFS2_LOCK_TYPE_DATA, inode);
+				  OCFS2_LOCK_TYPE_DATA, inode->i_generation,
+				  inode);
 
 	ocfs2_set_inode_flags(inode);
 	inode->i_flags |= S_NOATIME;
@@ -366,15 +376,15 @@
 	struct ocfs2_super *osb;
 	struct ocfs2_dinode *fe;
 	struct buffer_head *bh = NULL;
-	int status;
-	int sysfile = 0;
+	int status, can_lock;
+	u32 generation = 0;
 
 	mlog_entry("(0x%p, 0x%p)\n", inode, args);
 
 	status = -EINVAL;
 	if (inode == NULL || inode->i_sb == NULL) {
 		mlog(ML_ERROR, "bad inode\n");
-		goto bail;
+		return status;
 	}
 	sb = inode->i_sb;
 	osb = OCFS2_SB(sb);
@@ -382,50 +392,110 @@
 	if (!args) {
 		mlog(ML_ERROR, "bad inode args\n");
 		make_bad_inode(inode);
-		goto bail;
+		return status;
 	}
 
-	/* Read the FE off disk. This is safe because the kernel only
-	 * does one read_inode2 for a new inode, and if it doesn't
-	 * exist yet then nobody can be working on it! */
-	status = ocfs2_read_block(osb, args->fi_blkno, &bh, 0, NULL);
+	/*
+	 * To improve performance of cold-cache inode stats, we take
+	 * the cluster lock here if possible.
+	 *
+	 * Generally, OCFS2 never trusts the contents of an inode
+	 * unless it's holding a cluster lock, so taking it here isn't
+	 * a correctness issue as much as it is a performance
+	 * improvement.
+	 *
+	 * There are three times when taking the lock is not a good idea:
+	 *
+	 * 1) During startup, before we have initialized the DLM.
+	 *
+	 * 2) If we are reading certain system files which never get
+	 *    cluster locks (local alloc, truncate log).
+	 *
+	 * 3) If the process doing the iget() is responsible for
+	 *    orphan dir recovery. We're holding the orphan dir lock and
+	 *    can get into a deadlock with another process on another
+	 *    node in ->delete_inode().
+	 *
+	 * #1 and #2 can be simply solved by never taking the lock
+	 * here for system files (which are the only type we read
+	 * during mount). It's a heavier approach, but our main
+	 * concern is user-accesible files anyway.
+	 *
+	 * #3 works itself out because we'll eventually take the
+	 * cluster lock before trusting anything anyway.
+	 */
+	can_lock = !(args->fi_flags & OCFS2_FI_FLAG_SYSFILE)
+		&& !(args->fi_flags & OCFS2_FI_FLAG_NOLOCK);
+
+	/*
+	 * To maintain backwards compatibility with older versions of
+	 * ocfs2-tools, we still store the generation value for system
+	 * files. The only ones that actually matter to userspace are
+	 * the journals, but it's easier and inexpensive to just flag
+	 * all system files similarly.
+	 */
+	if (args->fi_flags & OCFS2_FI_FLAG_SYSFILE)
+		generation = osb->fs_generation;
+
+	ocfs2_inode_lock_res_init(&OCFS2_I(inode)->ip_meta_lockres,
+				  OCFS2_LOCK_TYPE_META,
+				  generation, inode);
+
+	if (can_lock) {
+		status = ocfs2_meta_lock(inode, NULL, NULL, 0);
+		if (status) {
+			make_bad_inode(inode);
+			mlog_errno(status);
+			return status;
+		}
+	}
+
+	status = ocfs2_read_block(osb, args->fi_blkno, &bh, 0,
+				  can_lock ? inode : NULL);
 	if (status < 0) {
 		mlog_errno(status);
-		make_bad_inode(inode);
 		goto bail;
 	}
 
+	status = -EINVAL;
 	fe = (struct ocfs2_dinode *) bh->b_data;
 	if (!OCFS2_IS_VALID_DINODE(fe)) {
 		mlog(ML_ERROR, "Invalid dinode #%llu: signature = %.*s\n",
 		     (unsigned long long)fe->i_blkno, 7, fe->i_signature);
-		make_bad_inode(inode);
 		goto bail;
 	}
 
-	if (fe->i_flags & cpu_to_le32(OCFS2_SYSTEM_FL))
-		sysfile = 1;
+	/*
+	 * This is a code bug. Right now the caller needs to
+	 * understand whether it is asking for a system file inode or
+	 * not so the proper lock names can be built.
+	 */
+	mlog_bug_on_msg(!!(fe->i_flags & cpu_to_le32(OCFS2_SYSTEM_FL)) !=
+			!!(args->fi_flags & OCFS2_FI_FLAG_SYSFILE),
+			"Inode %llu: system file state is ambigous\n",
+			(unsigned long long)args->fi_blkno);
 
 	if (S_ISCHR(le16_to_cpu(fe->i_mode)) ||
 	    S_ISBLK(le16_to_cpu(fe->i_mode)))
     		inode->i_rdev = huge_decode_dev(le64_to_cpu(fe->id1.dev1.i_rdev));
 
-	status = -EINVAL;
 	if (ocfs2_populate_inode(inode, fe, 0) < 0) {
 		mlog(ML_ERROR, "populate failed! i_blkno=%llu, i_ino=%lu\n",
 		     (unsigned long long)fe->i_blkno, inode->i_ino);
-		make_bad_inode(inode);
 		goto bail;
 	}
 
 	BUG_ON(args->fi_blkno != le64_to_cpu(fe->i_blkno));
 
-	if (sysfile)
-	       OCFS2_I(inode)->ip_flags |= OCFS2_INODE_SYSTEM_FILE;
-
 	status = 0;
 
 bail:
+	if (can_lock)
+		ocfs2_meta_unlock(inode, 0);
+
+	if (status < 0)
+		make_bad_inode(inode);
+
 	if (args && bh)
 		brelse(bh);
 
@@ -898,9 +968,15 @@
 		goto bail_unlock_inode;
 	}
 
-	/* Mark the inode as successfully deleted. This is important
-	 * for ocfs2_clear_inode as it will check this flag and skip
-	 * any checkpointing work */
+	/*
+	 * Mark the inode as successfully deleted.
+	 *
+	 * This is important for ocfs2_clear_inode() as it will check
+	 * this flag and skip any checkpointing work
+	 *
+	 * ocfs2_stuff_meta_lvb() also uses this flag to invalidate
+	 * the LVB for other nodes.
+	 */
 	OCFS2_I(inode)->ip_flags |= OCFS2_INODE_DELETED;
 
 bail_unlock_inode:
@@ -1025,12 +1101,10 @@
 	/* Testing ip_orphaned_slot here wouldn't work because we may
 	 * not have gotten a delete_inode vote from any other nodes
 	 * yet. */
-	if (oi->ip_flags & OCFS2_INODE_MAYBE_ORPHANED) {
-		mlog(0, "Inode was orphaned on another node, clearing nlink.\n");
-		inode->i_nlink = 0;
-	}
-
-	generic_drop_inode(inode);
+	if (oi->ip_flags & OCFS2_INODE_MAYBE_ORPHANED)
+		generic_delete_inode(inode);
+	else
+		generic_drop_inode(inode);
 
 	mlog_exit_void();
 }
diff --git a/fs/ocfs2/inode.h b/fs/ocfs2/inode.h
index 4d1e539..9957810 100644
--- a/fs/ocfs2/inode.h
+++ b/fs/ocfs2/inode.h
@@ -122,7 +122,13 @@
 void ocfs2_clear_inode(struct inode *inode);
 void ocfs2_delete_inode(struct inode *inode);
 void ocfs2_drop_inode(struct inode *inode);
-struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 feoff);
+
+/* Flags for ocfs2_iget() */
+#define OCFS2_FI_FLAG_NOWAIT	0x1
+#define OCFS2_FI_FLAG_DELETE	0x2
+#define OCFS2_FI_FLAG_SYSFILE	0x4
+#define OCFS2_FI_FLAG_NOLOCK	0x8
+struct inode *ocfs2_iget(struct ocfs2_super *osb, u64 feoff, int flags);
 struct inode *ocfs2_ilookup_for_vote(struct ocfs2_super *osb,
 				     u64 blkno,
 				     int delete_vote);
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index f92bf1d..fd9734d 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -1493,7 +1493,8 @@
 			if (de->name_len == 2 && !strncmp("..", de->name, 2))
 				continue;
 
-			iter = ocfs2_iget(osb, le64_to_cpu(de->inode));
+			iter = ocfs2_iget(osb, le64_to_cpu(de->inode),
+					  OCFS2_FI_FLAG_NOLOCK);
 			if (IS_ERR(iter))
 				continue;
 
diff --git a/fs/ocfs2/namei.c b/fs/ocfs2/namei.c
index 0d3e939..849c3b4 100644
--- a/fs/ocfs2/namei.c
+++ b/fs/ocfs2/namei.c
@@ -179,7 +179,7 @@
 	if (status < 0)
 		goto bail_add;
 
-	inode = ocfs2_iget(OCFS2_SB(dir->i_sb), blkno);
+	inode = ocfs2_iget(OCFS2_SB(dir->i_sb), blkno, 0);
 	if (IS_ERR(inode)) {
 		mlog(ML_ERROR, "Unable to create inode %llu\n",
 		     (unsigned long long)blkno);
@@ -199,10 +199,32 @@
 	spin_unlock(&oi->ip_lock);
 
 bail_add:
-
 	dentry->d_op = &ocfs2_dentry_ops;
 	ret = d_splice_alias(inode, dentry);
 
+	if (inode) {
+		/*
+		 * If d_splice_alias() finds a DCACHE_DISCONNECTED
+		 * dentry, it will d_move() it on top of ourse. The
+		 * return value will indicate this however, so in
+		 * those cases, we switch them around for the locking
+		 * code.
+		 *
+		 * NOTE: This dentry already has ->d_op set from
+		 * ocfs2_get_parent() and ocfs2_get_dentry()
+		 */
+		if (ret)
+			dentry = ret;
+
+		status = ocfs2_dentry_attach_lock(dentry, inode,
+						  OCFS2_I(dir)->ip_blkno);
+		if (status) {
+			mlog_errno(status);
+			ret = ERR_PTR(status);
+			goto bail_unlock;
+		}
+	}
+
 bail_unlock:
 	/* Don't drop the cluster lock until *after* the d_add --
 	 * unlink on another node will message us to remove that
@@ -418,6 +440,13 @@
 		goto leave;
 	}
 
+	status = ocfs2_dentry_attach_lock(dentry, inode,
+					  OCFS2_I(dir)->ip_blkno);
+	if (status) {
+		mlog_errno(status);
+		goto leave;
+	}
+
 	insert_inode_hash(inode);
 	dentry->d_op = &ocfs2_dentry_ops;
 	d_instantiate(dentry, inode);
@@ -725,6 +754,12 @@
 		goto bail;
 	}
 
+	err = ocfs2_dentry_attach_lock(dentry, inode, OCFS2_I(dir)->ip_blkno);
+	if (err) {
+		mlog_errno(err);
+		goto bail;
+	}
+
 	atomic_inc(&inode->i_count);
 	dentry->d_op = &ocfs2_dentry_ops;
 	d_instantiate(dentry, inode);
@@ -743,6 +778,23 @@
 	return err;
 }
 
+/*
+ * Takes and drops an exclusive lock on the given dentry. This will
+ * force other nodes to drop it.
+ */
+static int ocfs2_remote_dentry_delete(struct dentry *dentry)
+{
+	int ret;
+
+	ret = ocfs2_dentry_lock(dentry, 1);
+	if (ret)
+		mlog_errno(ret);
+	else
+		ocfs2_dentry_unlock(dentry, 1);
+
+	return ret;
+}
+
 static int ocfs2_unlink(struct inode *dir,
 			struct dentry *dentry)
 {
@@ -832,8 +884,7 @@
 	else
 		inode->i_nlink--;
 
-	status = ocfs2_request_unlink_vote(inode, dentry,
-					   (unsigned int) inode->i_nlink);
+	status = ocfs2_remote_dentry_delete(dentry);
 	if (status < 0) {
 		/* This vote should succeed under all normal
 		 * circumstances. */
@@ -1019,7 +1070,6 @@
 	struct buffer_head *old_inode_de_bh = NULL; // if old_dentry is a dir,
 						    // this is the 1st dirent bh
 	nlink_t old_dir_nlink = old_dir->i_nlink, new_dir_nlink = new_dir->i_nlink;
-	unsigned int links_count;
 
 	/* At some point it might be nice to break this function up a
 	 * bit. */
@@ -1093,23 +1143,26 @@
 		}
 	}
 
-	if (S_ISDIR(old_inode->i_mode)) {
-		/* Directories actually require metadata updates to
-		 * the directory info so we can't get away with not
-		 * doing node locking on it. */
-		status = ocfs2_meta_lock(old_inode, handle, NULL, 1);
-		if (status < 0) {
-			if (status != -ENOENT)
-				mlog_errno(status);
-			goto bail;
-		}
-
-		status = ocfs2_request_rename_vote(old_inode, old_dentry);
-		if (status < 0) {
+	/*
+	 * Though we don't require an inode meta data update if
+	 * old_inode is not a directory, we lock anyway here to ensure
+	 * the vote thread on other nodes won't have to concurrently
+	 * downconvert the inode and the dentry locks.
+	 */
+	status = ocfs2_meta_lock(old_inode, handle, NULL, 1);
+	if (status < 0) {
+		if (status != -ENOENT)
 			mlog_errno(status);
-			goto bail;
-		}
+		goto bail;
+	}
 
+	status = ocfs2_remote_dentry_delete(old_dentry);
+	if (status < 0) {
+		mlog_errno(status);
+		goto bail;
+	}
+
+	if (S_ISDIR(old_inode->i_mode)) {
 		status = -EIO;
 		old_inode_de_bh = ocfs2_bread(old_inode, 0, &status, 0);
 		if (!old_inode_de_bh)
@@ -1123,14 +1176,6 @@
 		if (!new_inode && new_dir!=old_dir &&
 		    new_dir->i_nlink >= OCFS2_LINK_MAX)
 			goto bail;
-	} else {
-		/* Ah, the simple case - we're a file so just send a
-		 * message. */
-		status = ocfs2_request_rename_vote(old_inode, old_dentry);
-		if (status < 0) {
-			mlog_errno(status);
-			goto bail;
-		}
 	}
 
 	status = -ENOENT;
@@ -1202,13 +1247,7 @@
 			goto bail;
 		}
 
-		if (S_ISDIR(new_inode->i_mode))
-			links_count = 0;
-		else
-			links_count = (unsigned int) (new_inode->i_nlink - 1);
-
-		status = ocfs2_request_unlink_vote(new_inode, new_dentry,
-						   links_count);
+		status = ocfs2_remote_dentry_delete(new_dentry);
 		if (status < 0) {
 			mlog_errno(status);
 			goto bail;
@@ -1387,6 +1426,7 @@
 		}
 	}
 
+	ocfs2_dentry_move(old_dentry, new_dentry, old_dir, new_dir);
 	status = 0;
 bail:
 	if (rename_lock)
@@ -1675,6 +1715,12 @@
 		goto bail;
 	}
 
+	status = ocfs2_dentry_attach_lock(dentry, inode, OCFS2_I(dir)->ip_blkno);
+	if (status) {
+		mlog_errno(status);
+		goto bail;
+	}
+
 	insert_inode_hash(inode);
 	dentry->d_op = &ocfs2_dentry_ops;
 	d_instantiate(dentry, inode);
diff --git a/fs/ocfs2/ocfs2_lockid.h b/fs/ocfs2/ocfs2_lockid.h
index 7dd9e1e..4d5d565 100644
--- a/fs/ocfs2/ocfs2_lockid.h
+++ b/fs/ocfs2/ocfs2_lockid.h
@@ -35,12 +35,15 @@
 #define OCFS2_LOCK_ID_MAX_LEN  32
 #define OCFS2_LOCK_ID_PAD "000000"
 
+#define OCFS2_DENTRY_LOCK_INO_START 18
+
 enum ocfs2_lock_type {
 	OCFS2_LOCK_TYPE_META = 0,
 	OCFS2_LOCK_TYPE_DATA,
 	OCFS2_LOCK_TYPE_SUPER,
 	OCFS2_LOCK_TYPE_RENAME,
 	OCFS2_LOCK_TYPE_RW,
+	OCFS2_LOCK_TYPE_DENTRY,
 	OCFS2_NUM_LOCK_TYPES
 };
 
@@ -63,6 +66,9 @@
 		case OCFS2_LOCK_TYPE_RW:
 			c = 'W';
 			break;
+		case OCFS2_LOCK_TYPE_DENTRY:
+			c = 'N';
+			break;
 		default:
 			c = '\0';
 	}
@@ -70,4 +76,23 @@
 	return c;
 }
 
+static char *ocfs2_lock_type_strings[] = {
+	[OCFS2_LOCK_TYPE_META] = "Meta",
+	[OCFS2_LOCK_TYPE_DATA] = "Data",
+	[OCFS2_LOCK_TYPE_SUPER] = "Super",
+	[OCFS2_LOCK_TYPE_RENAME] = "Rename",
+	/* Need to differntiate from [R]ename.. serializing writes is the
+	 * important job it does, anyway. */
+	[OCFS2_LOCK_TYPE_RW] = "Write/Read",
+	[OCFS2_LOCK_TYPE_DENTRY] = "Dentry",
+};
+
+static inline const char *ocfs2_lock_type_string(enum ocfs2_lock_type type)
+{
+#ifdef __KERNEL__
+	mlog_bug_on_msg(type >= OCFS2_NUM_LOCK_TYPES, "%d\n", type);
+#endif
+	return ocfs2_lock_type_strings[type];
+}
+
 #endif  /* OCFS2_LOCKID_H */
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index d17e33e..4c29cd7 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -202,7 +202,7 @@
 
 	mlog_entry_void();
 
-	new = ocfs2_iget(osb, osb->root_blkno);
+	new = ocfs2_iget(osb, osb->root_blkno, OCFS2_FI_FLAG_SYSFILE);
 	if (IS_ERR(new)) {
 		status = PTR_ERR(new);
 		mlog_errno(status);
@@ -210,7 +210,7 @@
 	}
 	osb->root_inode = new;
 
-	new = ocfs2_iget(osb, osb->system_dir_blkno);
+	new = ocfs2_iget(osb, osb->system_dir_blkno, OCFS2_FI_FLAG_SYSFILE);
 	if (IS_ERR(new)) {
 		status = PTR_ERR(new);
 		mlog_errno(status);
@@ -682,7 +682,7 @@
 	.kill_sb        = kill_block_super, /* set to the generic one
 					     * right now, but do we
 					     * need to change that? */
-	.fs_flags       = FS_REQUIRES_DEV,
+	.fs_flags       = FS_REQUIRES_DEV|FS_RENAME_DOES_D_MOVE,
 	.next           = NULL
 };
 
diff --git a/fs/ocfs2/sysfile.c b/fs/ocfs2/sysfile.c
index fc29cb7..5df6e35 100644
--- a/fs/ocfs2/sysfile.c
+++ b/fs/ocfs2/sysfile.c
@@ -28,11 +28,11 @@
 #include <linux/slab.h>
 #include <linux/highmem.h>
 
-#include "ocfs2.h"
-
 #define MLOG_MASK_PREFIX ML_INODE
 #include <cluster/masklog.h>
 
+#include "ocfs2.h"
+
 #include "alloc.h"
 #include "dir.h"
 #include "inode.h"
@@ -115,7 +115,7 @@
 		goto bail;
 	}
 
-	inode = ocfs2_iget(osb, blkno);
+	inode = ocfs2_iget(osb, blkno, OCFS2_FI_FLAG_SYSFILE);
 	if (IS_ERR(inode)) {
 		mlog_errno(PTR_ERR(inode));
 		inode = NULL;
diff --git a/fs/ocfs2/vote.c b/fs/ocfs2/vote.c
index cf70fe2..5b4dca7 100644
--- a/fs/ocfs2/vote.c
+++ b/fs/ocfs2/vote.c
@@ -74,9 +74,6 @@
 		__be32 v_orphaned_slot;	/* Used during delete votes */
 		__be32 v_nlink;		/* Used during unlink votes */
 	} md1;				/* Message type dependant 1 */
-	__be32 v_unlink_namelen;
-	__be64 v_unlink_parent;
-	u8  v_unlink_dirent[OCFS2_VOTE_FILENAME_LEN];
 };
 
 /* Responses are given these values to maintain backwards
@@ -100,8 +97,6 @@
 enum ocfs2_vote_request {
 	OCFS2_VOTE_REQ_INVALID = 0,
 	OCFS2_VOTE_REQ_DELETE,
-	OCFS2_VOTE_REQ_UNLINK,
-	OCFS2_VOTE_REQ_RENAME,
 	OCFS2_VOTE_REQ_MOUNT,
 	OCFS2_VOTE_REQ_UMOUNT,
 	OCFS2_VOTE_REQ_LAST
@@ -261,103 +256,13 @@
 	return response;
 }
 
-static int ocfs2_match_dentry(struct dentry *dentry,
-			      u64 parent_blkno,
-			      unsigned int namelen,
-			      const char *name)
-{
-	struct inode *parent;
-
-	if (!dentry->d_parent) {
-		mlog(0, "Detached from parent.\n");
-		return 0;
-	}
-
-	parent = dentry->d_parent->d_inode;
-	/* Negative parent dentry? */
-	if (!parent)
-		return 0;
-
-	/* Name is in a different directory. */
-	if (OCFS2_I(parent)->ip_blkno != parent_blkno)
-		return 0;
-
-	if (dentry->d_name.len != namelen)
-		return 0;
-
-	/* comparison above guarantees this is safe. */
-	if (memcmp(dentry->d_name.name, name, namelen))
-		return 0;
-
-	return 1;
-}
-
-static void ocfs2_process_dentry_request(struct inode *inode,
-					 int rename,
-					 unsigned int new_nlink,
-					 u64 parent_blkno,
-					 unsigned int namelen,
-					 const char *name)
-{
-	struct dentry *dentry = NULL;
-	struct list_head *p;
-	struct ocfs2_inode_info *oi = OCFS2_I(inode);
-
-	mlog(0, "parent %llu, namelen = %u, name = %.*s\n",
-	     (unsigned long long)parent_blkno, namelen, namelen, name);
-
-	spin_lock(&dcache_lock);
-
-	/* Another node is removing this name from the system. It is
-	 * up to us to find the corresponding dentry and if it exists,
-	 * unhash it from the dcache. */
-	list_for_each(p, &inode->i_dentry) {
-		dentry = list_entry(p, struct dentry, d_alias);
-
-		if (ocfs2_match_dentry(dentry, parent_blkno, namelen, name)) {
-			mlog(0, "dentry found: %.*s\n",
-			     dentry->d_name.len, dentry->d_name.name);
-
-			dget_locked(dentry);
-			break;
-		}
-
-		dentry = NULL;
-	}
-
-	spin_unlock(&dcache_lock);
-
-	if (dentry) {
-		d_delete(dentry);
-		dput(dentry);
-	}
-
-	/* rename votes don't send link counts */
-	if (!rename) {
-		mlog(0, "new_nlink = %u\n", new_nlink);
-
-		/* We don't have the proper locks here to directly
-		 * change i_nlink and besides, the vote is sent
-		 * *before* the operation so it may have failed on the
-		 * other node. This passes a hint to ocfs2_drop_inode
-		 * to force ocfs2_delete_inode, who will take the
-		 * proper cluster locks to sort things out. */
-		if (new_nlink == 0) {
-			spin_lock(&oi->ip_lock);
-			oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
-			spin_unlock(&OCFS2_I(inode)->ip_lock);
-		}
-	}
-}
-
 static void ocfs2_process_vote(struct ocfs2_super *osb,
 			       struct ocfs2_vote_msg *msg)
 {
 	int net_status, vote_response;
 	int orphaned_slot = 0;
-	int rename = 0;
-	unsigned int node_num, generation, new_nlink, namelen;
-	u64 blkno, parent_blkno;
+	unsigned int node_num, generation;
+	u64 blkno;
 	enum ocfs2_vote_request request;
 	struct inode *inode = NULL;
 	struct ocfs2_msg_hdr *hdr = &msg->v_hdr;
@@ -437,18 +342,6 @@
 		vote_response = ocfs2_process_delete_request(inode,
 							     &orphaned_slot);
 		break;
-	case OCFS2_VOTE_REQ_RENAME:
-		rename = 1;
-		/* fall through */
-	case OCFS2_VOTE_REQ_UNLINK:
-		parent_blkno = be64_to_cpu(msg->v_unlink_parent);
-		namelen = be32_to_cpu(msg->v_unlink_namelen);
-		/* new_nlink will be ignored in case of a rename vote */
-		new_nlink = be32_to_cpu(msg->md1.v_nlink);
-		ocfs2_process_dentry_request(inode, rename, new_nlink,
-					     parent_blkno, namelen,
-					     msg->v_unlink_dirent);
-		break;
 	default:
 		mlog(ML_ERROR, "node %u, invalid request: %u\n",
 		     node_num, request);
@@ -889,75 +782,6 @@
 	return status;
 }
 
-static void ocfs2_setup_unlink_vote(struct ocfs2_vote_msg *request,
-				    struct dentry *dentry)
-{
-	struct inode *parent = dentry->d_parent->d_inode;
-
-	/* We need some values which will uniquely identify a dentry
-	 * on the other nodes so that they can find it and run
-	 * d_delete against it. Parent directory block and full name
-	 * should suffice. */
-
-	mlog(0, "unlink/rename request: parent: %llu name: %.*s\n",
-	     (unsigned long long)OCFS2_I(parent)->ip_blkno, dentry->d_name.len,
-	     dentry->d_name.name);
-
-	request->v_unlink_parent = cpu_to_be64(OCFS2_I(parent)->ip_blkno);
-	request->v_unlink_namelen = cpu_to_be32(dentry->d_name.len);
-	memcpy(request->v_unlink_dirent, dentry->d_name.name,
-	       dentry->d_name.len);
-}
-
-int ocfs2_request_unlink_vote(struct inode *inode,
-			      struct dentry *dentry,
-			      unsigned int nlink)
-{
-	int status;
-	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-	struct ocfs2_vote_msg *request;
-
-	if (dentry->d_name.len > OCFS2_VOTE_FILENAME_LEN)
-		return -ENAMETOOLONG;
-
-	status = -ENOMEM;
-	request = ocfs2_new_vote_request(osb, OCFS2_I(inode)->ip_blkno,
-					 inode->i_generation,
-					 OCFS2_VOTE_REQ_UNLINK, nlink);
-	if (request) {
-		ocfs2_setup_unlink_vote(request, dentry);
-
-		status = ocfs2_request_vote(inode, request, NULL);
-
-		kfree(request);
-	}
-	return status;
-}
-
-int ocfs2_request_rename_vote(struct inode *inode,
-			      struct dentry *dentry)
-{
-	int status;
-	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-	struct ocfs2_vote_msg *request;
-
-	if (dentry->d_name.len > OCFS2_VOTE_FILENAME_LEN)
-		return -ENAMETOOLONG;
-
-	status = -ENOMEM;
-	request = ocfs2_new_vote_request(osb, OCFS2_I(inode)->ip_blkno,
-					 inode->i_generation,
-					 OCFS2_VOTE_REQ_RENAME, 0);
-	if (request) {
-		ocfs2_setup_unlink_vote(request, dentry);
-
-		status = ocfs2_request_vote(inode, request, NULL);
-
-		kfree(request);
-	}
-	return status;
-}
-
 int ocfs2_request_mount_vote(struct ocfs2_super *osb)
 {
 	int status;
diff --git a/fs/ocfs2/vote.h b/fs/ocfs2/vote.h
index 9cce607..53ebc1c6 100644
--- a/fs/ocfs2/vote.h
+++ b/fs/ocfs2/vote.h
@@ -39,11 +39,6 @@
 }
 
 int ocfs2_request_delete_vote(struct inode *inode);
-int ocfs2_request_unlink_vote(struct inode *inode,
-			      struct dentry *dentry,
-			      unsigned int nlink);
-int ocfs2_request_rename_vote(struct inode *inode,
-			      struct dentry *dentry);
 int ocfs2_request_mount_vote(struct ocfs2_super *osb);
 int ocfs2_request_umount_vote(struct ocfs2_super *osb);
 int ocfs2_register_net_handlers(struct ocfs2_super *osb);
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 555bc19..1d3e601 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -92,9 +92,10 @@
 #define FS_REQUIRES_DEV 1 
 #define FS_BINARY_MOUNTDATA 2
 #define FS_REVAL_DOT	16384	/* Check the paths ".", ".." for staleness */
-#define FS_ODD_RENAME	32768	/* Temporary stuff; will go away as soon
-				  * as nfs_rename() will be cleaned up
-				  */
+#define FS_RENAME_DOES_D_MOVE	32768	/* FS will handle d_move()
+					 * during rename() internally.
+					 */
+
 /*
  * These are the fs-independent mount-flags: up to 32 flags are supported
  */