FS-Cache: Fix operation state management and accounting

Fix the state management of internal fscache operations and the accounting of
what operations are in what states.

This is done by:

 (1) Give struct fscache_operation a enum variable that directly represents the
     state it's currently in, rather than spreading this knowledge over a bunch
     of flags, who's processing the operation at the moment and whether it is
     queued or not.

     This makes it easier to write assertions to check the state at various
     points and to prevent invalid state transitions.

 (2) Add an 'operation complete' state and supply a function to indicate the
     completion of an operation (fscache_op_complete()) and make things call
     it.  The final call to fscache_put_operation() can then check that an op
     in the appropriate state (complete or cancelled).

 (3) Adjust the use of object->n_ops, ->n_in_progress, ->n_exclusive to better
     govern the state of an object:

	(a) The ->n_ops is now the number of extant operations on the object
	    and is now decremented by fscache_put_operation() only.

	(b) The ->n_in_progress is simply the number of objects that have been
	    taken off of the object's pending queue for the purposes of being
	    run.  This is decremented by fscache_op_complete() only.

	(c) The ->n_exclusive is the number of exclusive ops that have been
	    submitted and queued or are in progress.  It is decremented by
	    fscache_op_complete() and by fscache_cancel_op().

     fscache_put_operation() and fscache_operation_gc() now no longer try to
     clean up ->n_exclusive and ->n_in_progress.  That was leading to double
     decrements against fscache_cancel_op().

     fscache_cancel_op() now no longer decrements ->n_ops.  That was leading to
     double decrements against fscache_put_operation().

     fscache_submit_exclusive_op() now decides whether it has to queue an op
     based on ->n_in_progress being > 0 rather than ->n_ops > 0 as the latter
     will persist in being true even after all preceding operations have been
     cancelled or completed.  Furthermore, if an object is active and there are
     runnable ops against it, there must be at least one op running.

 (4) Add a remaining-pages counter (n_pages) to struct fscache_retrieval and
     provide a function to record completion of the pages as they complete.

     When n_pages reaches 0, the operation is deemed to be complete and
     fscache_op_complete() is called.

     Add calls to fscache_retrieval_complete() anywhere we've finished with a
     page we've been given to read or allocate for.  This includes places where
     we just return pages to the netfs for reading from the server and where
     accessing the cache fails and we discard the proposed netfs page.

The bugs in the unfixed state management manifest themselves as oopses like the
following where the operation completion gets out of sync with return of the
cookie by the netfs.  This is possible because the cache unlocks and returns
all the netfs pages before recording its completion - which means that there's
nothing to stop the netfs discarding them and returning the cookie.


