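ceph: make sure syncfs flushes all cap snaps

ceph_mdsc_sync() only waited for dirty cap flushes up to
mdsc->cap_flush_seq: wait_caps_flush() looked at each session's
s_cap_flushing list and nothing else, so cap snaps that were still
being flushed were not waited for.

Sample mdsc->last_snap_seq under snap_rwsem in ceph_mdsc_sync() and
pass it to wait_caps_flush() as want_snap_seq.  For each session,
wait_caps_flush() now also takes the first entry on
session->s_cap_snaps_flushing and, if check_cap_flush() says it is not
done, waits on cap_flushing_wq until that inode's i_cap_snaps list is
empty or its first entry has follows >= want_snap_seq.

check_cap_flush() now takes a ceph_inode_info and both sequence
numbers; passing 0 for either one skips the corresponding check.  The
session loop advances to the next mds only when neither wait was
needed, so a session is examined again after each wait.

mdsc->last_snap_seq is initialized to 0 next to the other mdsc fields.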

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 88010f9a..2bb9264 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1488,17 +1488,22 @@
 	return err;
 }
 
-static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
+static int check_cap_flush(struct ceph_inode_info *ci,
+			   u64 want_flush_seq, u64 want_snap_seq)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int ret;
+	int ret1 = 1, ret2 = 1;
 	spin_lock(&ci->i_ceph_lock);
-	if (ci->i_flushing_caps)
-		ret = ci->i_cap_flush_seq >= want_flush_seq;
-	else
-		ret = 1;
+	if (want_flush_seq > 0 && ci->i_flushing_caps)
+		ret1 = ci->i_cap_flush_seq >= want_flush_seq;
+
+	if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
+		struct ceph_cap_snap *capsnap =
+			list_first_entry(&ci->i_cap_snaps,
+					 struct ceph_cap_snap, ci_item);
+		ret2 = capsnap->follows >= want_snap_seq;
+	}
 	spin_unlock(&ci->i_ceph_lock);
-	return ret;
+	return ret1 && ret2;
 }
 
 /*
@@ -1506,44 +1511,71 @@
  *
  * returns true if we've flushed through want_flush_seq
  */
-static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
+static void wait_caps_flush(struct ceph_mds_client *mdsc,
+			    u64 want_flush_seq, u64 want_snap_seq)
 {
 	int mds;
 
 	dout("check_cap_flush want %lld\n", want_flush_seq);
 	mutex_lock(&mdsc->mutex);
-	for (mds = 0; mds < mdsc->max_sessions; mds++) {
+	for (mds = 0; mds < mdsc->max_sessions; ) {
 		struct ceph_mds_session *session = mdsc->sessions[mds];
-		struct inode *inode = NULL;
+		struct inode *inode1 = NULL, *inode2 = NULL;
 
-		if (!session)
+		if (!session) {
+			mds++;
 			continue;
+		}
 		get_session(session);
 		mutex_unlock(&mdsc->mutex);
 
 		mutex_lock(&session->s_mutex);
 		if (!list_empty(&session->s_cap_flushing)) {
 			struct ceph_inode_info *ci =
-				list_entry(session->s_cap_flushing.next,
-					   struct ceph_inode_info,
-					   i_flushing_item);
+				list_first_entry(&session->s_cap_flushing,
+						 struct ceph_inode_info,
+						 i_flushing_item);
 
-			if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
+			if (!check_cap_flush(ci, want_flush_seq, 0)) {
 				dout("check_cap_flush still flushing %p "
 				     "seq %lld <= %lld to mds%d\n",
 				     &ci->vfs_inode, ci->i_cap_flush_seq,
-				     want_flush_seq, session->s_mds);
-				inode = igrab(&ci->vfs_inode);
+				     want_flush_seq, mds);
+				inode1 = igrab(&ci->vfs_inode);
+			}
+		}
+		if (!list_empty(&session->s_cap_snaps_flushing)) {
+			struct ceph_cap_snap *capsnap =
+				list_first_entry(&session->s_cap_snaps_flushing,
+						 struct ceph_cap_snap,
+						 flushing_item);
+			struct ceph_inode_info *ci = capsnap->ci;
+			if (!check_cap_flush(ci, 0, want_snap_seq)) {
+				dout("check_cap_flush still flushing snap %p "
+				     "follows %lld <= %lld to mds%d\n",
+				     &ci->vfs_inode, capsnap->follows,
+				     want_snap_seq, mds);
+				inode2 = igrab(&ci->vfs_inode);
 			}
 		}
 		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
 
-		if (inode) {
+		if (inode1) {
 			wait_event(mdsc->cap_flushing_wq,
-				   check_cap_flush(inode, want_flush_seq));
-			iput(inode);
+				   check_cap_flush(ceph_inode(inode1),
+						   want_flush_seq, 0));
+			iput(inode1);
 		}
+		if (inode2) {
+			wait_event(mdsc->cap_flushing_wq,
+				   check_cap_flush(ceph_inode(inode2),
+						   0, want_snap_seq));
+			iput(inode2);
+		}
+
+		if (!inode1 && !inode2)
+			mds++;
 
 		mutex_lock(&mdsc->mutex);
 	}
@@ -3391,6 +3423,7 @@
 	atomic_set(&mdsc->num_sessions, 0);
 	mdsc->max_sessions = 0;
 	mdsc->stopping = 0;
+	mdsc->last_snap_seq = 0;
 	init_rwsem(&mdsc->snap_rwsem);
 	mdsc->snap_realms = RB_ROOT;
 	INIT_LIST_HEAD(&mdsc->snap_empty);
@@ -3517,7 +3550,7 @@
 
 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
-	u64 want_tid, want_flush;
+	u64 want_tid, want_flush, want_snap;
 
 	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
 		return;
@@ -3532,10 +3565,15 @@
 	want_flush = mdsc->cap_flush_seq;
 	spin_unlock(&mdsc->cap_dirty_lock);
 
-	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
+	down_read(&mdsc->snap_rwsem);
+	want_snap = mdsc->last_snap_seq;
+	up_read(&mdsc->snap_rwsem);
+
+	dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
+	     want_tid, want_flush, want_snap);
 
 	wait_unsafe_requests(mdsc, want_tid);
-	wait_caps_flush(mdsc, want_flush);
+	wait_caps_flush(mdsc, want_flush, want_snap);
 }
 
 /*