ocfs2: rework ocfs2_buffered_write_cluster()

Use some ideas from the new-aops patch series and turn
ocfs2_buffered_write_cluster() into a 2 stage operation with the caller
copying data in between. The code now understands multiple cluster writes as
a result of having to deal with a full page write for greater than 4k pages.

This sets us up to easily call into the write path during ->page_mkwrite().

Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/aops.c b/fs/ocfs2/aops.c
index a480b09..3e5758e 100644
--- a/fs/ocfs2/aops.c
+++ b/fs/ocfs2/aops.c
@@ -684,6 +684,8 @@
 	     bh = bh->b_this_page, block_start += bsize) {
 		block_end = block_start + bsize;
 
+		clear_buffer_new(bh);
+
 		/*
 		 * Ignore blocks outside of our i/o range -
 		 * they may belong to unallocated clusters.
@@ -698,9 +700,8 @@
 		 * For an allocating write with cluster size >= page
 		 * size, we always write the entire page.
 		 */
-
-		if (buffer_new(bh))
-			clear_buffer_new(bh);
+		if (new)
+			set_buffer_new(bh);
 
 		if (!buffer_mapped(bh)) {
 			map_bh(bh, inode->i_sb, *p_blkno);
@@ -761,217 +762,232 @@
 	return ret;
 }
 
-/*
- * This will copy user data from the buffer page in the splice
- * context.
- *
- * For now, we ignore SPLICE_F_MOVE as that would require some extra
- * communication out all the way to ocfs2_write().
- */
-int ocfs2_map_and_write_splice_data(struct inode *inode,
-				  struct ocfs2_write_ctxt *wc, u64 *p_blkno,
-				  unsigned int *ret_from, unsigned int *ret_to)
-{
-	int ret;
-	unsigned int to, from, cluster_start, cluster_end;
-	char *src, *dst;
-	struct ocfs2_splice_write_priv *sp = wc->w_private;
-	struct pipe_buffer *buf = sp->s_buf;
-	unsigned long bytes, src_from;
-	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+#if (PAGE_CACHE_SIZE >= OCFS2_MAX_CLUSTERSIZE)
+#define OCFS2_MAX_CTXT_PAGES	1
+#else
+#define OCFS2_MAX_CTXT_PAGES	(OCFS2_MAX_CLUSTERSIZE / PAGE_CACHE_SIZE)
+#endif
 
-	ocfs2_figure_cluster_boundaries(osb, wc->w_cpos, &cluster_start,
-					&cluster_end);
-
-	from = sp->s_offset;
-	src_from = sp->s_buf_offset;
-	bytes = wc->w_count;
-
-	if (wc->w_large_pages) {
-		/*
-		 * For cluster size < page size, we have to
-		 * calculate pos within the cluster and obey
-		 * the rightmost boundary.
-		 */
-		bytes = min(bytes, (unsigned long)(osb->s_clustersize
-				   - (wc->w_pos & (osb->s_clustersize - 1))));
-	}
-	to = from + bytes;
-
-	BUG_ON(from > PAGE_CACHE_SIZE);
-	BUG_ON(to > PAGE_CACHE_SIZE);
-	BUG_ON(from < cluster_start);
-	BUG_ON(to > cluster_end);
-
-	if (wc->w_this_page_new)
-		ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
-					    cluster_start, cluster_end, 1);
-	else
-		ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
-					    from, to, 0);
-	if (ret) {
-		mlog_errno(ret);
-		goto out;
-	}
-
-	src = buf->ops->map(sp->s_pipe, buf, 1);
-	dst = kmap_atomic(wc->w_this_page, KM_USER1);
-	memcpy(dst + from, src + src_from, bytes);
-	kunmap_atomic(wc->w_this_page, KM_USER1);
-	buf->ops->unmap(sp->s_pipe, buf, src);
-
-	wc->w_finished_copy = 1;
-
-	*ret_from = from;
-	*ret_to = to;
-out:
-
-	return bytes ? (unsigned int)bytes : ret;
-}
+#define OCFS2_MAX_CLUSTERS_PER_PAGE	(PAGE_CACHE_SIZE / OCFS2_MIN_CLUSTERSIZE)
 
 /*
- * This will copy user data from the iovec in the buffered write
- * context.
+ * Describe the state of a single cluster to be written to.
  */