FS-Cache: Cookie 'NFS.fh' still has outstanding reads
------------[ cut here ]------------
kernel BUG at fs/fscache/cookie.c:519!
invalid opcode: 0000 [#1] SMP
CPU 1
Modules linked in: cachefiles nfs fscache auth_rpcgss nfs_acl lockd sunrpc

Pid: 400, comm: kswapd0 Not tainted 3.1.0-rc7-fsdevel+ #1090                  /DG965RY
RIP: 0010:[<ffffffffa007050a>]  [<ffffffffa007050a>] __fscache_relinquish_cookie+0x170/0x343 [fscache]
RSP: 0018:ffff8800368cfb00  EFLAGS: 00010282
RAX: 000000000000003c RBX: ffff880023cc8790 RCX: 0000000000000000
RDX: 0000000000002f2e RSI: 0000000000000001 RDI: ffffffff813ab86c
RBP: ffff8800368cfb50 R08: 0000000000000002 R09: 0000000000000000
R10: ffff88003a1b7890 R11: ffff88001df6e488 R12: ffff880023d8ed98
R13: ffff880023cc8798 R14: 0000000000000004 R15: ffff88003b8bf370
FS:  0000000000000000(0000) GS:ffff88003bd00000(0000) knlGS:0000000000000000
CS:  0010 DS: 0000 ES: 0000 CR0: 000000008005003b
CR2: 00000000008ba008 CR3: 0000000023d93000 CR4: 00000000000006e0
DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000
DR3: 0000000000000000 DR6: 00000000ffff0ff0 DR7: 0000000000000400
Process kswapd0 (pid: 400, threadinfo ffff8800368ce000, task ffff88003b8bf040)
Stack:
 ffff88003b8bf040 ffff88001df6e528 ffff88001df6e528 ffffffffa00b46b0
 ffff88003b8bf040 ffff88001df6e488 ffff88001df6e620 ffffffffa00b46b0
 ffff88001ebd04c8 0000000000000004 ffff8800368cfb70 ffffffffa00b2c91
Call Trace:
 [<ffffffffa00b2c91>] nfs_fscache_release_inode_cookie+0x3b/0x47 [nfs]
 [<ffffffffa008f25f>] nfs_clear_inode+0x3c/0x41 [nfs]
 [<ffffffffa0090df1>] nfs4_evict_inode+0x2f/0x33 [nfs]
 [<ffffffff810d8d47>] evict+0xa1/0x15c
 [<ffffffff810d8e2e>] dispose_list+0x2c/0x38
 [<ffffffff810d9ebd>] prune_icache_sb+0x28c/0x29b
 [<ffffffff810c56b7>] prune_super+0xd5/0x140
 [<ffffffff8109b615>] shrink_slab+0x102/0x1ab
 [<ffffffff8109d690>] balance_pgdat+0x2f2/0x595
 [<ffffffff8103e009>] ? process_timeout+0xb/0xb
 [<ffffffff8109dba3>] kswapd+0x270/0x289
 [<ffffffff8104c5ea>] ? __init_waitqueue_head+0x46/0x46
 [<ffffffff8109d933>] ? balance_pgdat+0x595/0x595
 [<ffffffff8104bf7a>] kthread+0x7f/0x87
 [<ffffffff813ad6b4>] kernel_thread_helper+0x4/0x10
 [<ffffffff81026b98>] ? finish_task_switch+0x45/0xc0
 [<ffffffff813abcdd>] ? retint_restore_args+0xe/0xe
 [<ffffffff8104befb>] ? __init_kthread_worker+0x53/0x53
 [<ffffffff813ad6b0>] ? gs_change+0xb/0xb

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/Documentation/filesystems/caching/backend-api.txt b/Documentation/filesystems/caching/backend-api.txt
index 382d52c..f4769b9 100644
--- a/Documentation/filesystems/caching/backend-api.txt
+++ b/Documentation/filesystems/caching/backend-api.txt
@@ -419,7 +419,10 @@
 
      If an I/O error occurs, fscache_io_error() should be called and -ENOBUFS
      returned if possible or fscache_end_io() called with a suitable error
-     code..
+     code.
+
+     fscache_put_retrieval() should be called after a page or pages are dealt
+     with.  This will complete the operation when all pages are dealt with.
 
 
  (*) Request pages be read from cache [mandatory]:
@@ -526,6 +529,27 @@
      error value should be 0 if successful and an error otherwise.
 
 
+ (*) Record that one or more pages being retrieved or allocated have been dealt
+     with:
+
+	void fscache_retrieval_complete(struct fscache_retrieval *op,
+					int n_pages);
+
+     This is called to record the fact that one or more pages have been dealt
+     with and are no longer the concern of this operation.  When the number of
+     pages remaining in the operation reaches 0, the operation will be
+     completed.
+
+
+ (*) Record operation completion:
+
+	void fscache_op_complete(struct fscache_operation *op);
+
+     This is called to record the completion of an operation.  This deducts
+     this operation from the parent object's run state, potentially permitting
+     one or more pending operations to start running.
+
+
  (*) Set highest store limit:
 
 	void fscache_set_store_limit(struct fscache_object *object,
diff --git a/Documentation/filesystems/caching/operations.txt b/Documentation/filesystems/caching/operations.txt
index b6b070c..bee2a5f 100644
--- a/Documentation/filesystems/caching/operations.txt
+++ b/Documentation/filesystems/caching/operations.txt
@@ -174,7 +174,7 @@
      necessary (the object might have died whilst the thread was waiting).
 
      When it has finished doing its processing, it should call
-     fscache_put_operation() on it.
+     fscache_op_complete() and fscache_put_operation() on it.
 
  (4) The operation holds an effective lock upon the object, preventing other
      exclusive ops conflicting until it is released.  The operation can be
diff --git a/fs/cachefiles/rdwr.c b/fs/cachefiles/rdwr.c
index bf123d9..93a0815 100644
--- a/fs/cachefiles/rdwr.c
+++ b/fs/cachefiles/rdwr.c
@@ -197,6 +197,7 @@
 
 		fscache_end_io(op, monitor->netfs_page, error);
 		page_cache_release(monitor->netfs_page);
+		fscache_retrieval_complete(op, 1);
 		fscache_put_retrieval(op);
 		kfree(monitor);
 
@@ -339,6 +340,7 @@
 
 	copy_highpage(netpage, backpage);
 	fscache_end_io(op, netpage, 0);
+	fscache_retrieval_complete(op, 1);
 
 success:
 	_debug("success");
@@ -360,6 +362,7 @@
 		goto out;
 io_error:
 	cachefiles_io_error_obj(object, "Page read error on backing file");
+	fscache_retrieval_complete(op, 1);
 	ret = -ENOBUFS;
 	goto out;
 
@@ -369,6 +372,7 @@
 	fscache_put_retrieval(monitor->op);
 	kfree(monitor);
 nomem:
+	fscache_retrieval_complete(op, 1);
 	_leave(" = -ENOMEM");
 	return -ENOMEM;
 }
@@ -407,7 +411,7 @@
 	_enter("{%p},{%lx},,,", object, page->index);
 
 	if (!object->backer)
-		return -ENOBUFS;
+		goto enobufs;
 
 	inode = object->backer->d_inode;
 	ASSERT(S_ISREG(inode->i_mode));
@@ -416,7 +420,7 @@
 
 	/* calculate the shift required to use bmap */
 	if (inode->i_sb->s_blocksize > PAGE_SIZE)
-		return -ENOBUFS;
+		goto enobufs;
 
 	shift = PAGE_SHIFT - inode->i_sb->s_blocksize_bits;
 
@@ -448,13 +452,19 @@
 	} else if (cachefiles_has_space(cache, 0, 1) == 0) {
 		/* there's space in the cache we can use */
 		fscache_mark_page_cached(op, page);
+		fscache_retrieval_complete(op, 1);
 		ret = -ENODATA;
 	} else {
-		ret = -ENOBUFS;
+		goto enobufs;
 	}
 
 	_leave(" = %d", ret);
 	return ret;
