NFS: Ensure we commit after writeback is complete

If the page cache is being flushed, then we want to ensure that we
do start a commit once the pages are done being flushed.
If we just wait until all I/O is done to that file, we can end up
livelocking until the balance_dirty_pages() mechanism puts its
foot down and forces I/O to stop.
So instead we do more or less the same thing that O_DIRECT does,
and set up a counter to tell us when the flush is done,

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index db7ba54..051197c 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -40,6 +40,12 @@
 #define MIN_POOL_WRITE		(32)
 #define MIN_POOL_COMMIT		(4)
 
+struct nfs_io_completion {
+	void (*complete)(void *data);
+	void *data;
+	struct kref refcount;
+};
+
 /*
  * Local function declarations
  */
@@ -108,6 +114,39 @@ static void nfs_writehdr_free(struct nfs_pgio_header *hdr)
 	mempool_free(hdr, nfs_wdata_mempool);
 }
 
+static struct nfs_io_completion *nfs_io_completion_alloc(gfp_t gfp_flags)
+{
+	return kmalloc(sizeof(struct nfs_io_completion), gfp_flags);
+}
+
+static void nfs_io_completion_init(struct nfs_io_completion *ioc,
+		void (*complete)(void *), void *data)
+{
+	ioc->complete = complete;
+	ioc->data = data;
+	kref_init(&ioc->refcount);
+}
+
+static void nfs_io_completion_release(struct kref *kref)
+{
+	struct nfs_io_completion *ioc = container_of(kref,
+			struct nfs_io_completion, refcount);
+	ioc->complete(ioc->data);
+	kfree(ioc);
+}
+
+static void nfs_io_completion_get(struct nfs_io_completion *ioc)
+{
+	if (ioc != NULL)
+		kref_get(&ioc->refcount);
+}
+
+static void nfs_io_completion_put(struct nfs_io_completion *ioc)
+{
+	if (ioc != NULL)
+		kref_put(&ioc->refcount, nfs_io_completion_release);
+}
+
 static void nfs_context_set_write_error(struct nfs_open_context *ctx, int error)
 {
 	ctx->error = error;
@@ -681,18 +720,29 @@ static int nfs_writepages_callback(struct page *page, struct writeback_control *
 	return ret;
 }
 
+static void nfs_io_completion_commit(void *inode)
+{
+	nfs_commit_inode(inode, 0);
+}
+
 int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc)
 {
 	struct inode *inode = mapping->host;
 	struct nfs_pageio_descriptor pgio;
+	struct nfs_io_completion *ioc = nfs_io_completion_alloc(GFP_NOFS);
 	int err;
 
 	nfs_inc_stats(inode, NFSIOS_VFSWRITEPAGES);
 
+	if (ioc)
+		nfs_io_completion_init(ioc, nfs_io_completion_commit, inode);
+
 	nfs_pageio_init_write(&pgio, inode, wb_priority(wbc), false,
 				&nfs_async_write_completion_ops);
+	pgio.pg_io_completion = ioc;
 	err = write_cache_pages(mapping, wbc, nfs_writepages_callback, &pgio);
 	nfs_pageio_complete(&pgio);
+	nfs_io_completion_put(ioc);
 
 	if (err < 0)
 		goto out_err;
@@ -940,6 +990,11 @@ int nfs_write_need_commit(struct nfs_pgio_header *hdr)
 	return hdr->verf.committed != NFS_FILE_SYNC;
 }
 
+static void nfs_async_write_init(struct nfs_pgio_header *hdr)
+{
+	nfs_io_completion_get(hdr->io_completion);
+}
+
 static void nfs_write_completion(struct nfs_pgio_header *hdr)
 {
 	struct nfs_commit_info cinfo;
@@ -973,6 +1028,7 @@ static void nfs_write_completion(struct nfs_pgio_header *hdr)
 		nfs_release_request(req);
 	}
 out:
+	nfs_io_completion_put(hdr->io_completion);
 	hdr->release(hdr);
 }
 
@@ -1378,6 +1434,7 @@ static void nfs_async_write_reschedule_io(struct nfs_pgio_header *hdr)
 }
 
 static const struct nfs_pgio_completion_ops nfs_async_write_completion_ops = {
+	.init_hdr = nfs_async_write_init,
 	.error_cleanup = nfs_async_write_error,
 	.completion = nfs_write_completion,
 	.reschedule_io = nfs_async_write_reschedule_io,