-int ocfs2_map_and_write_user_data(struct inode *inode,
-				  struct ocfs2_write_ctxt *wc, u64 *p_blkno,
-				  unsigned int *ret_from, unsigned int *ret_to)
-{
-	int ret;
-	unsigned int to, from, cluster_start, cluster_end;
-	unsigned long bytes, src_from;
-	char *dst;
-	struct ocfs2_buffered_write_priv *bp = wc->w_private;
-	const struct iovec *cur_iov = bp->b_cur_iov;
-	char __user *buf;
-	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-
-	ocfs2_figure_cluster_boundaries(osb, wc->w_cpos, &cluster_start,
-					&cluster_end);
-
-	buf = cur_iov->iov_base + bp->b_cur_off;
-	src_from = (unsigned long)buf & ~PAGE_CACHE_MASK;
-
-	from = wc->w_pos & (PAGE_CACHE_SIZE - 1);
-
+struct ocfs2_write_cluster_desc {
+	u32		c_cpos;
+	u32		c_phys;
 	/*
-	 * This is a lot of comparisons, but it reads quite
-	 * easily, which is important here.
+	 * Give this a unique field because c_phys eventually gets
+	 * filled.
 	 */
-	/* Stay within the src page */
-	bytes = PAGE_SIZE - src_from;
-	/* Stay within the vector */
-	bytes = min(bytes,
-		    (unsigned long)(cur_iov->iov_len - bp->b_cur_off));
-	/* Stay within count */
-	bytes = min(bytes, (unsigned long)wc->w_count);
-	/*
-	 * For clustersize > page size, just stay within
-	 * target page, otherwise we have to calculate pos
-	 * within the cluster and obey the rightmost
-	 * boundary.
-	 */
-	if (wc->w_large_pages) {
-		/*
-		 * For cluster size < page size, we have to
-		 * calculate pos within the cluster and obey
-		 * the rightmost boundary.
-		 */
-		bytes = min(bytes, (unsigned long)(osb->s_clustersize
-				   - (wc->w_pos & (osb->s_clustersize - 1))));
-	} else {
-		/*
-		 * cluster size > page size is the most common
-		 * case - we just stay within the target page
-		 * boundary.
-		 */
-		bytes = min(bytes, PAGE_CACHE_SIZE - from);
-	}
+	unsigned	c_new;
+};
 
-	to = from + bytes;
+struct ocfs2_write_ctxt {
+	/* Logical cluster position / len of write */
+	u32				w_cpos;
+	u32				w_clen;
 
-	BUG_ON(from > PAGE_CACHE_SIZE);
-	BUG_ON(to > PAGE_CACHE_SIZE);
-	BUG_ON(from < cluster_start);
-	BUG_ON(to > cluster_end);
-
-	if (wc->w_this_page_new)
-		ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
-					    cluster_start, cluster_end, 1);
-	else
-		ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
-					    from, to, 0);
-	if (ret) {
-		mlog_errno(ret);
-		goto out;
-	}
-
-	dst = kmap(wc->w_this_page);
-	memcpy(dst + from, bp->b_src_buf + src_from, bytes);
-	kunmap(wc->w_this_page);
+	struct ocfs2_write_cluster_desc	w_desc[OCFS2_MAX_CLUSTERS_PER_PAGE];
 
 	/*
-	 * XXX: This is slow, but simple. The caller of
-	 * ocfs2_buffered_write_cluster() is responsible for
-	 * passing through the iovecs, so it's difficult to
-	 * predict what our next step is in here after our
-	 * initial write. A future version should be pushing
-	 * that iovec manipulation further down.
+	 * This is true if page_size > cluster_size.
 	 *
-	 * By setting this, we indicate that a copy from user
-	 * data was done, and subsequent calls for this
-	 * cluster will skip copying more data.
+	 * It triggers a set of special cases during write which might
+	 * have to deal with allocating writes to partial pages.
 	 */
-	wc->w_finished_copy = 1;
+	unsigned int			w_large_pages;
 
-	*ret_from = from;
-	*ret_to = to;
-out:
+	/*
+	 * Pages involved in this write.
+	 *
+	 * w_target_page is the page being written to by the user.
+	 *
+	 * w_pages is an array of pages which always contains
+	 * w_target_page, and in the case of an allocating write with
+	 * page_size < cluster size, it will contain zero'd and mapped
+	 * pages adjacent to w_target_page which need to be written
+	 * out in so that future reads from that region will get
+	 * zero's.
+	 */
+	struct page			*w_pages[OCFS2_MAX_CTXT_PAGES];
+	unsigned int			w_num_pages;
+	struct page			*w_target_page;
 
-	return bytes ? (unsigned int)bytes : ret;
+	/*
+	 * ocfs2_write_end() uses this to know what the real range to
+	 * write in the target should be.
+	 */
+	unsigned int			w_target_from;
+	unsigned int			w_target_to;
+
+	/*
+	 * We could use journal_current_handle() but this is cleaner,
+	 * IMHO -Mark
+	 */
+	handle_t			*w_handle;
+
+	struct buffer_head		*w_di_bh;
+};
+
+static void ocfs2_free_write_ctxt(struct ocfs2_write_ctxt *wc)
+{
+	int i;
+
+	for(i = 0; i < wc->w_num_pages; i++) {
+		if (wc->w_pages[i] == NULL)
+			continue;
+
+		unlock_page(wc->w_pages[i]);
+		mark_page_accessed(wc->w_pages[i]);
+		page_cache_release(wc->w_pages[i]);
+	}
+
+	brelse(wc->w_di_bh);
+	kfree(wc);
+}
+
+static int ocfs2_alloc_write_ctxt(struct ocfs2_write_ctxt **wcp,
+				  struct ocfs2_super *osb, loff_t pos,
+				  unsigned len)
+{
+	struct ocfs2_write_ctxt *wc;
+
+	wc = kzalloc(sizeof(struct ocfs2_write_ctxt), GFP_NOFS);
+	if (!wc)
+		return -ENOMEM;
+
+	wc->w_cpos = pos >> osb->s_clustersize_bits;
+	wc->w_clen = ocfs2_clusters_for_bytes(osb->sb, len);
+
+	if (unlikely(PAGE_CACHE_SHIFT > osb->s_clustersize_bits))
+		wc->w_large_pages = 1;
+	else
+		wc->w_large_pages = 0;
+
+	*wcp = wc;
+
+	return 0;
 }
 
 /*
- * Map, fill and write a page to disk.
- *
- * The work of copying data is done via callback.  Newly allocated
- * pages which don't take user data will be zero'd (set 'new' to
- * indicate an allocating write)
- *
- * Returns a negative error code or the number of bytes copied into
- * the page.
+ * If a page has any new buffers, zero them out here, and mark them uptodate
+ * and dirty so they'll be written out (in order to prevent uninitialised
+ * block data from leaking). And clear the new bit.
  */