+
+enobufs:
+	fscache_retrieval_complete(op, 1);
+	_leave(" = -ENOBUFS");
+	return -ENOBUFS;
 }
 
 /*
@@ -632,6 +642,7 @@
 
 		/* the netpage is unlocked and marked up to date here */
 		fscache_end_io(op, netpage, 0);
+		fscache_retrieval_complete(op, 1);
 		page_cache_release(netpage);
 		netpage = NULL;
 		continue;
@@ -659,6 +670,7 @@
 	list_for_each_entry_safe(netpage, _n, list, lru) {
 		list_del(&netpage->lru);
 		page_cache_release(netpage);
+		fscache_retrieval_complete(op, 1);
 	}
 
 	_leave(" = %d", ret);
@@ -707,7 +719,7 @@
 	       *nr_pages);
 
 	if (!object->backer)
-		return -ENOBUFS;
+		goto all_enobufs;
 
 	space = 1;
 	if (cachefiles_has_space(cache, 0, *nr_pages) < 0)
@@ -720,7 +732,7 @@
 
 	/* calculate the shift required to use bmap */
 	if (inode->i_sb->s_blocksize > PAGE_SIZE)
-		return -ENOBUFS;
+		goto all_enobufs;
 
 	shift = PAGE_SHIFT - inode->i_sb->s_blocksize_bits;
 
