orangefs: copy Orangefs-sized blocks into the pagecache if possible.

->readpage looks in file->private_data to try and find out how the
userspace program set "count" in read(2) or with "dd bs=" or whatever.

->readpage uses "count" and inode->i_size to calculate how much
data Orangefs should deposit in the Orangefs shared buffer, and
remembers which slot the data is in.

After copying data from the Orangefs shared buffer slot into
"the page", readpage tries to increment through the pagecache index
and fill as many pages as it can from the extra data in the shared
buffer. Hopefully these extra pages will soon be needed by the vfs,
and they'll be in the pagecache already.

Signed-off-by: Mike Marshall <hubcap@omnibond.com>
Signed-off-by: Martin Brandenburg <martin@omnibond.com>
diff --git a/fs/orangefs/file.c b/fs/orangefs/file.c
index 68ba5ae..a35c170 100644
--- a/fs/orangefs/file.c
+++ b/fs/orangefs/file.c
@@ -54,6 +54,7 @@ ssize_t wait_for_direct_io(enum ORANGEFS_io_type type, struct inode *inode,
 	struct orangefs_kernel_op_s *new_op = NULL;
 	int buffer_index = -1;
 	ssize_t ret;
+	size_t copy_amount;
 
 	new_op = op_alloc(ORANGEFS_VFS_OP_FILE_IO);
 	if (!new_op)
@@ -212,8 +213,25 @@ ssize_t wait_for_direct_io(enum ORANGEFS_io_type type, struct inode *inode,
 		 *       can futher be kernel-space or user-space addresses.
 		 *       or it can pointers to struct page's
 		 */
+
+		/*
+		 * When reading, readahead_size will only be zero when
+		 * we're doing O_DIRECT, otherwise we got here from
+		 * orangefs_readpage.
+		 *
+		 * If we got here from orangefs_readpage we want to
+		 * copy either a page or the whole file into the io
+		 * vector, whichever is smaller.
+		 */
+		if (readahead_size)
+			copy_amount =
+				min(new_op->downcall.resp.io.amt_complete,
+					(__s64)PAGE_SIZE);
+		else
+			copy_amount = new_op->downcall.resp.io.amt_complete;
+
 		ret = orangefs_bufmap_copy_to_iovec(iter, buffer_index,
-		    new_op->downcall.resp.io.amt_complete);
+			copy_amount);
 		if (ret < 0) {
 			gossip_err("%s: Failed to copy-out buffers. Please make sure that the pvfs2-client is running (%ld)\n",
 			    __func__, (long)ret);
@@ -231,10 +249,19 @@ ssize_t wait_for_direct_io(enum ORANGEFS_io_type type, struct inode *inode,
 
 out:
 	if (buffer_index >= 0) {
-		orangefs_bufmap_put(buffer_index);
-		gossip_debug(GOSSIP_FILE_DEBUG,
-			     "%s(%pU): PUT buffer_index %d\n",
-			     __func__, handle, buffer_index);
+		if ((readahead_size) && (type == ORANGEFS_IO_READ)) {
+			/* readpage */
+			*index_return = buffer_index;
+			gossip_debug(GOSSIP_FILE_DEBUG,
+				"%s: hold on to buffer_index :%d:\n",
+				__func__, buffer_index);
+		} else {
+			/* O_DIRECT */
+			orangefs_bufmap_put(buffer_index);
+			gossip_debug(GOSSIP_FILE_DEBUG,
+				"%s(%pU): PUT buffer_index %d\n",
+				__func__, handle, buffer_index);
+		}
 		buffer_index = -1;
 	}
 	op_release(new_op);
diff --git a/fs/orangefs/inode.c b/fs/orangefs/inode.c
index cded74e..3fb671d 100644
--- a/fs/orangefs/inode.c
+++ b/fs/orangefs/inode.c
@@ -247,31 +247,80 @@ static int orangefs_writepages(struct address_space *mapping,
 	return ret;
 }
 
+static int orangefs_launder_page(struct page *);
+
 static int orangefs_readpage(struct file *file, struct page *page)
 {
 	struct inode *inode = page->mapping->host;
 	struct iov_iter iter;
 	struct bio_vec bv;
 	ssize_t ret;
-	loff_t off;
+	loff_t off; /* offset into this page */
+	pgoff_t index; /* which page */
+	struct page *next_page;
+	char *kaddr;
+	struct orangefs_read_options *ro = file->private_data;
+	loff_t read_size;
+	loff_t roundedup;
+	int buffer_index = -1; /* orangefs shared memory slot */
+	int slot_index;   /* index into slot */
+	int remaining;
+
+	/*
+	 * If they set some miniscule size for "count" in read(2)
+	 * (for example) then let's try to read a page, or the whole file
+	 * if it is smaller than a page. Once "count" goes over a page
+	 * then lets round up to the highest page size multiple that is
+	 * less than or equal to "count" and do that much orangefs IO and
+	 * try to fill as many pages as we can from it.
+	 *
+	 * "count" should be represented in ro->blksiz.
+	 *
+	 * inode->i_size = file size.
+	 */
+	if (ro) {
+		if (ro->blksiz < PAGE_SIZE) {
+			if (inode->i_size < PAGE_SIZE)
+				read_size = inode->i_size;
+			else
+				read_size = PAGE_SIZE;
+		} else {
+			roundedup = ((PAGE_SIZE - 1) & ro->blksiz) ?
+				((ro->blksiz + PAGE_SIZE) & ~(PAGE_SIZE -1)) :
+				ro->blksiz;
+			if (roundedup > inode->i_size)
+				read_size = inode->i_size;
+			else
+				read_size = roundedup;
+
+		}
+	} else {
+		read_size = PAGE_SIZE;
+	}
+	if (!read_size)
+		read_size = PAGE_SIZE;
+
+	if (PageDirty(page))
+		orangefs_launder_page(page);
 
 	off = page_offset(page);
+	index = off >> PAGE_SHIFT;
 	bv.bv_page = page;
 	bv.bv_len = PAGE_SIZE;
 	bv.bv_offset = 0;
 	iov_iter_bvec(&iter, READ, &bv, 1, PAGE_SIZE);
 
-	if (PageDirty(page))
-		orangefs_launder_page(page);
-
 	ret = wait_for_direct_io(ORANGEFS_IO_READ, inode, &off, &iter,
-	    PAGE_SIZE, inode->i_size, NULL, NULL);
+	    read_size, inode->i_size, NULL, &buffer_index);
+	remaining = ret;
 	/* this will only zero remaining unread portions of the page data */
 	iov_iter_zero(~0U, &iter);
 	/* takes care of potential aliasing */
 	flush_dcache_page(page);
 	if (ret < 0) {
 		SetPageError(page);
+		unlock_page(page);
+		goto out;
 	} else {
 		SetPageUptodate(page);
 		if (PageError(page))
@@ -280,11 +329,62 @@ static int orangefs_readpage(struct file *file, struct page *page)
 	}
 	/* unlock the page after the ->readpage() routine completes */
 	unlock_page(page);
+
+	if (remaining > PAGE_SIZE) {
+		slot_index = 0;
+		while ((remaining - PAGE_SIZE) >= PAGE_SIZE) {
+			remaining -= PAGE_SIZE;
+			/*
+			 * It is an optimization to try and fill more than one
+			 * page... by now we've already gotten the single
+			 * page we were after, if stuff doesn't seem to
+			 * be going our way at this point just return
+			 * and hope for the best.
+			 *
+			 * If we look for pages and they're already there is
+			 * one reason to give up, and if they're not there
+			 * and we can't create them is another reason.
+			 */
+
+			index++;
+			slot_index++;
+			next_page = find_get_page(inode->i_mapping, index);
+			if (next_page) {
+				gossip_debug(GOSSIP_FILE_DEBUG,
+					"%s: found next page, quitting\n",
+					__func__);
+				put_page(next_page);
+				goto out;
+			}
+			next_page = find_or_create_page(inode->i_mapping,
+							index,
+							GFP_KERNEL);
+			/*
+			 * I've never hit this, leave it as a printk for
+			 * now so it will be obvious.
+			 */
+			if (!next_page) {
+				printk("%s: can't create next page, quitting\n",
+					__func__);
+				goto out;
+			}
+			kaddr = kmap_atomic(next_page);
+			orangefs_bufmap_page_fill(kaddr,
+						buffer_index,
+						slot_index);
+			kunmap_atomic(kaddr);
+			SetPageUptodate(next_page);
+			unlock_page(next_page);
+			put_page(next_page);
+		}
+	}
+
+out:
+	if (buffer_index != -1)
+		orangefs_bufmap_put(buffer_index);
 	return ret;
 }
 
-static int orangefs_launder_page(struct page *);
-
 static int orangefs_write_begin(struct file *file,
     struct address_space *mapping,
     loff_t pos, unsigned len, unsigned flags, struct page **pagep,
@@ -326,7 +426,6 @@ static int orangefs_write_begin(struct file *file,
 			if (ret)
 				return ret;
 		}
-
 	}
 
 	wr = kmalloc(sizeof *wr, GFP_KERNEL);
diff --git a/fs/orangefs/orangefs-bufmap.c b/fs/orangefs/orangefs-bufmap.c
index 443bcd8..d4811f9 100644
--- a/fs/orangefs/orangefs-bufmap.c
+++ b/fs/orangefs/orangefs-bufmap.c
@@ -538,3 +538,16 @@ int orangefs_bufmap_copy_to_iovec(struct iov_iter *iter,
 	}
 	return 0;
 }
+
+void orangefs_bufmap_page_fill(void *page_to,
+				int buffer_index,
+				int slot_index)
+{
+	struct orangefs_bufmap_desc *from;
+	void *page_from;
+
+	from = &__orangefs_bufmap->desc_array[buffer_index];
+	page_from = kmap_atomic(from->page_array[slot_index]);
+	memcpy(page_to, page_from, PAGE_SIZE);
+	kunmap_atomic(page_from);
+}
diff --git a/fs/orangefs/orangefs-bufmap.h b/fs/orangefs/orangefs-bufmap.h
index c2c3c5a..75b2d28 100644
--- a/fs/orangefs/orangefs-bufmap.h
+++ b/fs/orangefs/orangefs-bufmap.h
@@ -34,4 +34,6 @@ int orangefs_bufmap_copy_to_iovec(struct iov_iter *iter,
 			      int buffer_index,
 			      size_t size);
 
+void orangefs_bufmap_page_fill(void *kaddr, int buffer_index, int slot_index);
+
 #endif /* __ORANGEFS_BUFMAP_H */
diff --git a/fs/orangefs/orangefs-debugfs.c b/fs/orangefs/orangefs-debugfs.c
index 0732cb0..87b1a6f 100644
--- a/fs/orangefs/orangefs-debugfs.c
+++ b/fs/orangefs/orangefs-debugfs.c
@@ -963,7 +963,7 @@ int orangefs_debugfs_new_client_mask(void __user *arg)
 	return ret;
 }
 
-int orangefs_debugfs_new_client_string(void __user *arg) 
+int orangefs_debugfs_new_client_string(void __user *arg)
 {
 	int ret;
 
@@ -1016,7 +1016,7 @@ int orangefs_debugfs_new_client_string(void __user *arg)
 	return 0;
 }
 
-int orangefs_debugfs_new_debug(void __user *arg) 
+int orangefs_debugfs_new_debug(void __user *arg)
 {
 	struct dev_mask_info_s mask_info = {0};
 	int ret;