ceph: rework dcache readdir

Previously, our dcache readdir code relied on the child dentries in the
directory dentry's d_subdirs list being sorted by dentry offset in
descending order. When adding dentries to the dcache, if a dentry
already existed, our readdir code moved it to the head of the directory
dentry's d_subdirs list. This design relies on dcache internals.
Al Viro suggested using ncpfs's approach: keeping an array of pointers
to dentries in the page cache of the directory inode. The validity of
those pointers is represented by the directory inode's complete and
ordered flags. When a dentry gets pruned, we clear the directory
inode's complete flag in the d_prune() callback. Before moving a dentry
to another directory, we clear the ordered flag for both the old and
the new directory.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index dd7b20a..dc10c9d 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -833,7 +833,9 @@
 		used |= CEPH_CAP_PIN;
 	if (ci->i_rd_ref)
 		used |= CEPH_CAP_FILE_RD;
-	if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
+	if (ci->i_rdcache_ref ||
+	    (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
+	     ci->vfs_inode.i_data.nrpages))
 		used |= CEPH_CAP_FILE_CACHE;
 	if (ci->i_wr_ref)
 		used |= CEPH_CAP_FILE_WR;
@@ -1651,9 +1653,10 @@
 	 * If we fail, it's because pages are locked.... try again later.
 	 */
 	if ((!is_delayed || mdsc->stopping) &&
-	    ci->i_wrbuffer_ref == 0 &&               /* no dirty pages... */
-	    inode->i_data.nrpages &&                 /* have cached pages */
-	    (file_wanted == 0 ||                     /* no open files */
+	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
+	    ci->i_wrbuffer_ref == 0 &&		/* no dirty pages... */
+	    inode->i_data.nrpages &&		/* have cached pages */
+	    (file_wanted == 0 ||		/* no open files */
 	     (revoking & (CEPH_CAP_FILE_CACHE|
 			  CEPH_CAP_FILE_LAZYIO))) && /*  or revoking cache */
 	    !tried_invalidate) {
@@ -2805,7 +2808,8 @@
 	 * try to invalidate (once).  (If there are dirty buffers, we
 	 * will invalidate _after_ writeback.)
 	 */
-	if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+	if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
+	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
 	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
 	    !ci->i_wrbuffer_ref) {
 		if (try_nonblocking_invalidate(inode)) {
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index b99f2ff..9314b4e 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -107,6 +107,27 @@
 }
 
 /*
+ * make note of the last dentry we read, so we can
+ * continue at the same lexicographical point,
+ * regardless of what dir changes take place on the
+ * server.
+ */
+static int note_last_dentry(struct ceph_file_info *fi, const char *name,
+		            int len, unsigned next_offset)
+{
+	char *buf = kmalloc(len+1, GFP_KERNEL);
+	if (!buf)
+		return -ENOMEM;
+	kfree(fi->last_name);
+	fi->last_name = buf;
+	memcpy(fi->last_name, name, len);
+	fi->last_name[len] = 0;
+	fi->next_offset = next_offset;
+	dout("note_last_dentry '%s'\n", fi->last_name);
+	return 0;
+}
+
+/*
  * When possible, we try to satisfy a readdir by peeking at the
  * dcache.  We make this work by carefully ordering dentries on
  * d_child when we initially get results back from the MDS, and
@@ -123,123 +144,113 @@
 	struct ceph_file_info *fi = file->private_data;
 	struct dentry *parent = file->f_path.dentry;
 	struct inode *dir = d_inode(parent);
-	struct list_head *p;
-	struct dentry *dentry, *last;
+	struct dentry *dentry, *last = NULL;
 	struct ceph_dentry_info *di;
+	unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry *);
 	int err = 0;
+	loff_t ptr_pos = 0;
+	struct ceph_readdir_cache_control cache_ctl = {};
 
-	/* claim ref on last dentry we returned */
-	last = fi->dentry;
-	fi->dentry = NULL;
+	dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
 
-	dout("__dcache_readdir %p v%u at %llu (last %p)\n",
-	     dir, shared_gen, ctx->pos, last);
-
-	spin_lock(&parent->d_lock);
-
-	/* start at beginning? */
-	if (ctx->pos == 2 || last == NULL ||
-	    fpos_cmp(ctx->pos, ceph_dentry(last)->offset) < 0) {
-		if (list_empty(&parent->d_subdirs))
-			goto out_unlock;
-		p = parent->d_subdirs.prev;
-		dout(" initial p %p/%p\n", p->prev, p->next);
-	} else {
-		p = last->d_child.prev;
+	/* we can calculate cache index for the first dirfrag */
+	if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
+		cache_ctl.index = fpos_off(ctx->pos) - 2;
+		BUG_ON(cache_ctl.index < 0);
+		ptr_pos = cache_ctl.index * sizeof(struct dentry *);
 	}
 
-more:
-	dentry = list_entry(p, struct dentry, d_child);
-	di = ceph_dentry(dentry);
-	while (1) {
-		dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next,
-		     d_unhashed(dentry) ? "!hashed" : "hashed",
-		     parent->d_subdirs.prev, parent->d_subdirs.next);
-		if (p == &parent->d_subdirs) {
+	while (true) {
+		pgoff_t pgoff;
+		bool emit_dentry;
+
+		if (ptr_pos >= i_size_read(dir)) {
 			fi->flags |= CEPH_F_ATEND;
-			goto out_unlock;
+			err = 0;
+			break;
 		}
-		spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
+
+		err = -EAGAIN;
+		pgoff = ptr_pos >> PAGE_CACHE_SHIFT;
+		if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
+			ceph_readdir_cache_release(&cache_ctl);
+			cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
+			if (!cache_ctl.page) {
+				dout(" page %lu not found\n", pgoff);
+				break;
+			}
+			/* reading/filling the cache are serialized by
+			 * i_mutex, no need to use page lock */
+			unlock_page(cache_ctl.page);
+			cache_ctl.dentries = kmap(cache_ctl.page);
+		}
+
+		rcu_read_lock();
+		spin_lock(&parent->d_lock);
+		/* check i_size again here, because empty directory can be
+		 * marked as complete while not holding the i_mutex. */
+		if (ceph_dir_is_complete_ordered(dir) &&
+		    ptr_pos < i_size_read(dir))
+			dentry = cache_ctl.dentries[cache_ctl.index % nsize];
+		else
+			dentry = NULL;
+		spin_unlock(&parent->d_lock);
+		if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
+			dentry = NULL;
+		rcu_read_unlock();
+		if (!dentry)
+			break;
+
+		emit_dentry = false;
+		di = ceph_dentry(dentry);
+		spin_lock(&dentry->d_lock);
 		if (di->lease_shared_gen == shared_gen &&
-		    !d_unhashed(dentry) && d_really_is_positive(dentry) &&
+		    d_really_is_positive(dentry) &&
 		    ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
 		    ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
-		    fpos_cmp(ctx->pos, di->offset) <= 0)
-			break;
-		dout(" skipping %p %pd at %llu (%llu)%s%s\n", dentry,
-		     dentry, di->offset,
-		     ctx->pos, d_unhashed(dentry) ? " unhashed" : "",
-		     !d_inode(dentry) ? " null" : "");
-		spin_unlock(&dentry->d_lock);
-		p = p->prev;
-		dentry = list_entry(p, struct dentry, d_child);
-		di = ceph_dentry(dentry);
-	}
-
-	dget_dlock(dentry);
-	spin_unlock(&dentry->d_lock);
-	spin_unlock(&parent->d_lock);
-
-	/* make sure a dentry wasn't dropped while we didn't have parent lock */
-	if (!ceph_dir_is_complete_ordered(dir)) {
-		dout(" lost dir complete on %p; falling back to mds\n", dir);
-		dput(dentry);
-		err = -EAGAIN;
-		goto out;
-	}
-
-	dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
-	     dentry, dentry, d_inode(dentry));
-	if (!dir_emit(ctx, dentry->d_name.name,
-		      dentry->d_name.len,
-		      ceph_translate_ino(dentry->d_sb, d_inode(dentry)->i_ino),
-		      d_inode(dentry)->i_mode >> 12)) {
-		if (last) {
-			/* remember our position */
-			fi->dentry = last;
-			fi->next_offset = fpos_off(di->offset);
+		    fpos_cmp(ctx->pos, di->offset) <= 0) {
+			emit_dentry = true;
 		}
-		dput(dentry);
-		return 0;
+		spin_unlock(&dentry->d_lock);
+
+		if (emit_dentry) {
+			dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+			     dentry, dentry, d_inode(dentry));
+			ctx->pos = di->offset;
+			if (!dir_emit(ctx, dentry->d_name.name,
+				      dentry->d_name.len,
+				      ceph_translate_ino(dentry->d_sb,
+							 d_inode(dentry)->i_ino),
+				      d_inode(dentry)->i_mode >> 12)) {
+				dput(dentry);
+				err = 0;
+				break;
+			}
+			ctx->pos++;
+
+			if (last)
+				dput(last);
+			last = dentry;
+		} else {
+			dput(dentry);
+		}
+
+		cache_ctl.index++;
+		ptr_pos += sizeof(struct dentry *);
 	}