@@ -760,7 +772,10 @@
 			nrbackpages++;
 		} else if (space && pagevec_add(&pagevec, page) == 0) {
 			fscache_mark_pages_cached(op, &pagevec);
+			fscache_retrieval_complete(op, 1);
 			ret = -ENODATA;
+		} else {
+			fscache_retrieval_complete(op, 1);
 		}
 	}
 
@@ -781,6 +796,10 @@
 	_leave(" = %d [nr=%u%s]",
 	       ret, *nr_pages, list_empty(pages) ? " empty" : "");
 	return ret;
+
+all_enobufs:
+	fscache_retrieval_complete(op, *nr_pages);
+	return -ENOBUFS;
 }
 
 /*
@@ -815,6 +834,7 @@
 	else
 		ret = -ENOBUFS;
 
+	fscache_retrieval_complete(op, 1);
 	_leave(" = %d", ret);
 	return ret;
 }
@@ -864,6 +884,7 @@
 		ret = -ENOBUFS;
 	}
 
+	fscache_retrieval_complete(op, *nr_pages);
 	_leave(" = %d", ret);
 	return ret;
 }
diff --git a/fs/fscache/object.c b/fs/fscache/object.c
index b6b897c..773bc79 100644
--- a/fs/fscache/object.c
+++ b/fs/fscache/object.c
@@ -587,8 +587,6 @@
 	if (object->n_in_progress == 0) {
 		if (object->n_ops > 0) {
 			ASSERTCMP(object->n_ops, >=, object->n_obj_ops);
-			ASSERTIF(object->n_ops > object->n_obj_ops,
-				 !list_empty(&object->pending_ops));
 			fscache_start_operations(object);
 		} else {
 			ASSERT(list_empty(&object->pending_ops));
diff --git a/fs/fscache/operation.c b/fs/fscache/operation.c
index c857ab8..748f955 100644
--- a/fs/fscache/operation.c
+++ b/fs/fscache/operation.c
@@ -37,6 +37,7 @@
 	ASSERT(op->processor != NULL);
 	ASSERTCMP(op->object->state, >=, FSCACHE_OBJECT_AVAILABLE);
 	ASSERTCMP(atomic_read(&op->usage), >, 0);
+	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_IN_PROGRESS);
 
 	fscache_stat(&fscache_n_op_enqueue);
 	switch (op->flags & FSCACHE_OP_TYPE) {
@@ -64,6 +65,9 @@
 static void fscache_run_op(struct fscache_object *object,
 			   struct fscache_operation *op)
 {
+	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_PENDING);
+
+	op->state = FSCACHE_OP_ST_IN_PROGRESS;
 	object->n_in_progress++;
 	if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
 		wake_up_bit(&op->flags, FSCACHE_OP_WAITING);
@@ -80,22 +84,23 @@
 int fscache_submit_exclusive_op(struct fscache_object *object,
 				struct fscache_operation *op)
 {
-	int ret;
-
 	_enter("{OBJ%x OP%x},", object->debug_id, op->debug_id);
 
+	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_INITIALISED);
+	ASSERTCMP(atomic_read(&op->usage), >, 0);
+
 	spin_lock(&object->lock);
 	ASSERTCMP(object->n_ops, >=, object->n_in_progress);
 	ASSERTCMP(object->n_ops, >=, object->n_exclusive);
 	ASSERT(list_empty(&op->pend_link));
 
-	ret = -ENOBUFS;
+	op->state = FSCACHE_OP_ST_PENDING;
 	if (fscache_object_is_active(object)) {
 		op->object = object;
 		object->n_ops++;
 		object->n_exclusive++;	/* reads and writes must wait */
 
