libceph, rbd: new bio handling code (aka don't clone bios)

The reason we clone bios is to be able to give each object request
(and consequently each ceph_osd_data/ceph_msg_data item) its own
pointer to a (list of) bio(s).  The messenger then initializes its
cursor with cloned bio's ->bi_iter, so it knows where to start reading
from/writing to.  That's all the cloned bios are used for: to determine
each object request's starting position in the provided data buffer.

Introduce ceph_bio_iter to do exactly that -- store position within bio
list (i.e. pointer to bio) + position within that bio (i.e. bvec_iter).

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 883f17d..8eaebf6 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -218,7 +218,7 @@ typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
 enum obj_request_type {
 	OBJ_REQUEST_NODATA = 1,
-	OBJ_REQUEST_BIO,
+	OBJ_REQUEST_BIO,	/* pointer into provided bio (list) */
 	OBJ_REQUEST_PAGES,
 };
 
@@ -270,7 +270,7 @@ struct rbd_obj_request {
 
 	enum obj_request_type	type;
 	union {
-		struct bio	*bio_list;
+		struct ceph_bio_iter	bio_pos;
 		struct {
 			struct page	**pages;
 			u32		page_count;
@@ -1255,6 +1255,27 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
 	return length;
 }
 
+static void zero_bvec(struct bio_vec *bv)
+{
+	void *buf;
+	unsigned long flags;
+
+	buf = bvec_kmap_irq(bv, &flags);
+	memset(buf, 0, bv->bv_len);
+	flush_dcache_page(bv->bv_page);
+	bvec_kunmap_irq(buf, &flags);
+}
+
+static void zero_bios(struct ceph_bio_iter *bio_pos, u32 off, u32 bytes)
+{
+	struct ceph_bio_iter it = *bio_pos;
+
+	ceph_bio_iter_advance(&it, off);
+	ceph_bio_iter_advance_step(&it, bytes, ({
+		zero_bvec(&bv);
+	}));
+}
+
 /*
  * bio helpers
  */
@@ -1719,13 +1740,14 @@ rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
 	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
 	if (obj_request->result == -ENOENT) {
 		if (obj_request->type == OBJ_REQUEST_BIO)
-			zero_bio_chain(obj_request->bio_list, 0);
+			zero_bios(&obj_request->bio_pos, 0, length);
 		else
 			zero_pages(obj_request->pages, 0, length);
 		obj_request->result = 0;
 	} else if (xferred < length && !obj_request->result) {
 		if (obj_request->type == OBJ_REQUEST_BIO)
-			zero_bio_chain(obj_request->bio_list, xferred);
+			zero_bios(&obj_request->bio_pos, xferred,
+				  length - xferred);
 		else
 			zero_pages(obj_request->pages, xferred, length);
 	}
@@ -2036,11 +2058,8 @@ static void rbd_obj_request_destroy(struct kref *kref)
 	rbd_assert(obj_request_type_valid(obj_request->type));
 	switch (obj_request->type) {
 	case OBJ_REQUEST_NODATA:
-		break;		/* Nothing to do */
 	case OBJ_REQUEST_BIO:
-		if (obj_request->bio_list)
-			bio_chain_put(obj_request->bio_list);
-		break;
+		break;		/* Nothing to do */
 	case OBJ_REQUEST_PAGES:
 		/* img_data requests don't own their page array */
 		if (obj_request->pages &&
@@ -2368,7 +2387,7 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
 
 	if (obj_request->type == OBJ_REQUEST_BIO)
 		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
-					obj_request->bio_list, length);
+					&obj_request->bio_pos, length);
 	else if (obj_request->type == OBJ_REQUEST_PAGES)
 		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
 					obj_request->pages, length,
@@ -2396,8 +2415,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 	struct rbd_device *rbd_dev = img_request->rbd_dev;
 	struct rbd_obj_request *obj_request = NULL;
 	struct rbd_obj_request *next_obj_request;
-	struct bio *bio_list = NULL;
-	unsigned int bio_offset = 0;
+	struct ceph_bio_iter bio_it;
 	struct page **pages = NULL;
 	enum obj_operation_type op_type;
 	u64 img_offset;
@@ -2412,9 +2430,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 	op_type = rbd_img_request_op_type(img_request);
 
 	if (type == OBJ_REQUEST_BIO) {
-		bio_list = data_desc;
+		bio_it = *(struct ceph_bio_iter *)data_desc;
 		rbd_assert(img_offset ==
-			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
+			   bio_it.iter.bi_sector << SECTOR_SHIFT);
 	} else if (type == OBJ_REQUEST_PAGES) {
 		pages = data_desc;
 	}
@@ -2440,17 +2458,8 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
 		rbd_img_obj_request_add(img_request, obj_request);
 
 		if (type == OBJ_REQUEST_BIO) {
-			unsigned int clone_size;
-
-			rbd_assert(length <= (u64)UINT_MAX);
-			clone_size = (unsigned int)length;
-			obj_request->bio_list =
-					bio_chain_clone_range(&bio_list,
-								&bio_offset,
-								clone_size,
-								GFP_NOIO);
-			if (!obj_request->bio_list)
-				goto out_unwind;
+			obj_request->bio_pos = bio_it;
+			ceph_bio_iter_advance(&bio_it, length);
 		} else if (type == OBJ_REQUEST_PAGES) {
 			unsigned int page_count;
 
@@ -2980,7 +2989,7 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
 
 	if (obj_request->type == OBJ_REQUEST_BIO)
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						obj_request->bio_list);
+						&obj_request->bio_pos);
 	else
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
 						obj_request->pages);
@@ -4093,9 +4102,13 @@ static void rbd_queue_workfn(struct work_struct *work)
 	if (op_type == OBJ_OP_DISCARD)
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
 					      NULL);
-	else
+	else {
+		struct ceph_bio_iter bio_it = { .bio = rq->bio,
+						.iter = rq->bio->bi_iter };
+
 		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-					      rq->bio);
+					      &bio_it);
+	}
 	if (result)
 		goto err_img_request;