-
-	ctx->pos = di->offset + 1;
-
-	if (last)
+	ceph_readdir_cache_release(&cache_ctl);
+	if (last) {
+		int ret;
+		di = ceph_dentry(last);
+		ret = note_last_dentry(fi, last->d_name.name, last->d_name.len,
+				       fpos_off(di->offset) + 1);
+		if (ret < 0)
+			err = ret;
 		dput(last);
-	last = dentry;
-
-	spin_lock(&parent->d_lock);
-	p = p->prev;	/* advance to next dentry */
-	goto more;
-
-out_unlock:
-	spin_unlock(&parent->d_lock);
-out:
-	if (last)
-		dput(last);
+	}
 	return err;
 }
 
-/*
- * make note of the last dentry we read, so we can
- * continue at the same lexicographical point,
- * regardless of what dir changes take place on the
- * server.
- */
-static int note_last_dentry(struct ceph_file_info *fi, const char *name,
-			    int len)
-{
-	kfree(fi->last_name);
-	fi->last_name = kmalloc(len+1, GFP_KERNEL);
-	if (!fi->last_name)
-		return -ENOMEM;
-	memcpy(fi->last_name, name, len);
-	fi->last_name[len] = 0;
-	dout("note_last_dentry '%s'\n", fi->last_name);
-	return 0;
-}
-
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
 	struct ceph_file_info *fi = file->private_data;