-static int ocfs2_write_data_page(struct inode *inode, handle_t *handle,
-				 u64 *p_blkno, struct page *page,
-				 struct ocfs2_write_ctxt *wc, int new)
+static void ocfs2_zero_new_buffers(struct page *page, unsigned from, unsigned to)
 {
-	int ret, copied = 0;
-	unsigned int from = 0, to = 0;
-	unsigned int cluster_start, cluster_end;
-	unsigned int zero_from = 0, zero_to = 0;
+	unsigned int block_start, block_end;
+	struct buffer_head *head, *bh;
 
-	ocfs2_figure_cluster_boundaries(OCFS2_SB(inode->i_sb), wc->w_cpos,
+	BUG_ON(!PageLocked(page));
+	if (!page_has_buffers(page))
+		return;
+
+	bh = head = page_buffers(page);
+	block_start = 0;
+	do {
+		block_end = block_start + bh->b_size;
+
+		if (buffer_new(bh)) {
+			if (block_end > from && block_start < to) {
+				if (!PageUptodate(page)) {
+					unsigned start, end;
+					void *kaddr;
+
+					start = max(from, block_start);
+					end = min(to, block_end);
+
+					kaddr = kmap_atomic(page, KM_USER0);
+					memset(kaddr+start, 0, end - start);
+					flush_dcache_page(page);
+					kunmap_atomic(kaddr, KM_USER0);
+					set_buffer_uptodate(bh);
+				}
+
+				clear_buffer_new(bh);
+				mark_buffer_dirty(bh);
+			}
+		}
+
+		block_start = block_end;
+		bh = bh->b_this_page;
+	} while (bh != head);
+}
+
+/*
+ * Only called when we have a failure during allocating write to write
+ * zero's to the newly allocated region.
+ */
+static void ocfs2_write_failure(struct inode *inode,
+				struct ocfs2_write_ctxt *wc,
+				loff_t user_pos, unsigned user_len)
+{
+	int i;
+	unsigned from, to;
+	struct page *tmppage;
+
+	ocfs2_zero_new_buffers(wc->w_target_page, user_pos, user_len);
+
+	if (wc->w_large_pages) {
+		from = wc->w_target_from;
+		to = wc->w_target_to;
+	} else {
+		from = 0;
+		to = PAGE_CACHE_SIZE;
+	}
+
+	for(i = 0; i < wc->w_num_pages; i++) {
+		tmppage = wc->w_pages[i];
+
+		if (ocfs2_should_order_data(inode))
+			walk_page_buffers(wc->w_handle, page_buffers(tmppage),
+					  from, to, NULL,
+					  ocfs2_journal_dirty_data);
+
+		block_commit_write(tmppage, from, to);
+	}
+}
+
+static int ocfs2_prepare_page_for_write(struct inode *inode, u64 *p_blkno,
+					struct ocfs2_write_ctxt *wc,
+					struct page *page, u32 cpos,
+					loff_t user_pos, unsigned user_len,
+					int new)
+{
+	int ret;
+	unsigned int map_from = 0, map_to = 0;
+	unsigned int cluster_start, cluster_end;
+	unsigned int user_data_from = 0, user_data_to = 0;
+
+	ocfs2_figure_cluster_boundaries(OCFS2_SB(inode->i_sb), cpos,
 					&cluster_start, &cluster_end);
 
-	if ((wc->w_pos >> PAGE_CACHE_SHIFT) == page->index
-	    && !wc->w_finished_copy) {
+	if (page == wc->w_target_page) {
+		map_from = user_pos & (PAGE_CACHE_SIZE - 1);
+		map_to = map_from + user_len;
 
-		wc->w_this_page = page;
-		wc->w_this_page_new = new;
-		ret = wc->w_write_data_page(inode, wc, p_blkno, &from, &to);
-		if (ret < 0) {
+		if (new)
+			ret = ocfs2_map_page_blocks(page, p_blkno, inode,
+						    cluster_start, cluster_end,
+						    new);
+		else
+			ret = ocfs2_map_page_blocks(page, p_blkno, inode,
+						    map_from, map_to, new);
+		if (ret) {
 			mlog_errno(ret);
 			goto out;
 		}
 
-		copied = ret;
-
-		zero_from = from;
-		zero_to = to;
+		user_data_from = map_from;
+		user_data_to = map_to;
 		if (new) {
-			from = cluster_start;
-			to = cluster_end;
+			map_from = cluster_start;
+			map_to = cluster_end;
 		}
+
+		wc->w_target_from = map_from;
+		wc->w_target_to = map_to;
 	} else {
 		/*
 		 * If we haven't allocated the new page yet, we
@@ -980,11 +996,11 @@
 		 */
 		BUG_ON(!new);
 
-		from = cluster_start;
-		to = cluster_end;
+		map_from = cluster_start;
+		map_to = cluster_end;
 
 		ret = ocfs2_map_page_blocks(page, p_blkno, inode,
-					    cluster_start, cluster_end, 1);
+					    cluster_start, cluster_end, new);
 		if (ret) {
 			mlog_errno(ret);
 			goto out;
@@ -1003,108 +1019,84 @@
 	 */
 	if (new && !PageUptodate(page))
 		ocfs2_clear_page_regions(page, OCFS2_SB(inode->i_sb),
-					 wc->w_cpos, zero_from, zero_to);
+					 cpos, user_data_from, user_data_to);
 
 	flush_dcache_page(page);
 
-	if (ocfs2_should_order_data(inode)) {
-		ret = walk_page_buffers(handle,
-					page_buffers(page),
-					from, to, NULL,
-					ocfs2_journal_dirty_data);
-		if (ret < 0)
-			mlog_errno(ret);
-	}
-
-	/*
-	 * We don't use generic_commit_write() because we need to
-	 * handle our own i_size update.
-	 */
-	ret = block_commit_write(page, from, to);
-	if (ret)
-		mlog_errno(ret);
 out:
-
-	return copied ? copied : ret;
+	return ret;
 }
 
 /*
- * Do the actual write of some data into an inode. Optionally allocate
- * in order to fulfill the write.
- *
- * cpos is the logical cluster offset within the file to write at
- *
- * 'phys' is the physical mapping of that offset. a 'phys' value of
- * zero indicates that allocation is required. In this case, data_ac
- * and meta_ac should be valid (meta_ac can be null if metadata
- * allocation isn't required).
+ * This function will only grab one clusters worth of pages.
  */
-static ssize_t ocfs2_write(struct file *file, u32 phys, handle_t *handle,
-			   struct buffer_head *di_bh,
-			   struct ocfs2_alloc_context *data_ac,
-			   struct ocfs2_alloc_context *meta_ac,
-			   struct ocfs2_write_ctxt *wc)
+static int ocfs2_grab_pages_for_write(struct address_space *mapping,
+				      struct ocfs2_write_ctxt *wc,
+				      u32 cpos, loff_t user_pos, int new)
 {
-	int ret, i, numpages = 1, new;
-	unsigned int copied = 0;
-	u32 tmp_pos;
-	u64 v_blkno, p_blkno;
-	struct address_space *mapping = file->f_mapping;
+	int ret = 0, i;
+	unsigned long start, target_index, index;
 	struct inode *inode = mapping->host;
-	unsigned long index, start;
-	struct page **cpages;
 
-	new = phys == 0 ? 1 : 0;
+	target_index = user_pos >> PAGE_CACHE_SHIFT;
 
 	/*
 	 * Figure out how many pages we'll be manipulating here. For
 	 * non allocating write, we just change the one
 	 * page. Otherwise, we'll need a whole clusters worth.
 	 */
-	if (new)
-		numpages = ocfs2_pages_per_cluster(inode->i_sb);
-
-	cpages = kzalloc(sizeof(*cpages) * numpages, GFP_NOFS);
-	if (!cpages) {
-		ret = -ENOMEM;
-		mlog_errno(ret);
-		return ret;
-	}
-
-	/*
-	 * Fill our page array first. That way we've grabbed enough so
-	 * that we can zero and flush if we error after adding the
-	 * extent.
-	 */
 	if (new) {
-		start = ocfs2_align_clusters_to_page_index(inode->i_sb,
-							   wc->w_cpos);
-		v_blkno = ocfs2_clusters_to_blocks(inode->i_sb, wc->w_cpos);
+		wc->w_num_pages = ocfs2_pages_per_cluster(inode->i_sb);
+		start = ocfs2_align_clusters_to_page_index(inode->i_sb, cpos);
 	} else {
-		start = wc->w_pos >> PAGE_CACHE_SHIFT;
-		v_blkno = wc->w_pos >> inode->i_sb->s_blocksize_bits;
+		wc->w_num_pages = 1;
+		start = target_index;
 	}
 
-	for(i = 0; i < numpages; i++) {
+	for(i = 0; i < wc->w_num_pages; i++) {
 		index = start + i;
 
-		cpages[i] = find_or_create_page(mapping, index, GFP_NOFS);
-		if (!cpages[i]) {
+		wc->w_pages[i] = find_or_create_page(mapping, index, GFP_NOFS);
+		if (!wc->w_pages[i]) {
 			ret = -ENOMEM;
 			mlog_errno(ret);
 			goto out;
 		}
+
+		if (index == target_index)
+			wc->w_target_page = wc->w_pages[i];
 	}
+out:
+	return ret;
+}
+
+/*
+ * Prepare a single cluster for write one cluster into the file.
+ */
+static int ocfs2_write_cluster(struct address_space *mapping,
+			       u32 phys, struct ocfs2_alloc_context *data_ac,
+			       struct ocfs2_alloc_context *meta_ac,
+			       struct ocfs2_write_ctxt *wc, u32 cpos,
+			       loff_t user_pos, unsigned user_len)
+{
+	int ret, i, new;
+	u64 v_blkno, p_blkno;
+	struct inode *inode = mapping->host;
+
+	new = phys == 0 ? 1 : 0;
 
 	if (new) {
+		u32 tmp_pos;
+
 		/*
 		 * This is safe to call with the page locks - it won't take
 		 * any additional semaphores or cluster locks.
 		 */
-		tmp_pos = wc->w_cpos;
+		tmp_pos = cpos;
 		ret = ocfs2_do_extend_allocation(OCFS2_SB(inode->i_sb), inode,
-						 &tmp_pos, 1, di_bh, handle,
-						 data_ac, meta_ac, NULL);
+						 &tmp_pos, 1, wc->w_di_bh,
+						 wc->w_handle, data_ac,
+						 meta_ac, NULL);
 		/*
 		 * This shouldn't happen because we must have already
 		 * calculated the correct meta data allocation required. The
@@ -1121,103 +1113,132 @@
 			mlog_errno(ret);
 			goto out;
 		}
+
+		v_blkno = ocfs2_clusters_to_blocks(inode->i_sb, cpos);
+	} else {
+		v_blkno = user_pos >> inode->i_sb->s_blocksize_bits;
 	}
 
+	/*
+	 * The only reason this should fail is due to an inability to
+	 * find the extent added.
+	 */
 	ret = ocfs2_extent_map_get_blocks(inode, v_blkno, &p_blkno, NULL,
 					  NULL);
 	if (ret < 0) {
-
-		/*
-		 * XXX: Should we go readonly here?
-		 */
-
-		mlog_errno(ret);
+		ocfs2_error(inode->i_sb, "Corrupting extend for inode %llu, "
+			    "at logical block %llu",
+			    (unsigned long long)OCFS2_I(inode)->ip_blkno,
+			    (unsigned long long)v_blkno);
 		goto out;
 	}
 
 	BUG_ON(p_blkno == 0);
 
-	for(i = 0; i < numpages; i++) {
-		ret = ocfs2_write_data_page(inode, handle, &p_blkno, cpages[i],
-					    wc, new);
-		if (ret < 0) {
-			mlog_errno(ret);
-			goto out;
-		}
+	for(i = 0; i < wc->w_num_pages; i++) {
+		int tmpret;
 
-		copied += ret;
+		tmpret = ocfs2_prepare_page_for_write(inode, &p_blkno, wc,
+						      wc->w_pages[i], cpos,
+						      user_pos, user_len, new);
+		if (tmpret) {
+			mlog_errno(tmpret);
+			if (ret == 0)
+				tmpret = ret;
+		}
 	}
 
+	/*
+	 * We only have cleanup to do in case of allocating write.
+	 */
+	if (ret && new)
+		ocfs2_write_failure(inode, wc, user_pos, user_len);
+
 out:
-	for(i = 0; i < numpages; i++) {
-		unlock_page(cpages[i]);
-		mark_page_accessed(cpages[i]);
-		page_cache_release(cpages[i]);
-	}
-	kfree(cpages);
 
-	return copied ? copied : ret;
-}
-
-static void ocfs2_write_ctxt_init(struct ocfs2_write_ctxt *wc,
-				  struct ocfs2_super *osb, loff_t pos,
-				  size_t count, ocfs2_page_writer *cb,
-				  void *cb_priv)
-{
-	wc->w_count = count;
-	wc->w_pos = pos;
-	wc->w_cpos = wc->w_pos >> osb->s_clustersize_bits;
-	wc->w_finished_copy = 0;
-
-	if (unlikely(PAGE_CACHE_SHIFT > osb->s_clustersize_bits))
-		wc->w_large_pages = 1;
-	else
-		wc->w_large_pages = 0;
-
-	wc->w_write_data_page = cb;
-	wc->w_private = cb_priv;
+	return ret;
 }
 
 /*
- * Write a cluster to an inode. The cluster may not be allocated yet,
- * in which case it will be. This only exists for buffered writes -
- * O_DIRECT takes a more "traditional" path through the kernel.
- *
- * The caller is responsible for incrementing pos, written counts, etc
- *
- * For file systems that don't support sparse files, pre-allocation
- * and page zeroing up until cpos should be done prior to this
- * function call.
- *
- * Callers should be holding i_sem, and the rw cluster lock.
- *
- * Returns the number of user bytes written, or less than zero for
- * error.
+ * ocfs2_write_end() wants to know which parts of the target page it
+ * should complete the write on. It's easiest to compute them ahead of
+ * time when a more complete view of the write is available.
  */
-ssize_t ocfs2_buffered_write_cluster(struct file *file, loff_t pos,
-				     size_t count, ocfs2_page_writer *actor,
-				     void *priv)
+static void ocfs2_set_target_boundaries(struct ocfs2_super *osb,
+					struct ocfs2_write_ctxt *wc,
+					loff_t pos, unsigned len, int alloc)
 {
-	int ret, credits = OCFS2_INODE_UPDATE_CREDITS;
-	ssize_t written = 0;
-	u32 phys;
-	struct inode *inode = file->f_mapping->host;
+	struct ocfs2_write_cluster_desc *desc;
+
+	wc->w_target_from = pos & (PAGE_CACHE_SIZE - 1);
+	wc->w_target_to = wc->w_target_from + len;
+
+	if (alloc == 0)
+		return;
+
+	/*
+	 * Allocating write - we may have different boundaries based
+	 * on page size and cluster size.
+	 *
+	 * NOTE: We can no longer compute one value from the other as
+	 * the actual write length and user provided length may be
+	 * different.
+	 */
+
+	if (wc->w_large_pages) {
+		/*
+		 * We only care about the 1st and last cluster within
+		 * our range and whether they are holes or not. Either
+		 * value may be extended out to the start/end of a
+		 * newly allocated cluster.
+		 */
+		desc = &wc->w_desc[0];
+		if (desc->c_new)
+			ocfs2_figure_cluster_boundaries(osb,
+							desc->c_cpos,
+							&wc->w_target_from,
+							NULL);
+
+		desc = &wc->w_desc[wc->w_clen - 1];
+		if (desc->c_new)
+			ocfs2_figure_cluster_boundaries(osb,
+							desc->c_cpos,
+							NULL,
+							&wc->w_target_to);
+	} else {
+		wc->w_target_from = 0;
+		wc->w_target_to = PAGE_CACHE_SIZE;
+	}
+}
+
+int ocfs2_write_begin(struct file *file, struct address_space *mapping,
+		      loff_t pos, unsigned len, unsigned flags,
+		      struct page **pagep, void **fsdata)
+{
+	int ret, i, credits = OCFS2_INODE_UPDATE_CREDITS;
+	unsigned int num_clusters = 0, clusters_to_alloc = 0;
+	u32 phys = 0;
+	struct ocfs2_write_ctxt *wc;
+	struct inode *inode = mapping->host;
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-	struct buffer_head *di_bh = NULL;
 	struct ocfs2_dinode *di;
 	struct ocfs2_alloc_context *data_ac = NULL;
 	struct ocfs2_alloc_context *meta_ac = NULL;
 	handle_t *handle;
-	struct ocfs2_write_ctxt wc;
+	struct ocfs2_write_cluster_desc *desc;
 
-	ocfs2_write_ctxt_init(&wc, osb, pos, count, actor, priv);
+	ret = ocfs2_alloc_write_ctxt(&wc, osb, pos, len);
+	if (ret) {
+		mlog_errno(ret);
+		return ret;
+	}
 
-	ret = ocfs2_meta_lock(inode, &di_bh, 1);
+	ret = ocfs2_meta_lock(inode, &wc->w_di_bh, 1);
 	if (ret) {
 		mlog_errno(ret);
 		goto out;
 	}
-	di = (struct ocfs2_dinode *)di_bh->b_data;
+	di = (struct ocfs2_dinode *)wc->w_di_bh->b_data;
 
 	/*
 	 * Take alloc sem here to prevent concurrent lookups. That way
@@ -1228,23 +1249,60 @@
 	 */
 	down_write(&OCFS2_I(inode)->ip_alloc_sem);
 
-	ret = ocfs2_get_clusters(inode, wc.w_cpos, &phys, NULL, NULL);
-	if (ret) {
-		mlog_errno(ret);
-		goto out_meta;
+	for (i = 0; i < wc->w_clen; i++) {
+		desc = &wc->w_desc[i];
+		desc->c_cpos = wc->w_cpos + i;
+
+		if (num_clusters == 0) {
+			ret = ocfs2_get_clusters(inode, desc->c_cpos, &phys,
+						 &num_clusters, NULL);
+			if (ret) {
+				mlog_errno(ret);
+				goto out_meta;
+			}
+		} else if (phys) {
+			/*
+			 * Only increment phys if it doesn't describe
+			 * a hole.
+			 */
+			phys++;
+		}
+
+		desc->c_phys = phys;
+		if (phys == 0) {
+			desc->c_new = 1;
+			clusters_to_alloc++;
+		}
+
+		num_clusters--;
 	}
 
-	/* phys == 0 means that allocation is required. */
-	if (phys == 0) {
-		ret = ocfs2_lock_allocators(inode, di, 1, &data_ac, &meta_ac);
+	/*
+	 * We set w_target_from, w_target_to here so that
+	 * ocfs2_write_end() knows which range in the target page to
+	 * write out. An allocation requires that we write the entire
+	 * cluster range.
+	 */
+	if (clusters_to_alloc > 0) {
+		/*
+		 * XXX: We are stretching the limits of
+		 * ocfs2_lock_allocators(). It greately over-estimates
+		 * the work to be done.
+		 */
+		ret = ocfs2_lock_allocators(inode, di, clusters_to_alloc,
+					    &data_ac, &meta_ac);
 		if (ret) {
 			mlog_errno(ret);
 			goto out_meta;
 		}
 
-		credits = ocfs2_calc_extend_credits(inode->i_sb, di, 1);
+		credits = ocfs2_calc_extend_credits(inode->i_sb, di,
+						    clusters_to_alloc);
+
 	}
 
+	ocfs2_set_target_boundaries(osb, wc, pos, len, clusters_to_alloc);
+
 	ret = ocfs2_data_lock(inode, 1);
 	if (ret) {
 		mlog_errno(ret);
@@ -1258,36 +1316,50 @@
 		goto out_data;
 	}
 
-	written = ocfs2_write(file, phys, handle, di_bh, data_ac,
-			      meta_ac, &wc);
-	if (written < 0) {
-		ret = written;
-		mlog_errno(ret);
-		goto out_commit;
-	}
+	wc->w_handle = handle;
 
-	ret = ocfs2_journal_access(handle, inode, di_bh,
+	/*
+	 * We don't want this to fail in ocfs2_write_end(), so do it
+	 * here.
+	 */
+	ret = ocfs2_journal_access(handle, inode, wc->w_di_bh,
 				   OCFS2_JOURNAL_ACCESS_WRITE);
 	if (ret) {
 		mlog_errno(ret);
 		goto out_commit;
 	}
 
-	pos += written;
-	if (pos > inode->i_size) {
-		i_size_write(inode, pos);
-		mark_inode_dirty(inode);
-	}
-	inode->i_blocks = ocfs2_inode_sector_count(inode);
-	di->i_size = cpu_to_le64((u64)i_size_read(inode));
-	inode->i_mtime = inode->i_ctime = CURRENT_TIME;
-	di->i_mtime = di->i_ctime = cpu_to_le64(inode->i_mtime.tv_sec);
-	di->i_mtime_nsec = di->i_ctime_nsec = cpu_to_le32(inode->i_mtime.tv_nsec);
-
-	ret = ocfs2_journal_dirty(handle, di_bh);
-	if (ret)
+	/*
+	 * Fill our page array first. That way we've grabbed enough so
+	 * that we can zero and flush if we error after adding the
+	 * extent.
+	 */
+	ret = ocfs2_grab_pages_for_write(mapping, wc, wc->w_cpos, pos,
+					 clusters_to_alloc);
+	if (ret) {
 		mlog_errno(ret);
+		goto out_commit;
+	}
 
+	for (i = 0; i < wc->w_clen; i++) {
+		desc = &wc->w_desc[i];
+
+		ret = ocfs2_write_cluster(mapping, desc->c_phys, data_ac,
+					  meta_ac, wc, desc->c_cpos, pos, len);
+		if (ret) {
+			mlog_errno(ret);
+			goto out_commit;
+		}
+	}
+
+	if (data_ac)
+		ocfs2_free_alloc_context(data_ac);
+	if (meta_ac)
+		ocfs2_free_alloc_context(meta_ac);
+
+	*pagep = wc->w_target_page;
+	*fsdata = wc;
+	return 0;
 out_commit:
 	ocfs2_commit_trans(osb, handle);
 
@@ -1299,13 +1371,85 @@
 	ocfs2_meta_unlock(inode, 1);
 
 out:
-	brelse(di_bh);
+	ocfs2_free_write_ctxt(wc);
+
 	if (data_ac)
 		ocfs2_free_alloc_context(data_ac);
 	if (meta_ac)
 		ocfs2_free_alloc_context(meta_ac);
+	return ret;
+}
 
-	return written ? written : ret;
+int ocfs2_write_end(struct file *file, struct address_space *mapping,
+		    loff_t pos, unsigned len, unsigned copied,
+		    struct page *page, void *fsdata)
+{
+	int i;
+	unsigned from, to, start = pos & (PAGE_CACHE_SIZE - 1);
+	struct inode *inode = mapping->host;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+	struct ocfs2_write_ctxt *wc = fsdata;
+	struct ocfs2_dinode *di = (struct ocfs2_dinode *)wc->w_di_bh->b_data;
+	handle_t *handle = wc->w_handle;
+	struct page *tmppage;
+
+	if (unlikely(copied < len)) {
+		if (!PageUptodate(wc->w_target_page))
+			copied = 0;
+
+		ocfs2_zero_new_buffers(wc->w_target_page, start+copied,
+				       start+len);
+	}
+	flush_dcache_page(wc->w_target_page);
+
+	for(i = 0; i < wc->w_num_pages; i++) {
+		tmppage = wc->w_pages[i];
+
+		if (tmppage == wc->w_target_page) {
+			from = wc->w_target_from;
+			to = wc->w_target_to;
+
+			BUG_ON(from > PAGE_CACHE_SIZE ||
+			       to > PAGE_CACHE_SIZE ||
+			       to < from);
+		} else {
+			/*
+			 * Pages adjacent to the target (if any) imply
+			 * a hole-filling write in which case we want
+			 * to flush their entire range.
+			 */
+			from = 0;
+			to = PAGE_CACHE_SIZE;
+		}
+
+		if (ocfs2_should_order_data(inode))
+			walk_page_buffers(wc->w_handle, page_buffers(tmppage),
+					  from, to, NULL,
+					  ocfs2_journal_dirty_data);
+
+		block_commit_write(tmppage, from, to);
+	}
+
+	pos += copied;
+	if (pos > inode->i_size) {
+		i_size_write(inode, pos);
+		mark_inode_dirty(inode);
+	}
+	inode->i_blocks = ocfs2_inode_sector_count(inode);
+	di->i_size = cpu_to_le64((u64)i_size_read(inode));
+	inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+	di->i_mtime = di->i_ctime = cpu_to_le64(inode->i_mtime.tv_sec);
+	di->i_mtime_nsec = di->i_ctime_nsec = cpu_to_le32(inode->i_mtime.tv_nsec);
+
+	ocfs2_journal_dirty(handle, wc->w_di_bh);
+
+	ocfs2_commit_trans(osb, handle);
+	ocfs2_data_unlock(inode, 1);
+	up_write(&OCFS2_I(inode)->ip_alloc_sem);
+	ocfs2_meta_unlock(inode, 1);
+	ocfs2_free_write_ctxt(wc);
+
+	return copied;
 }
 
 const struct address_space_operations ocfs2_aops = {
diff --git a/fs/ocfs2/aops.h b/fs/ocfs2/aops.h
index 45821d4..bdcdd1a 100644
--- a/fs/ocfs2/aops.h
+++ b/fs/ocfs2/aops.h
@@ -42,57 +42,13 @@
 			int (*fn)(	handle_t *handle,
 					struct buffer_head *bh));
 
-struct ocfs2_write_ctxt;
-typedef int (ocfs2_page_writer)(struct inode *, struct ocfs2_write_ctxt *,
-				u64 *, unsigned int *, unsigned int *);
+int ocfs2_write_begin(struct file *file, struct address_space *mapping,
+		      loff_t pos, unsigned len, unsigned flags,
+		      struct page **pagep, void **fsdata);
 
-ssize_t ocfs2_buffered_write_cluster(struct file *file, loff_t pos,
-				     size_t count, ocfs2_page_writer *actor,
-				     void *priv);
-
-struct ocfs2_write_ctxt {
-	size_t				w_count;
-	loff_t				w_pos;
-	u32				w_cpos;
-	unsigned int			w_finished_copy;
-
-	/* This is true if page_size > cluster_size */
-	unsigned int			w_large_pages;
-
-	/* Filler callback and private data */
-	ocfs2_page_writer		*w_write_data_page;
-	void				*w_private;
-
-	/* Only valid for the filler callback */
-	struct page			*w_this_page;
-	unsigned int			w_this_page_new;
-};
-
-struct ocfs2_buffered_write_priv {
-	char				*b_src_buf;
-	const struct iovec		*b_cur_iov; /* Current iovec */
-	size_t				b_cur_off; /* Offset in the
-						    * current iovec */
-};
-int ocfs2_map_and_write_user_data(struct inode *inode,
-				  struct ocfs2_write_ctxt *wc,
-				  u64 *p_blkno,
-				  unsigned int *ret_from,
-				  unsigned int *ret_to);
-
-struct ocfs2_splice_write_priv {
-	struct splice_desc		*s_sd;
-	struct pipe_buffer		*s_buf;
-	struct pipe_inode_info		*s_pipe;
-	/* Neither offset value is ever larger than one page */
-	unsigned int			s_offset;
-	unsigned int			s_buf_offset;
-};
-int ocfs2_map_and_write_splice_data(struct inode *inode,
-				    struct ocfs2_write_ctxt *wc,
-				    u64 *p_blkno,
-				    unsigned int *ret_from,
-				    unsigned int *ret_to);
+int ocfs2_write_end(struct file *file, struct address_space *mapping,
+		    loff_t pos, unsigned len, unsigned copied,
+		    struct page *page, void *fsdata);
 
 /* all ocfs2_dio_end_io()'s fault */
 #define ocfs2_iocb_is_rw_locked(iocb) \
diff --git a/fs/ocfs2/file.c b/fs/ocfs2/file.c
index 566f9b7..4c850d0 100644
--- a/fs/ocfs2/file.c
+++ b/fs/ocfs2/file.c
@@ -1335,15 +1335,16 @@
 	*basep = base;
 }
 
-static struct page * ocfs2_get_write_source(struct ocfs2_buffered_write_priv *bp,
+static struct page * ocfs2_get_write_source(char **ret_src_buf,
 					    const struct iovec *cur_iov,
 					    size_t iov_offset)
 {
 	int ret;
-	char *buf;
+	char *buf = cur_iov->iov_base + iov_offset;
 	struct page *src_page = NULL;
+	unsigned long off;
 
-	buf = cur_iov->iov_base + iov_offset;
+	off = (unsigned long)(buf) & ~PAGE_CACHE_MASK;
 
 	if (!segment_eq(get_fs(), KERNEL_DS)) {
 		/*
@@ -1355,18 +1356,17 @@
 				     (unsigned long)buf & PAGE_CACHE_MASK, 1,
 				     0, 0, &src_page, NULL);
 		if (ret == 1)
-			bp->b_src_buf = kmap(src_page);
+			*ret_src_buf = kmap(src_page) + off;
 		else
 			src_page = ERR_PTR(-EFAULT);
 	} else {
-		bp->b_src_buf = buf;
+		*ret_src_buf = buf;
 	}
 
 	return src_page;
 }
 
-static void ocfs2_put_write_source(struct ocfs2_buffered_write_priv *bp,
-				   struct page *page)
+static void ocfs2_put_write_source(struct page *page)
 {
 	if (page) {
 		kunmap(page);
@@ -1382,10 +1382,12 @@
 {
 	int ret = 0;
 	ssize_t copied, total = 0;
-	size_t iov_offset = 0;
+	size_t iov_offset = 0, bytes;
+	loff_t pos;
 	const struct iovec *cur_iov = iov;
-	struct ocfs2_buffered_write_priv bp;
-	struct page *page;
+	struct page *user_page, *page;
+	char *buf, *dst;
+	void *fsdata;
 
 	/*
 	 * handle partial DIO write.  Adjust cur_iov if needed.
@@ -1393,21 +1395,38 @@
 	ocfs2_set_next_iovec(&cur_iov, &iov_offset, o_direct_written);
 
 	do {
-		bp.b_cur_off = iov_offset;
-		bp.b_cur_iov = cur_iov;
+		pos = *ppos;
 
-		page = ocfs2_get_write_source(&bp, cur_iov, iov_offset);
-		if (IS_ERR(page)) {
-			ret = PTR_ERR(page);
+		user_page = ocfs2_get_write_source(&buf, cur_iov, iov_offset);
+		if (IS_ERR(user_page)) {
+			ret = PTR_ERR(user_page);
 			goto out;
 		}
 
-		copied = ocfs2_buffered_write_cluster(file, *ppos, count,
-						      ocfs2_map_and_write_user_data,
-						      &bp);
+		/* Stay within our page boundaries */
+		bytes = min((PAGE_CACHE_SIZE - ((unsigned long)pos & ~PAGE_CACHE_MASK)),
+			    (PAGE_CACHE_SIZE - ((unsigned long)buf & ~PAGE_CACHE_MASK)));
+		/* Stay within the vector boundary */
+		bytes = min_t(size_t, bytes, cur_iov->iov_len - iov_offset);
+		/* Stay within count */
+		bytes = min(bytes, count);
 
-		ocfs2_put_write_source(&bp, page);
+		page = NULL;
+		ret = ocfs2_write_begin(file, file->f_mapping, pos, bytes, 0,
+					&page, &fsdata);
+		if (ret) {
+			mlog_errno(ret);
+			goto out;
+		}
 
+		dst = kmap_atomic(page, KM_USER0);
+		memcpy(dst + (pos & (PAGE_CACHE_SIZE - 1)), buf, bytes);
+		kunmap_atomic(dst, KM_USER0);
+		flush_dcache_page(page);
+		ocfs2_put_write_source(user_page);
+
+		copied = ocfs2_write_end(file, file->f_mapping, pos, bytes,
+					 bytes, page, fsdata);
 		if (copied < 0) {
 			mlog_errno(copied);
 			ret = copied;
@@ -1415,7 +1434,7 @@
 		}
 
 		total += copied;
-		*ppos = *ppos + copied;
+		*ppos = pos + copied;
 		count -= copied;
 
 		ocfs2_set_next_iovec(&cur_iov, &iov_offset, copied);
@@ -1585,52 +1604,46 @@
 				    struct pipe_buffer *buf,
 				    struct splice_desc *sd)
 {
-	int ret, count, total = 0;
+	int ret, count;
 	ssize_t copied = 0;
-	struct ocfs2_splice_write_priv sp;
+	struct file *file = sd->u.file;
+	unsigned int offset;
+	struct page *page = NULL;
+	void *fsdata;
+	char *src, *dst;
 
 	ret = buf->ops->confirm(pipe, buf);
 	if (ret)
 		goto out;
 
-	sp.s_sd = sd;
-	sp.s_buf = buf;
-	sp.s_pipe = pipe;
-	sp.s_offset = sd->pos & ~PAGE_CACHE_MASK;
-	sp.s_buf_offset = buf->offset;
-
+	offset = sd->pos & ~PAGE_CACHE_MASK;
 	count = sd->len;
-	if (count + sp.s_offset > PAGE_CACHE_SIZE)
-		count = PAGE_CACHE_SIZE - sp.s_offset;
+	if (count + offset > PAGE_CACHE_SIZE)
+		count = PAGE_CACHE_SIZE - offset;
 
-	do {
-		/*
-		 * splice wants us to copy up to one page at a
-		 * time. For pagesize > cluster size, this means we
-		 * might enter ocfs2_buffered_write_cluster() more
-		 * than once, so keep track of our progress here.
-		 */
-		copied = ocfs2_buffered_write_cluster(sd->u.file,
-						      (loff_t)sd->pos + total,
-						      count,
-						      ocfs2_map_and_write_splice_data,
-						      &sp);
-		if (copied < 0) {
-			mlog_errno(copied);
-			ret = copied;
-			goto out;
-		}
+	ret = ocfs2_write_begin(file, file->f_mapping, sd->pos, count, 0,
+				&page, &fsdata);
+	if (ret) {
+		mlog_errno(ret);
+		goto out;
+	}
 
-		count -= copied;
-		sp.s_offset += copied;
-		sp.s_buf_offset += copied;
-		total += copied;
-	} while (count);
+	src = buf->ops->map(pipe, buf, 1);
+	dst = kmap_atomic(page, KM_USER1);
+	memcpy(dst + offset, src + buf->offset, count);
+	kunmap_atomic(page, KM_USER1);
+	buf->ops->unmap(pipe, buf, src);
 
-	ret = 0;
+	copied = ocfs2_write_end(file, file->f_mapping, sd->pos, count, count,
+				 page, fsdata);
+	if (copied < 0) {
+		mlog_errno(copied);
+		ret = copied;
+		goto out;
+	}
 out:
 
-	return total ? total : ret;
+	return copied ? copied : ret;
 }
 
 static ssize_t __ocfs2_file_splice_write(struct pipe_inode_info *pipe,