ceph: track pending caps flushing accurately

Previously we did not track an accurate TID for flushing caps. When
the MDS fails over, we have no choice but to re-send all flushing
caps with a new TID. This can cause problems, because the MDS may
have already flushed some of those caps and issued the same caps to
another client. Since the re-sent cap flush carries a new TID, the
MDS is unable to detect that it has already processed the flush.

This patch adds code to track pending cap flushes accurately. When a
cap flush needs to be re-sent, its original flush TID is used.
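
To illustrate the new bookkeeping, here is a condensed sketch of how
an ack resolves against the per-inode tree (the helper name is
hypothetical; the real logic is in the handle_cap_flush_ack() hunk
below). Entries are ordered by TID, so an ack covers every older
pending flush, while cap bits that reappear in a newer, still-pending
flush stay marked as flushing:

  /* sketch only; assumes i_ceph_lock is held */
  static int caps_cleaned_by_ack(struct ceph_inode_info *ci, u64 ack_tid)
  {
  	struct ceph_cap_flush *cf;
  	struct rb_node *n;
  	int cleaned = 0;

  	for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
  		cf = rb_entry(n, struct ceph_cap_flush, i_node);
  		if (cf->tid == ack_tid)
  			cleaned = cf->caps;	/* bits this flush carried */
  		else if (cf->tid > ack_tid)
  			cleaned &= ~cf->caps;	/* still in flight */
  	}
  	return cleaned;
  }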

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index dc98833..9a25f8d 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1097,7 +1097,8 @@
  * caller should hold snap_rwsem (read), s_mutex.
  */
 static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
-		      int op, int used, int want, int retain, int flushing)
+		      int op, int used, int want, int retain, int flushing,
+		      u64 flush_tid)
 	__releases(cap->ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = cap->ci;
@@ -1115,8 +1116,6 @@
 	u64 xattr_version = 0;
 	struct ceph_buffer *xattr_blob = NULL;
 	int delayed = 0;
-	u64 flush_tid = 0;
-	int i;
 	int ret;
 	bool inline_data;
 
@@ -1160,24 +1159,7 @@
 	cap->implemented &= cap->issued | used;
 	cap->mds_wanted = want;
 
-	if (flushing) {
-		/*
-		 * assign a tid for flush operations so we can avoid
-		 * flush1 -> dirty1 -> flush2 -> flushack1 -> mark
-		 * clean type races.  track latest tid for every bit
-		 * so we can handle flush AxFw, flush Fw, and have the
-		 * first ack clean Ax.
-		 */
-		flush_tid = ++ci->i_cap_flush_last_tid;
-		dout(" cap_flush_tid %d\n", (int)flush_tid);
-		for (i = 0; i < CEPH_CAP_BITS; i++)
-			if (flushing & (1 << i))
-				ci->i_cap_flush_tid[i] = flush_tid;
-
-		follows = ci->i_head_snapc->seq;
-	} else {
-		follows = 0;
-	}
+	follows = flushing ? ci->i_head_snapc->seq : 0;
 
 	keep = cap->implemented;
 	seq = cap->seq;
@@ -1311,7 +1293,10 @@
 			goto retry;
 		}
 
-		capsnap->flush_tid = ++ci->i_cap_flush_last_tid;
+		spin_lock(&mdsc->cap_dirty_lock);
+		capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
+		spin_unlock(&mdsc->cap_dirty_lock);
+
 		atomic_inc(&capsnap->nref);
 		if (list_empty(&capsnap->flushing_item))
 			list_add_tail(&capsnap->flushing_item,
@@ -1407,6 +1392,33 @@
 	return dirty;
 }
 
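+/*
+ * Insert a pending cap flush into the inode's TID-ordered tree.
+ * Caller must hold i_ceph_lock.
+ */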
+static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
+					struct ceph_cap_flush *cf)
+{
+	struct rb_node **p = &ci->i_cap_flush_tree.rb_node;
+	struct rb_node *parent = NULL;
+	struct ceph_cap_flush *other = NULL;
+
+	while (*p) {
+		parent = *p;
+		other = rb_entry(parent, struct ceph_cap_flush, i_node);
+
+		if (cf->tid < other->tid)
+			p = &(*p)->rb_left;
+		else if (cf->tid > other->tid)
+			p = &(*p)->rb_right;
+		else
+			BUG();
+	}
+
+	rb_link_node(&cf->i_node, parent, p);
+	rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);
+}
+
 /*
  * Add dirty inode to the flushing list.  Assigned a seq number so we
  * can wait for caps to flush without starving.
@@ -1414,10 +1422,12 @@
  * Called under i_ceph_lock.
  */
 static int __mark_caps_flushing(struct inode *inode,
-				 struct ceph_mds_session *session)
+				struct ceph_mds_session *session,
+				u64 *flush_tid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_cap_flush *cf;
 	int flushing;
 
 	BUG_ON(ci->i_dirty_caps == 0);
@@ -1432,9 +1442,14 @@
 	ci->i_dirty_caps = 0;
 	dout(" inode %p now !dirty\n", inode);
 
+	cf = kmalloc(sizeof(*cf), GFP_ATOMIC);
+	cf->caps = flushing;
+
 	spin_lock(&mdsc->cap_dirty_lock);
 	list_del_init(&ci->i_dirty_item);
 
+	cf->tid = ++mdsc->last_cap_flush_tid;
+
 	if (list_empty(&ci->i_flushing_item)) {
 		ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
 		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
@@ -1448,6 +1463,9 @@
 	}
 	spin_unlock(&mdsc->cap_dirty_lock);
 
+	__add_cap_flushing_to_inode(ci, cf);
+
+	*flush_tid = cf->tid;
 	return flushing;
 }
 
@@ -1493,6 +1511,7 @@
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap *cap;
+	u64 flush_tid;
 	int file_wanted, used, cap_used;
 	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
 	int issued, implemented, want, retain, revoking, flushing = 0;
@@ -1711,17 +1730,20 @@
 			took_snap_rwsem = 1;
 		}
 
-		if (cap == ci->i_auth_cap && ci->i_dirty_caps)
-			flushing = __mark_caps_flushing(inode, session);
-		else
+		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
+			flushing = __mark_caps_flushing(inode, session,
+							&flush_tid);
+		} else {
 			flushing = 0;
+			flush_tid = 0;
+		}
 
 		mds = cap->mds;  /* remember mds, so we don't repeat */
 		sent++;
 
 		/* __send_cap drops i_ceph_lock */
 		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
-				      want, retain, flushing);
+				      want, retain, flushing, flush_tid);
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
@@ -1750,12 +1772,13 @@
 /*
  * Try to flush dirty caps back to the auth mds.
  */
-static int try_flush_caps(struct inode *inode, u16 flush_tid[])
+static int try_flush_caps(struct inode *inode, u64 *ptid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_session *session = NULL;
 	int flushing = 0;
+	u64 flush_tid = 0;
 
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1780,46 +1803,52 @@
 		if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
 			goto out;
 
-		flushing = __mark_caps_flushing(inode, session);
+		flushing = __mark_caps_flushing(inode, session, &flush_tid);
 
 		/* __send_cap drops i_ceph_lock */
 		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
-				     cap->issued | cap->implemented, flushing);
+				     (cap->issued | cap->implemented),
+				     flushing, flush_tid);
 
-		spin_lock(&ci->i_ceph_lock);
-		if (delayed)
+		if (delayed) {
+			spin_lock(&ci->i_ceph_lock);
 			__cap_delay_requeue(mdsc, ci);
+			spin_unlock(&ci->i_ceph_lock);
+		}
+	} else {
+		struct rb_node *n = rb_last(&ci->i_cap_flush_tree);
+		if (n) {
+			struct ceph_cap_flush *cf =
+				rb_entry(n, struct ceph_cap_flush, i_node);
+			flush_tid = cf->tid;
+		}
+		flushing = ci->i_flushing_caps;
+		spin_unlock(&ci->i_ceph_lock);
 	}
-
-	flushing = ci->i_flushing_caps;
-	if (flushing)
-		memcpy(flush_tid, ci->i_cap_flush_tid,
-		       sizeof(ci->i_cap_flush_tid));
 out:
-	spin_unlock(&ci->i_ceph_lock);
 	if (session)
 		mutex_unlock(&session->s_mutex);
+
+	*ptid = flush_tid;
 	return flushing;
 }
 
 /*
  * Return true if we've flushed caps through the given flush_tid.
  */
-static int caps_are_flushed(struct inode *inode, u16 flush_tid[])
+static int caps_are_flushed(struct inode *inode, u64 flush_tid)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int i, ret = 1;
+	struct ceph_cap_flush *cf;
+	struct rb_node *n;
+	int ret = 1;
 
 	spin_lock(&ci->i_ceph_lock);
-	for (i = 0; i < CEPH_CAP_BITS; i++) {
-		if (!(ci->i_flushing_caps & (1 << i)))
-			continue;
-		// tid only has 16 bits. we need to handle wrapping
-		if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) {
-			/* still flushing this bit */
+	n = rb_first(&ci->i_cap_flush_tree);
+	if (n) {
+		cf = rb_entry(n, struct ceph_cap_flush, i_node);
+		if (cf->tid <= flush_tid)
 			ret = 0;
-			break;
-		}
 	}
 	spin_unlock(&ci->i_ceph_lock);
 	return ret;
@@ -1922,7 +1951,7 @@
 {
 	struct inode *inode = file->f_mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	u16 flush_tid[CEPH_CAP_BITS];
+	u64 flush_tid;
 	int ret;
 	int dirty;
 
@@ -1938,7 +1967,7 @@
 
 	mutex_lock(&inode->i_mutex);
 
-	dirty = try_flush_caps(inode, flush_tid);
+	dirty = try_flush_caps(inode, &flush_tid);
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
 	ret = unsafe_dirop_wait(inode);
@@ -1967,14 +1996,14 @@
 int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	u16 flush_tid[CEPH_CAP_BITS];
+	u64 flush_tid;
 	int err = 0;
 	int dirty;
 	int wait = wbc->sync_mode == WB_SYNC_ALL;
 
 	dout("write_inode %p wait=%d\n", inode, wait);
 	if (wait) {
-		dirty = try_flush_caps(inode, flush_tid);
+		dirty = try_flush_caps(inode, &flush_tid);
 		if (dirty)
 			err = wait_event_interruptible(ci->i_cap_wq,
 				       caps_are_flushed(inode, flush_tid));
@@ -2022,6 +2051,56 @@
 	}
 }
 
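+/*
+ * Re-send this inode's pending cap flushes in TID order, reusing each
+ * flush's original TID so the MDS can detect duplicates.
+ */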
+static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
+				struct ceph_mds_session *session,
+				struct ceph_inode_info *ci)
+{
+	struct inode *inode = &ci->vfs_inode;
+	struct ceph_cap *cap;
+	struct ceph_cap_flush *cf;
+	struct rb_node *n;
+	int delayed = 0;
+	u64 first_tid = 0;
+
+	while (true) {
+		spin_lock(&ci->i_ceph_lock);
+		cap = ci->i_auth_cap;
+		if (!(cap && cap->session == session)) {
+			pr_err("%p auth cap %p not mds%d ???\n", inode,
+					cap, session->s_mds);
+			spin_unlock(&ci->i_ceph_lock);
+			break;
+		}
+
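+		/* resume at the first flush not yet re-sent this pass */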
+		for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
+			cf = rb_entry(n, struct ceph_cap_flush, i_node);
+			if (cf->tid >= first_tid)
+				break;
+		}
+		if (!n) {
+			spin_unlock(&ci->i_ceph_lock);
+			break;
+		}
+
+		cf = rb_entry(n, struct ceph_cap_flush, i_node);
+		first_tid = cf->tid + 1;
+
+		dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
+		     cap, cf->tid, ceph_cap_string(cf->caps));
+		delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+				      __ceph_caps_used(ci),
+				      __ceph_caps_wanted(ci),
+				      cap->issued | cap->implemented,
+				      cf->caps, cf->tid);
+	}
+	return delayed;
+}
+
 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 			     struct ceph_mds_session *session)
 {
@@ -2031,28 +2105,10 @@
 
 	dout("kick_flushing_caps mds%d\n", session->s_mds);
 	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
-		struct inode *inode = &ci->vfs_inode;
-		struct ceph_cap *cap;
-		int delayed = 0;
-
-		spin_lock(&ci->i_ceph_lock);
-		cap = ci->i_auth_cap;
-		if (cap && cap->session == session) {
-			dout("kick_flushing_caps %p cap %p %s\n", inode,
-			     cap, ceph_cap_string(ci->i_flushing_caps));
-			delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-					     __ceph_caps_used(ci),
-					     __ceph_caps_wanted(ci),
-					     cap->issued | cap->implemented,
-					     ci->i_flushing_caps);
-			if (delayed) {
-				spin_lock(&ci->i_ceph_lock);
-				__cap_delay_requeue(mdsc, ci);
-				spin_unlock(&ci->i_ceph_lock);
-			}
-		} else {
-			pr_err("%p auth cap %p not mds%d ???\n", inode,
-			       cap, session->s_mds);
+		int delayed = __kick_flushing_caps(mdsc, session, ci);
+		if (delayed) {
+			spin_lock(&ci->i_ceph_lock);
+			__cap_delay_requeue(mdsc, ci);
 			spin_unlock(&ci->i_ceph_lock);
 		}
 	}
@@ -2064,7 +2120,6 @@
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_cap *cap;
-	int delayed = 0;
 
 	spin_lock(&ci->i_ceph_lock);
 	cap = ci->i_auth_cap;
@@ -2074,16 +2129,16 @@
 	__ceph_flush_snaps(ci, &session, 1);
 
 	if (ci->i_flushing_caps) {
+		int delayed;
+
 		spin_lock(&mdsc->cap_dirty_lock);
 		list_move_tail(&ci->i_flushing_item,
 			       &cap->session->s_cap_flushing);
 		spin_unlock(&mdsc->cap_dirty_lock);
 
-		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-				     __ceph_caps_used(ci),
-				     __ceph_caps_wanted(ci),
-				     cap->issued | cap->implemented,
-				     ci->i_flushing_caps);
+		spin_unlock(&ci->i_ceph_lock);
+
+		delayed = __kick_flushing_caps(mdsc, session, ci);
 		if (delayed) {
 			spin_lock(&ci->i_ceph_lock);
 			__cap_delay_requeue(mdsc, ci);
@@ -2836,16 +2891,34 @@
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+	struct ceph_cap_flush *cf;
+	struct rb_node *n;
+	LIST_HEAD(to_remove);
 	unsigned seq = le32_to_cpu(m->seq);
 	int dirty = le32_to_cpu(m->dirty);
 	int cleaned = 0;
 	int drop = 0;
-	int i;
 
-	for (i = 0; i < CEPH_CAP_BITS; i++)
-		if ((dirty & (1 << i)) &&
-		    (u16)flush_tid == ci->i_cap_flush_tid[i])
-			cleaned |= 1 << i;
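+	/*
+	 * All pending flushes with TIDs at or below the acked TID are
+	 * complete.  Cap bits that reappear in a newer, still-pending
+	 * flush are still in flight and must not be marked clean yet.
+	 */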
+	n = rb_first(&ci->i_cap_flush_tree);
+	while (n) {
+		cf = rb_entry(n, struct ceph_cap_flush, i_node);
+		n = rb_next(&cf->i_node);
+		if (cf->tid == flush_tid)
+			cleaned = cf->caps;
+		if (cf->tid <= flush_tid) {
+			rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+			list_add_tail(&cf->list, &to_remove);
+		} else {
+			cleaned &= ~cf->caps;
+			if (!cleaned)
+				break;
+		}
+	}
 
 	dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
 	     " flushing %s -> %s\n",
@@ -2890,6 +2958,13 @@
 
 out:
 	spin_unlock(&ci->i_ceph_lock);
+
+	while (!list_empty(&to_remove)) {
+		cf = list_first_entry(&to_remove,
+				      struct ceph_cap_flush, list);
+		list_del(&cf->list);
+		kfree(cf);
+	}
 	if (drop)
 		iput(inode);
 }
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 1c991df..6d3f19d 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -417,8 +417,7 @@
 	INIT_LIST_HEAD(&ci->i_dirty_item);
 	INIT_LIST_HEAD(&ci->i_flushing_item);
 	ci->i_cap_flush_seq = 0;
-	ci->i_cap_flush_last_tid = 0;
-	memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid));
+	ci->i_cap_flush_tree = RB_ROOT;
 	init_waitqueue_head(&ci->i_cap_wq);
 	ci->i_hold_caps_min = 0;
 	ci->i_hold_caps_max = 0;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 8080d48..839901f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1142,6 +1142,7 @@
 				  void *arg)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	LIST_HEAD(to_remove);
 	int drop = 0;
 
 	dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1149,9 +1150,23 @@
 	spin_lock(&ci->i_ceph_lock);
 	__ceph_remove_cap(cap, false);
 	if (!ci->i_auth_cap) {
+		struct ceph_cap_flush *cf;
 		struct ceph_mds_client *mdsc =
 			ceph_sb_to_client(inode->i_sb)->mdsc;
 
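+		/*
+		 * Detach all pending cap flushes; they are freed once
+		 * i_ceph_lock has been dropped.
+		 */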
+		while (true) {
+			struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
+			if (!n)
+				break;
+			cf = rb_entry(n, struct ceph_cap_flush, i_node);
+			rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+			list_add(&cf->list, &to_remove);
+		}
+
 		spin_lock(&mdsc->cap_dirty_lock);
 		if (!list_empty(&ci->i_dirty_item)) {
 			pr_warn_ratelimited(
@@ -1173,8 +1184,15 @@
 			drop = 1;
 		}
 		spin_unlock(&mdsc->cap_dirty_lock);
 	}
 	spin_unlock(&ci->i_ceph_lock);
+	while (!list_empty(&to_remove)) {
+		struct ceph_cap_flush *cf;
+		cf = list_first_entry(&to_remove,
+				      struct ceph_cap_flush, list);
+		list_del(&cf->list);
+		kfree(cf);
+	}
 	while (drop--)
 		iput(inode);
 	return 0;
@@ -3408,6 +3427,7 @@
 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
 	spin_lock_init(&mdsc->snap_flush_lock);
 	mdsc->cap_flush_seq = 0;
+	mdsc->last_cap_flush_tid = 1;
 	INIT_LIST_HEAD(&mdsc->cap_dirty);
 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
 	mdsc->num_cap_flushing = 0;
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 509d682..19f6084 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -307,6 +307,7 @@
 	spinlock_t       snap_flush_lock;
 
 	u64               cap_flush_seq;
+	u64               last_cap_flush_tid;
 	struct list_head  cap_dirty;        /* inodes with dirty caps */
 	struct list_head  cap_dirty_migrating; /* ...that are migrating... */
 	int               num_cap_flushing; /* # caps we are flushing */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index c496135..cc597f5 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -186,6 +186,20 @@
 	}
 }
 
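+/*
+ * A single pending cap flush: i_node links it into the inode's
+ * i_cap_flush_tree while in flight; list is used to collect entries
+ * for freeing once they have been detached from the tree.
+ */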
+struct ceph_cap_flush {
+	u64 tid;
+	int caps;
+	union {
+		struct rb_node i_node;
+		struct list_head list;
+	};
+};
+
 /*
  * The frag tree describes how a directory is fragmented, potentially across
  * multiple metadata servers.  It is also used to indicate points where
@@ -299,7 +308,7 @@
-	/* we need to track cap writeback on a per-cap-bit basis, to allow
-	 * overlapping, pipelined cap flushes to the mds.  we can probably
-	 * reduce the tid to 8 bits if we're concerned about inode size. */
+	/* we need to track cap writeback on a per-flush basis, to allow
+	 * overlapping, pipelined cap flushes to the mds.  pending flushes
+	 * are kept in a tree ordered by flush TID. */
-	u16 i_cap_flush_last_tid, i_cap_flush_tid[CEPH_CAP_BITS];
+	struct rb_root i_cap_flush_tree;
 	wait_queue_head_t i_cap_wq;      /* threads waiting on a capability */
 	unsigned long i_hold_caps_min; /* jiffies */
 	unsigned long i_hold_caps_max; /* jiffies */