@@ -280,8 +291,7 @@
 
 	/* can we use the dcache? */
 	spin_lock(&ci->i_ceph_lock);
-	if ((ctx->pos == 2 || fi->dentry) &&
-	    ceph_test_mount_opt(fsc, DCACHE) &&
+	if (ceph_test_mount_opt(fsc, DCACHE) &&
 	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
 	    __ceph_dir_is_complete_ordered(ci) &&
@@ -296,24 +306,8 @@
 	} else {
 		spin_unlock(&ci->i_ceph_lock);
 	}
-	if (fi->dentry) {
-		err = note_last_dentry(fi, fi->dentry->d_name.name,
-				       fi->dentry->d_name.len);
-		if (err)
-			return err;
-		dput(fi->dentry);
-		fi->dentry = NULL;
-	}
 
 	/* proceed with a normal readdir */
-
-	if (ctx->pos == 2) {
-		/* note dir version at start of readdir so we can tell
-		 * if any dentries get dropped */
-		fi->dir_release_count = atomic_read(&ci->i_release_count);
-		fi->dir_ordered_count = ci->i_ordered_count;
-	}
-
 more:
 	/* do we have the correct frag content buffered? */
 	if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -348,6 +342,9 @@
 				return -ENOMEM;
 			}
 		}
+		req->r_dir_release_cnt = fi->dir_release_count;
+		req->r_dir_ordered_cnt = fi->dir_ordered_count;
+		req->r_readdir_cache_idx = fi->readdir_cache_idx;
 		req->r_readdir_offset = fi->next_offset;
 		req->r_args.readdir.frag = cpu_to_le32(frag);
 
@@ -364,26 +361,38 @@
 		     (int)req->r_reply_info.dir_end,
 		     (int)req->r_reply_info.dir_complete);
 
-		if (!req->r_did_prepopulate) {
-			dout("readdir !did_prepopulate");
-			/* preclude from marking dir complete */
-			fi->dir_release_count--;
-		}
 
 		/* note next offset and last dentry name */
 		rinfo = &req->r_reply_info;
 		if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
 			frag = le32_to_cpu(rinfo->dir_dir->frag);
-			if (ceph_frag_is_leftmost(frag))
-				fi->next_offset = 2;
-			else
-				fi->next_offset = 0;
-			off = fi->next_offset;
+			off = req->r_readdir_offset;
+			fi->next_offset = off;
 		}