-		if (object->n_ops > 1) {
+		if (object->n_in_progress > 0) {
 			atomic_inc(&op->usage);
 			list_add_tail(&op->pend_link, &object->pending_ops);
 			fscache_stat(&fscache_n_op_pend);
@@ -111,7 +116,6 @@
 
 		/* need to issue a new write op after this */
 		clear_bit(FSCACHE_OBJECT_PENDING_WRITE, &object->flags);
-		ret = 0;
 	} else if (object->state == FSCACHE_OBJECT_CREATING) {
 		op->object = object;
 		object->n_ops++;
@@ -119,14 +123,13 @@
 		atomic_inc(&op->usage);
 		list_add_tail(&op->pend_link, &object->pending_ops);
 		fscache_stat(&fscache_n_op_pend);
-		ret = 0;
 	} else {
 		/* not allowed to submit ops in any other state */
 		BUG();
 	}
 
 	spin_unlock(&object->lock);
-	return ret;
+	return 0;
 }
 
 /*
@@ -186,6 +189,7 @@
 	_enter("{OBJ%x OP%x},{%u}",
 	       object->debug_id, op->debug_id, atomic_read(&op->usage));
 
+	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_INITIALISED);
 	ASSERTCMP(atomic_read(&op->usage), >, 0);
 
 	spin_lock(&object->lock);
@@ -196,6 +200,7 @@
 	ostate = object->state;
 	smp_rmb();
 
+	op->state = FSCACHE_OP_ST_PENDING;
 	if (fscache_object_is_active(object)) {
 		op->object = object;
 		object->n_ops++;
@@ -225,12 +230,15 @@
 		   object->state == FSCACHE_OBJECT_LC_DYING ||
 		   object->state == FSCACHE_OBJECT_WITHDRAWING) {
 		fscache_stat(&fscache_n_op_rejected);
+		op->state = FSCACHE_OP_ST_CANCELLED;
 		ret = -ENOBUFS;
 	} else if (!test_bit(FSCACHE_IOERROR, &object->cache->flags)) {
 		fscache_report_unexpected_submission(object, op, ostate);
 		ASSERT(!fscache_object_is_active(object));
+		op->state = FSCACHE_OP_ST_CANCELLED;
 		ret = -ENOBUFS;
 	} else {
+		op->state = FSCACHE_OP_ST_CANCELLED;
 		ret = -ENOBUFS;
 	}
 
@@ -290,13 +298,18 @@
 
 	_enter("OBJ%x OP%x}", op->object->debug_id, op->debug_id);
 
+	ASSERTCMP(op->state, >=, FSCACHE_OP_ST_PENDING);
+	ASSERTCMP(op->state, !=, FSCACHE_OP_ST_CANCELLED);
+	ASSERTCMP(atomic_read(&op->usage), >, 0);
+
 	spin_lock(&object->lock);
 
 	ret = -EBUSY;
-	if (!list_empty(&op->pend_link)) {
+	if (op->state == FSCACHE_OP_ST_PENDING) {
+		ASSERT(!list_empty(&op->pend_link));
 		fscache_stat(&fscache_n_op_cancelled);
 		list_del_init(&op->pend_link);
-		object->n_ops--;
+		op->state = FSCACHE_OP_ST_CANCELLED;
 		if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
 			object->n_exclusive--;
 		if (test_and_clear_bit(FSCACHE_OP_WAITING, &op->flags))
@@ -311,6 +324,37 @@
 }
 
 /*
+ * Record the completion of an in-progress operation.
+ */
+void fscache_op_complete(struct fscache_operation *op)
+{
+	struct fscache_object *object = op->object;
+
+	_enter("OBJ%x", object->debug_id);
+
+	ASSERTCMP(op->state, ==, FSCACHE_OP_ST_IN_PROGRESS);
+	ASSERTCMP(object->n_in_progress, >, 0);
+	ASSERTIFCMP(test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags),
+		    object->n_exclusive, >, 0);
+	ASSERTIFCMP(test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags),
+		    object->n_in_progress, ==, 1);
+
+	spin_lock(&object->lock);
+
+	op->state = FSCACHE_OP_ST_COMPLETE;
+
+	if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags))
+		object->n_exclusive--;
+	object->n_in_progress--;
+	if (object->n_in_progress == 0)
+		fscache_start_operations(object);
+
+	spin_unlock(&object->lock);
+	_leave("");
+}
+EXPORT_SYMBOL(fscache_op_complete);
+
+/*
  * release an operation
  * - queues pending ops if this is the last in-progress op
  */