+
 		fi->frag = frag;
 		fi->offset = fi->next_offset;
 		fi->last_readdir = req;
 
+		if (req->r_did_prepopulate) {
+			fi->readdir_cache_idx = req->r_readdir_cache_idx;
+			if (fi->readdir_cache_idx < 0) {
+				/* preclude from marking dir ordered */
+				fi->dir_ordered_count = 0;
+			} else if (ceph_frag_is_leftmost(frag) && off == 2) {
+				/* note dir version at start of readdir so
+				 * we can tell if any dentries get dropped */
+				fi->dir_release_count = req->r_dir_release_cnt;
+				fi->dir_ordered_count = req->r_dir_ordered_cnt;
+			}
+		} else {
+			dout("readdir !did_prepopulate");
+			/* disable readdir cache */
+			fi->readdir_cache_idx = -1;
+			/* preclude from marking dir complete */
+			fi->dir_release_count = 0;
+		}
+
 		if (req->r_reply_info.dir_end) {
 			kfree(fi->last_name);
 			fi->last_name = NULL;
@@ -394,10 +403,10 @@
 		} else {
 			err = note_last_dentry(fi,
 				       rinfo->dir_dname[rinfo->dir_nr-1],
-				       rinfo->dir_dname_len[rinfo->dir_nr-1]);
+				       rinfo->dir_dname_len[rinfo->dir_nr-1],
+				       fi->next_offset + rinfo->dir_nr);
 			if (err)
 				return err;
-			fi->next_offset += rinfo->dir_nr;
 		}
 	}
 
@@ -453,16 +462,22 @@
 	 * were released during the whole readdir, and we should have
 	 * the complete dir contents in our cache.
 	 */
-	spin_lock(&ci->i_ceph_lock);
-	if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
-		if (ci->i_ordered_count == fi->dir_ordered_count)
+	if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) {
+		spin_lock(&ci->i_ceph_lock);
+		if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) {
 			dout(" marking %p complete and ordered\n", inode);
-		else
+			/* use i_size to track number of entries in
+			 * readdir cache */
+			BUG_ON(fi->readdir_cache_idx < 0);
+			i_size_write(inode, fi->readdir_cache_idx *
+				     sizeof(struct dentry*));
+		} else {
 			dout(" marking %p complete\n", inode);
+		}
 		__ceph_dir_set_complete(ci, fi->dir_release_count,
 					fi->dir_ordered_count);
+		spin_unlock(&ci->i_ceph_lock);
 	}
-	spin_unlock(&ci->i_ceph_lock);
 
 	dout("readdir %p file %p done.\n", inode, file);
 	return 0;
@@ -476,14 +491,12 @@
 	}
 	kfree(fi->last_name);
 	fi->last_name = NULL;
+	fi->dir_release_count = 0;
+	fi->readdir_cache_idx = -1;
 	if (ceph_frag_is_leftmost(frag))
 		fi->next_offset = 2;  /* compensate for . and .. */
 	else
 		fi->next_offset = 0;
-	if (fi->dentry) {
-		dput(fi->dentry);
-		fi->dentry = NULL;
-	}
 	fi->flags &= ~CEPH_F_ATEND;
 }
 
@@ -497,13 +510,12 @@
 	mutex_lock(&inode->i_mutex);
 	retval = -EINVAL;
 	switch (whence) {
-	case SEEK_END:
-		offset += inode->i_size + 2;   /* FIXME */
-		break;
 	case SEEK_CUR:
 		offset += file->f_pos;
 	case SEEK_SET:
 		break;
+	case SEEK_END:
+		retval = -EOPNOTSUPP;
 	default:
 		goto out;
 	}
@@ -516,20 +528,18 @@
 		}
 		retval = offset;
 
-		/*
-		 * discard buffered readdir content on seekdir(0), or
-		 * seek to new frag, or seek prior to current chunk.
-		 */
 		if (offset == 0 ||
 		    fpos_frag(offset) != fi->frag ||
 		    fpos_off(offset) < fi->offset) {
+			/* discard buffered readdir content on seekdir(0), or
+			 * seek to new frag, or seek prior to current chunk */
 			dout("dir_llseek dropping %p content\n", file);
 			reset_readdir(fi, fpos_frag(offset));
+		} else if (fpos_cmp(offset, old_offset) > 0) {
+			/* reset dir_release_count if we did a forward seek */
+			fi->dir_release_count = 0;
+			fi->readdir_cache_idx = -1;
 		}
-
-		/* bump dir_release_count if we did a forward seek */
-		if (fpos_cmp(offset, old_offset) > 0)
-			fi->dir_release_count--;
 	}
 out:
 	mutex_unlock(&inode->i_mutex);
@@ -985,16 +995,15 @@
 		 * to do it here.
 		 */
 
+		/* d_move screws up sibling dentries' offsets */
+		ceph_dir_clear_complete(old_dir);
+		ceph_dir_clear_complete(new_dir);
+
 		d_move(old_dentry, new_dentry);
 
 		/* ensure target dentry is invalidated, despite
 		   rehashing bug in vfs_rename_dir */
 		ceph_invalidate_dentry_lease(new_dentry);
-
-		/* d_move screws up sibling dentries' offsets */
-		ceph_dir_clear_complete(old_dir);
-		ceph_dir_clear_complete(new_dir);
-
 	}
 	ceph_mdsc_put_request(req);
 	return err;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 424b5b5..faf9209 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -96,6 +96,7 @@
 		}
 		cf->fmode = fmode;
 		cf->next_offset = 2;
+		cf->readdir_cache_idx = -1;
 		file->private_data = cf;
 		BUG_ON(inode->i_fop->release != ceph_release);
 		break;
@@ -324,7 +325,6 @@
 		ceph_mdsc_put_request(cf->last_readdir);
 	kfree(cf->last_name);
 	kfree(cf->dir_info);
-	dput(cf->dentry);
 	kmem_cache_free(ceph_file_cachep, cf);
 
 	/* wake up anyone waiting for caps on this inode */
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e86d1a4..2a6d93b 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -390,9 +390,10 @@
 	ci->i_inline_version = 0;
 	ci->i_time_warp_seq = 0;
 	ci->i_ceph_flags = 0;
-	ci->i_ordered_count = 0;
-	atomic_set(&ci->i_release_count, 1);
-	atomic_set(&ci->i_complete_count, 0);
+	atomic64_set(&ci->i_ordered_count, 1);
+	atomic64_set(&ci->i_release_count, 1);
+	atomic64_set(&ci->i_complete_seq[0], 0);
+	atomic64_set(&ci->i_complete_seq[1], 0);
 	ci->i_symlink = NULL;
 
 	memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
@@ -860,9 +861,10 @@
 			    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
 			    !__ceph_dir_is_complete(ci)) {
 				dout(" marking %p complete (empty)\n", inode);
+				i_size_write(inode, 0);
 				__ceph_dir_set_complete(ci,
-					atomic_read(&ci->i_release_count),
-					ci->i_ordered_count);
+					atomic64_read(&ci->i_release_count),
+					atomic64_read(&ci->i_ordered_count));
 			}
 
 			wake = true;
@@ -1214,6 +1216,10 @@
 			dout("fill_trace doing d_move %p -> %p\n",
 			     req->r_old_dentry, dn);
 
+			/* d_move screws up sibling dentries' offsets */
+			ceph_dir_clear_ordered(dir);
+			ceph_dir_clear_ordered(olddir);
+
 			d_move(req->r_old_dentry, dn);
 			dout(" src %p '%pd' dst %p '%pd'\n",
 			     req->r_old_dentry,
@@ -1224,10 +1230,6 @@
 			   rehashing bug in vfs_rename_dir */
 			ceph_invalidate_dentry_lease(dn);
 
-			/* d_move screws up sibling dentries' offsets */
-			ceph_dir_clear_ordered(dir);
-			ceph_dir_clear_ordered(olddir);
-
 			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
 			     ceph_dentry(req->r_old_dentry)->offset);
 
@@ -1335,6 +1337,49 @@
 	return err;
 }
 