@@ -328,8 +372,9 @@
 		return;
 
 	_debug("PUT OP");
-	if (test_and_set_bit(FSCACHE_OP_DEAD, &op->flags))
-		BUG();
+	ASSERTIFCMP(op->state != FSCACHE_OP_ST_COMPLETE,
+		    op->state, ==, FSCACHE_OP_ST_CANCELLED);
+	op->state = FSCACHE_OP_ST_DEAD;
 
 	fscache_stat(&fscache_n_op_release);
 
@@ -365,16 +410,6 @@
 		return;
 	}
 
-	if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
-		ASSERTCMP(object->n_exclusive, >, 0);
-		object->n_exclusive--;
-	}
-
-	ASSERTCMP(object->n_in_progress, >, 0);
-	object->n_in_progress--;
-	if (object->n_in_progress == 0)
-		fscache_start_operations(object);
-
 	ASSERTCMP(object->n_ops, >, 0);
 	object->n_ops--;
 	if (object->n_ops == 0)
@@ -413,23 +448,14 @@
 		spin_unlock(&cache->op_gc_list_lock);
 
 		object = op->object;
+		spin_lock(&object->lock);
 
 		_debug("GC DEFERRED REL OBJ%x OP%x",
 		       object->debug_id, op->debug_id);
 		fscache_stat(&fscache_n_op_gc);
 
 		ASSERTCMP(atomic_read(&op->usage), ==, 0);
-
-		spin_lock(&object->lock);
-		if (test_bit(FSCACHE_OP_EXCLUSIVE, &op->flags)) {
-			ASSERTCMP(object->n_exclusive, >, 0);
-			object->n_exclusive--;
-		}
-
-		ASSERTCMP(object->n_in_progress, >, 0);
-		object->n_in_progress--;
-		if (object->n_in_progress == 0)
-			fscache_start_operations(object);
+		ASSERTCMP(op->state, ==, FSCACHE_OP_ST_DEAD);
 
 		ASSERTCMP(object->n_ops, >, 0);
 		object->n_ops--;
@@ -437,6 +463,7 @@
 			fscache_raise_event(object, FSCACHE_OBJECT_EV_CLEARED);
 
 		spin_unlock(&object->lock);
+		kfree(op);
 
 	} while (count++ < 20);
 
diff --git a/fs/fscache/page.c b/fs/fscache/page.c
index 248a12e..b38b13d 100644
--- a/fs/fscache/page.c
+++ b/fs/fscache/page.c
@@ -162,6 +162,7 @@
 			fscache_abort_object(object);
 	}
 
+	fscache_op_complete(op);
 	_leave("");
 }
 
@@ -223,6 +224,8 @@
 
 	_enter("{OP%x}", op->op.debug_id);
 
+	ASSERTCMP(op->n_pages, ==, 0);
+
 	fscache_hist(fscache_retrieval_histogram, op->start_time);
 	if (op->context)
 		fscache_put_context(op->op.object->cookie, op->context);
@@ -320,6 +323,11 @@
 	_debug("<<< GO");
 
 check_if_dead:
+	if (op->op.state == FSCACHE_OP_ST_CANCELLED) {
+		fscache_stat(stat_object_dead);
+		_leave(" = -ENOBUFS [cancelled]");
+		return -ENOBUFS;
+	}
 	if (unlikely(fscache_object_is_dead(object))) {
 		fscache_stat(stat_object_dead);
 		return -ENOBUFS;
@@ -364,6 +372,7 @@
 		_leave(" = -ENOMEM");
 		return -ENOMEM;
 	}
+	op->n_pages = 1;
 
 	spin_lock(&cookie->lock);
 
@@ -375,10 +384,10 @@
 	ASSERTCMP(object->state, >, FSCACHE_OBJECT_LOOKING_UP);
 
 	atomic_inc(&object->n_reads);
-	set_bit(FSCACHE_OP_DEC_READ_CNT, &op->op.flags);
+	__set_bit(FSCACHE_OP_DEC_READ_CNT, &op->op.flags);
 
 	if (fscache_submit_op(object, &op->op) < 0)
-		goto nobufs_unlock;
+		goto nobufs_unlock_dec;
 	spin_unlock(&cookie->lock);
 
 	fscache_stat(&fscache_n_retrieval_ops);
@@ -425,6 +434,8 @@
 	_leave(" = %d", ret);
 	return ret;
 
+nobufs_unlock_dec:
+	atomic_dec(&object->n_reads);
 nobufs_unlock:
 	spin_unlock(&cookie->lock);
 	kfree(op);
@@ -482,6 +493,7 @@
 	op = fscache_alloc_retrieval(mapping, end_io_func, context);
 	if (!op)
 		return -ENOMEM;
+	op->n_pages = *nr_pages;
 
 	spin_lock(&cookie->lock);
 
@@ -491,10 +503,10 @@
 			     struct fscache_object, cookie_link);
 
 	atomic_inc(&object->n_reads);
-	set_bit(FSCACHE_OP_DEC_READ_CNT, &op->op.flags);
+	__set_bit(FSCACHE_OP_DEC_READ_CNT, &op->op.flags);
 
 	if (fscache_submit_op(object, &op->op) < 0)
-		goto nobufs_unlock;
+		goto nobufs_unlock_dec;
 	spin_unlock(&cookie->lock);
 
 	fscache_stat(&fscache_n_retrieval_ops);
@@ -541,6 +553,8 @@
 	_leave(" = %d", ret);
 	return ret;
 
+nobufs_unlock_dec:
+	atomic_dec(&object->n_reads);
 nobufs_unlock:
 	spin_unlock(&cookie->lock);
 	kfree(op);
@@ -583,6 +597,7 @@
 	op = fscache_alloc_retrieval(page->mapping, NULL, NULL);
 	if (!op)
 		return -ENOMEM;
+	op->n_pages = 1;
 
 	spin_lock(&cookie->lock);
 
@@ -696,6 +711,7 @@
 	fscache_end_page_write(object, page);
 	if (ret < 0) {
 		fscache_abort_object(object);
+		fscache_op_complete(&op->op);
 	} else {
 		fscache_enqueue_operation(&op->op);
 	}
@@ -710,6 +726,7 @@
 	spin_unlock(&cookie->stores_lock);
 	clear_bit(FSCACHE_OBJECT_PENDING_WRITE, &object->flags);
 	spin_unlock(&object->lock);
+	fscache_op_complete(&op->op);
 	_leave("");
 }
 
diff --git a/include/linux/fscache-cache.h b/include/linux/fscache-cache.h
index e3d6d93..f5facd1 100644
--- a/include/linux/fscache-cache.h
+++ b/include/linux/fscache-cache.h
@@ -75,6 +75,16 @@
 typedef void (*fscache_operation_release_t)(struct fscache_operation *op);
 typedef void (*fscache_operation_processor_t)(struct fscache_operation *op);
 