+void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl)
+{
+	if (ctl->page) {
+		kunmap(ctl->page);
+		page_cache_release(ctl->page);
+		ctl->page = NULL;
+	}
+}
+
+static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
+			      struct ceph_readdir_cache_control *ctl,
+			      struct ceph_mds_request *req)
+{
+	struct ceph_inode_info *ci = ceph_inode(dir);
+	unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*);
+	unsigned idx = ctl->index % nsize;
+	pgoff_t pgoff = ctl->index / nsize;
+
+	if (!ctl->page || pgoff != page_index(ctl->page)) {
+		ceph_readdir_cache_release(ctl);
+		ctl->page  = grab_cache_page(&dir->i_data, pgoff);
+		if (!ctl->page) {
+			ctl->index = -1;
+			return -ENOMEM;
+		}
+		/* reading/filling the cache are serialized by
+		 * i_mutex, no need to use page lock */
+		unlock_page(ctl->page);
+		ctl->dentries = kmap(ctl->page);
+	}
+
+	if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
+	    req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) {
+		dout("readdir cache dn %p idx %d\n", dn, ctl->index);
+		ctl->dentries[idx] = dn;
+		ctl->index++;
+	} else {
+		dout("disable readdir cache\n");
+		ctl->index = -1;
+	}
+	return 0;
+}
+
 int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 			     struct ceph_mds_session *session)
 {
@@ -1347,8 +1392,11 @@
 	struct inode *snapdir = NULL;
 	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
 	struct ceph_dentry_info *di;
-	u64 r_readdir_offset = req->r_readdir_offset;
 	u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+	struct ceph_readdir_cache_control cache_ctl = {};
+
+	if (req->r_aborted)
+		return readdir_prepopulate_inodes_only(req, session);
 
 	if (rinfo->dir_dir &&
 	    le32_to_cpu(rinfo->dir_dir->frag) != frag) {
@@ -1356,14 +1404,11 @@
 		     frag, le32_to_cpu(rinfo->dir_dir->frag));
 		frag = le32_to_cpu(rinfo->dir_dir->frag);
 		if (ceph_frag_is_leftmost(frag))
-			r_readdir_offset = 2;
+			req->r_readdir_offset = 2;
 		else
-			r_readdir_offset = 0;
+			req->r_readdir_offset = 0;
 	}
 
-	if (req->r_aborted)
-		return readdir_prepopulate_inodes_only(req, session);
-
 	if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
 		snapdir = ceph_get_snapdir(d_inode(parent));
 		parent = d_find_alias(snapdir);
@@ -1376,6 +1421,17 @@
 			ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir);
 	}
 
+	if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
+		/* note dir version at start of readdir so we can tell
+		 * if any dentries get dropped */
+		struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
+		req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
+		req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
+		req->r_readdir_cache_idx = 0;
+	}
+
+	cache_ctl.index = req->r_readdir_cache_idx;
+
 	/* FIXME: release caps/leases if error occurs */
 	for (i = 0; i < rinfo->dir_nr; i++) {
 		struct ceph_vino vino;
@@ -1415,13 +1471,6 @@
 			d_delete(dn);
 			dput(dn);
 			goto retry_lookup;
-		} else {
-			/* reorder parent's d_subdirs */
-			spin_lock(&parent->d_lock);
-			spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);
-			list_move(&dn->d_child, &parent->d_subdirs);
-			spin_unlock(&dn->d_lock);
-			spin_unlock(&parent->d_lock);
 		}
 
 		/* inode */
@@ -1438,13 +1487,15 @@
 			}
 		}
 
-		if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
-			       req->r_request_started, -1,
-			       &req->r_caps_reservation) < 0) {
+		ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+				 req->r_request_started, -1,
+				 &req->r_caps_reservation);
+		if (ret < 0) {
 			pr_err("fill_inode badness on %p\n", in);
 			if (d_really_is_negative(dn))
 				iput(in);
 			d_drop(dn);
+			err = ret;
 			goto next_item;
 		}
 
@@ -1460,19 +1511,28 @@
 		}
 
 		di = dn->d_fsdata;
-		di->offset = ceph_make_fpos(frag, i + r_readdir_offset);
+		di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
 
 		update_dentry_lease(dn, rinfo->dir_dlease[i],
 				    req->r_session,
 				    req->r_request_started);
+
+		if (err == 0 && cache_ctl.index >= 0) {
+			ret = fill_readdir_cache(d_inode(parent), dn,
+						 &cache_ctl, req);
+			if (ret < 0)
+				err = ret;
+		}
 next_item:
 		if (dn)
 			dput(dn);
 	}
-	if (err == 0)
-		req->r_did_prepopulate = true;
-
 out:
+	if (err == 0) {
+		req->r_did_prepopulate = true;
+		req->r_readdir_cache_idx = cache_ctl.index;
+	}
+	ceph_readdir_cache_release(&cache_ctl);
 	if (snapdir) {
 		iput(snapdir);
 		dput(parent);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 470be4e..762757e 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -253,6 +253,9 @@
 	bool		  r_got_unsafe, r_got_safe, r_got_result;
 
 	bool              r_did_prepopulate;
+	long long	  r_dir_release_cnt;
+	long long	  r_dir_ordered_cnt;
+	int		  r_readdir_cache_idx;
 	u32               r_readdir_offset;
 
 	struct ceph_cap_reservation r_caps_reservation;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 4415e97..860cc016 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -282,9 +282,9 @@
 	u32 i_time_warp_seq;
 
 	unsigned i_ceph_flags;
-	int i_ordered_count;
-	atomic_t i_release_count;
-	atomic_t i_complete_count;
+	atomic64_t i_release_count;
+	atomic64_t i_ordered_count;
+	atomic64_t i_complete_seq[2];
 
 	struct ceph_dir_layout i_dir_layout;
 	struct ceph_file_layout i_layout;
@@ -471,30 +471,36 @@
 
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
-					   int release_count, int ordered_count)
+					   long long release_count,
+					   long long ordered_count)
 {
-	atomic_set(&ci->i_complete_count, release_count);
-	if (ci->i_ordered_count == ordered_count)
-		ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
-	else
-		ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+	smp_mb__before_atomic();
+	atomic64_set(&ci->i_complete_seq[0], release_count);
+	atomic64_set(&ci->i_complete_seq[1], ordered_count);
 }
 
 static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
 {
-	atomic_inc(&ci->i_release_count);
+	atomic64_inc(&ci->i_release_count);
+}
+
+static inline void __ceph_dir_clear_ordered(struct ceph_inode_info *ci)
+{
+	atomic64_inc(&ci->i_ordered_count);
 }
 
 static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
 {
-	return atomic_read(&ci->i_complete_count) ==
-		atomic_read(&ci->i_release_count);
+	return atomic64_read(&ci->i_complete_seq[0]) ==
+		atomic64_read(&ci->i_release_count);
 }
 
 static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
 {
-	return __ceph_dir_is_complete(ci) &&
-		(ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+	return  atomic64_read(&ci->i_complete_seq[0]) ==
+		atomic64_read(&ci->i_release_count) &&
+		atomic64_read(&ci->i_complete_seq[1]) ==
+		atomic64_read(&ci->i_ordered_count);
 }
 
 static inline void ceph_dir_clear_complete(struct inode *inode)
@@ -504,20 +510,13 @@
 
 static inline void ceph_dir_clear_ordered(struct inode *inode)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	spin_lock(&ci->i_ceph_lock);
-	ci->i_ordered_count++;
-	ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
-	spin_unlock(&ci->i_ceph_lock);
+	__ceph_dir_clear_ordered(ceph_inode(inode));
 }
 
 static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	bool ret;
-	spin_lock(&ci->i_ceph_lock);
-	ret = __ceph_dir_is_complete_ordered(ci);
-	spin_unlock(&ci->i_ceph_lock);
+	bool ret = __ceph_dir_is_complete_ordered(ceph_inode(inode));
+	smp_rmb();
 	return ret;
 }
 
@@ -636,16 +635,20 @@
 	unsigned offset;       /* offset of last chunk, adjusted for . and .. */
 	unsigned next_offset;  /* offset of next chunk (last_name's + 1) */
 	char *last_name;       /* last entry in previous chunk */
-	struct dentry *dentry; /* next dentry (for dcache readdir) */
-	int dir_release_count;
-	int dir_ordered_count;
+	long long dir_release_count;
+	long long dir_ordered_count;
+	int readdir_cache_idx;
 
 	/* used for -o dirstat read() on directory thing */
 	char *dir_info;
 	int dir_info_len;
 };
 
-
+struct ceph_readdir_cache_control {
+	struct page  *page;
+	struct dentry **dentries;
+	int index;
+};
 
 /*
  * A "snap realm" describes a subset of the file hierarchy sharing
@@ -944,6 +947,7 @@
 extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
 extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
 extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
+extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
 
 /*
  * our d_ops vary depending on whether the inode is live,