+enum fscache_operation_state {
+	FSCACHE_OP_ST_BLANK,		/* Op is not yet submitted */
+	FSCACHE_OP_ST_INITIALISED,	/* Op is initialised */
+	FSCACHE_OP_ST_PENDING,		/* Op is blocked from running */
+	FSCACHE_OP_ST_IN_PROGRESS,	/* Op is in progress */
+	FSCACHE_OP_ST_COMPLETE,		/* Op is complete */
+	FSCACHE_OP_ST_CANCELLED,	/* Op has been cancelled */
+	FSCACHE_OP_ST_DEAD		/* Op is now dead */
+};
+
 struct fscache_operation {
 	struct work_struct	work;		/* record for async ops */
 	struct list_head	pend_link;	/* link in object->pending_ops */
@@ -86,10 +96,10 @@
 #define FSCACHE_OP_MYTHREAD	0x0002	/* - processing is done be issuing thread, not pool */
 #define FSCACHE_OP_WAITING	4	/* cleared when op is woken */
 #define FSCACHE_OP_EXCLUSIVE	5	/* exclusive op, other ops must wait */
-#define FSCACHE_OP_DEAD		6	/* op is now dead */
-#define FSCACHE_OP_DEC_READ_CNT	7	/* decrement object->n_reads on destruction */
-#define FSCACHE_OP_KEEP_FLAGS	0xc0	/* flags to keep when repurposing an op */
+#define FSCACHE_OP_DEC_READ_CNT	6	/* decrement object->n_reads on destruction */
+#define FSCACHE_OP_KEEP_FLAGS	0x0070	/* flags to keep when repurposing an op */
 
+	enum fscache_operation_state state;
 	atomic_t		usage;
 	unsigned		debug_id;	/* debugging ID */
 
@@ -106,6 +116,7 @@
 extern void fscache_op_work_func(struct work_struct *work);
 
 extern void fscache_enqueue_operation(struct fscache_operation *);
+extern void fscache_op_complete(struct fscache_operation *);
 extern void fscache_put_operation(struct fscache_operation *);
 
 /**
@@ -122,6 +133,7 @@
 {
 	INIT_WORK(&op->work, fscache_op_work_func);
 	atomic_set(&op->usage, 1);
+	op->state = FSCACHE_OP_ST_INITIALISED;
 	op->debug_id = atomic_inc_return(&fscache_op_debug_id);
 	op->processor = processor;
 	op->release = release;
@@ -138,6 +150,7 @@
 	void			*context;	/* netfs read context (pinned) */
 	struct list_head	to_do;		/* list of things to be done by the backend */
 	unsigned long		start_time;	/* time at which retrieval started */
+	unsigned		n_pages;	/* number of pages to be retrieved */
 };
 
 typedef int (*fscache_page_retrieval_func_t)(struct fscache_retrieval *op,
@@ -174,8 +187,22 @@
 }
 
 /**
+ * fscache_retrieval_complete - Record (partial) completion of a retrieval
+ * @op: The retrieval operation affected
+ * @n_pages: The number of pages to account for
+ */
+static inline void fscache_retrieval_complete(struct fscache_retrieval *op,
+					      int n_pages)
+{
+	op->n_pages -= n_pages;
+	if (op->n_pages <= 0)
+		fscache_op_complete(&op->op);
+}
+
+/**
  * fscache_put_retrieval - Drop a reference to a retrieval operation
  * @op: The retrieval operation affected
+ * @n_pages: The number of pages to account for
  *
  * Drop a reference to a retrieval operation.
  */
@@ -333,10 +360,10 @@
 
 	int			debug_id;	/* debugging ID */
 	int			n_children;	/* number of child objects */
-	int			n_ops;		/* number of ops outstanding on object */
+	int			n_ops;		/* number of extant ops on object */
 	int			n_obj_ops;	/* number of object ops outstanding on object */
 	int			n_in_progress;	/* number of ops in progress */
-	int			n_exclusive;	/* number of exclusive ops queued */
+	int			n_exclusive;	/* number of exclusive ops queued or in progress */
 	atomic_t		n_reads;	/* number of read ops in progress */
 	spinlock_t		lock;		/* state and operations lock */