NFSv4.1: pnfs: add LAYOUTGET and GETDEVICEINFO infrastructure

Add the ability to actually send LAYOUTGET and GETDEVICEINFO.  This also adds
the machinery to handle layout state and the deviceid cache.  Note that
GETDEVICEINFO is not called directly by the generic layer.  Instead it
is called by the drivers while parsing the LAYOUTGET opaque data in response
to an unknown device id embedded therein.  RFC 5661 only encodes
device ids within the driver-specific opaque data.
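
As a rough illustration (not part of this patch's generic code), a layout
driver's ->alloc_lseg() is expected to use the new deviceid cache helpers
roughly as sketched below while decoding the LAYOUTGET opaque body; the
GETDEVICEINFO helper shown is hypothetical and driver-specific:

	struct pnfs_deviceid_cache *c = NFS_SERVER(ino)->nfs_client->cl_devid_cache;
	struct pnfs_deviceid_node *d;
	struct nfs4_deviceid id;	/* decoded from the opaque layout body */

	d = pnfs_find_get_deviceid(c, &id);
	if (!d) {
		/* Unknown deviceid: the driver issues GETDEVICEINFO itself,
		 * wraps the result in its own deviceid node, and publishes
		 * it; if a racing thread got there first, pnfs_add_deviceid
		 * returns the already-cached node instead. */
		d = my_driver_getdeviceinfo(NFS_SERVER(ino), &id); /* hypothetical */
		if (d)
			d = pnfs_add_deviceid(c, d);
	}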

Signed-off-by: Andy Adamson <andros@netapp.com>
Signed-off-by: Dean Hildebrand <dhildebz@umich.edu>
Signed-off-by: Marc Eshel <eshel@almaden.ibm.com>
Signed-off-by: Mike Sager <sager@netapp.com>
Signed-off-by: Ricardo Labiaga <ricardo.labiaga@netapp.com>
Signed-off-by: Tao Guo <guotao@nrchpc.ac.cn>
Signed-off-by: Boaz Harrosh <bharrosh@panasas.com>
Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 891a0c3..d1ad7df 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -140,6 +140,11 @@
 		printk(KERN_ERR "%s id 0 is reserved\n", __func__);
 		return status;
 	}
+	if (!ld_type->alloc_lseg || !ld_type->free_lseg) {
+		printk(KERN_ERR "%s Layout driver must provide "
+		       "alloc_lseg and free_lseg.\n", __func__);
+		return status;
+	}
 
 	spin_lock(&pnfs_spinlock);
 	tmp = find_pnfs_driver_locked(ld_type->id);
@@ -168,6 +173,10 @@
 }
 EXPORT_SYMBOL_GPL(pnfs_unregister_layoutdriver);
 
+/*
+ * pNFS client layout cache
+ */
+
 static void
 get_layout_hdr_locked(struct pnfs_layout_hdr *lo)
 {
@@ -190,7 +199,7 @@
 	}
 }
 
-static void
+void
 put_layout_hdr(struct inode *inode)
 {
 	spin_lock(&inode->i_lock);
@@ -215,7 +224,7 @@
 	struct inode *ino = lseg->layout->inode;
 
 	dprintk("--> %s\n", __func__);
-	kfree(lseg);
+	NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
 	/* Matched by get_layout_hdr_locked in pnfs_insert_layout */
 	put_layout_hdr(ino);
 }
@@ -249,6 +258,9 @@
 	/* List does not take a reference, so no need for put here */
 	list_del_init(&lo->layouts);
 	spin_unlock(&clp->cl_lock);
+	write_seqlock(&lo->seqlock);
+	clear_bit(NFS_LAYOUT_STATEID_SET, &lo->state);
+	write_sequnlock(&lo->seqlock);
 
 	dprintk("%s:Return\n", __func__);
 }
@@ -307,40 +319,135 @@
 	}
 }
 
-static void pnfs_insert_layout(struct pnfs_layout_hdr *lo,
-			       struct pnfs_layout_segment *lseg);
+/* Update lo->stateid with new if it is more recent.
+ *
+ * lo->stateid could be the open stateid, in which case we just use
+ * what is given.
+ */
+static void
+pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
+			const nfs4_stateid *new)
+{
+	nfs4_stateid *old = &lo->stateid;
+	bool overwrite = false;
 
-/* Get layout from server. */
+	write_seqlock(&lo->seqlock);
+	if (!test_bit(NFS_LAYOUT_STATEID_SET, &lo->state) ||
+	    memcmp(old->stateid.other, new->stateid.other, sizeof(new->stateid.other)))
+		overwrite = true;
+	else {
+		u32 oldseq, newseq;
+
+		oldseq = be32_to_cpu(old->stateid.seqid);
+		newseq = be32_to_cpu(new->stateid.seqid);
+		if ((int)(newseq - oldseq) > 0)
+			overwrite = true;
+	}
+	if (overwrite)
+		memcpy(&old->stateid, &new->stateid, sizeof(new->stateid));
+	write_sequnlock(&lo->seqlock);
+}
+
+static void
+pnfs_layout_from_open_stateid(struct pnfs_layout_hdr *lo,
+			      struct nfs4_state *state)
+{
+	int seq;
+
+	dprintk("--> %s\n", __func__);
+	write_seqlock(&lo->seqlock);
+	do {
+		seq = read_seqbegin(&state->seqlock);
+		memcpy(lo->stateid.data, state->stateid.data,
+		       sizeof(state->stateid.data));
+	} while (read_seqretry(&state->seqlock, seq));
+	set_bit(NFS_LAYOUT_STATEID_SET, &lo->state);
+	write_sequnlock(&lo->seqlock);
+	dprintk("<-- %s\n", __func__);
+}
+
+void
+pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
+			struct nfs4_state *open_state)
+{
+	int seq;
+
+	dprintk("--> %s\n", __func__);
+	do {
+		seq = read_seqbegin(&lo->seqlock);
+		if (!test_bit(NFS_LAYOUT_STATEID_SET, &lo->state)) {
+			/* This will trigger retry of the read */
+			pnfs_layout_from_open_stateid(lo, open_state);
+		} else
+			memcpy(dst->data, lo->stateid.data,
+			       sizeof(lo->stateid.data));
+	} while (read_seqretry(&lo->seqlock, seq));
+	dprintk("<-- %s\n", __func__);
+}
+
+/*
+ * Get layout from server.
+ *    For now, assume that whole file layouts are requested.
+ *    arg->offset: 0
+ *    arg->length: all ones
+ */
 static struct pnfs_layout_segment *
 send_layoutget(struct pnfs_layout_hdr *lo,
 	   struct nfs_open_context *ctx,
 	   u32 iomode)
 {
 	struct inode *ino = lo->inode;
-	struct pnfs_layout_segment *lseg;
+	struct nfs_server *server = NFS_SERVER(ino);
+	struct nfs4_layoutget *lgp;
+	struct pnfs_layout_segment *lseg = NULL;
 
-	/* Lets pretend we sent LAYOUTGET and got a response */
-	lseg = kzalloc(sizeof(*lseg), GFP_KERNEL);
-	if (!lseg) {
-		set_bit(lo_fail_bit(iomode), &lo->state);
-		spin_lock(&ino->i_lock);
-		put_layout_hdr_locked(lo);
-		spin_unlock(&ino->i_lock);
+	dprintk("--> %s\n", __func__);
+
+	BUG_ON(ctx == NULL);
+	lgp = kzalloc(sizeof(*lgp), GFP_KERNEL);
+	if (lgp == NULL) {
+		put_layout_hdr(lo->inode);
 		return NULL;
 	}
-	init_lseg(lo, lseg);
-	lseg->iomode = IOMODE_RW;
-	spin_lock(&ino->i_lock);
-	pnfs_insert_layout(lo, lseg);
-	put_layout_hdr_locked(lo);
-	spin_unlock(&ino->i_lock);
+	lgp->args.minlength = NFS4_MAX_UINT64;
+	lgp->args.maxcount = PNFS_LAYOUT_MAXSIZE;
+	lgp->args.range.iomode = iomode;
+	lgp->args.range.offset = 0;
+	lgp->args.range.length = NFS4_MAX_UINT64;
+	lgp->args.type = server->pnfs_curr_ld->id;
+	lgp->args.inode = ino;
+	lgp->args.ctx = get_nfs_open_context(ctx);
+	lgp->lsegpp = &lseg;
+
+	/* Synchronously retrieve layout information from server and
+	 * store in lseg.
+	 */
+	nfs4_proc_layoutget(lgp);
+	if (!lseg) {
+		/* remember that LAYOUTGET failed and suspend trying */
+		set_bit(lo_fail_bit(iomode), &lo->state);
+	}
 	return lseg;
 }
 
+/*
+ * Compare two layout segments for sorting into layout cache.
+ * We want to preferentially return RW over RO layouts, so ensure those
+ * are seen first.
+ */
+static s64
+cmp_layout(u32 iomode1, u32 iomode2)
+{
+	/* READ segments sort after RW segments */
+	return (int)(iomode2 == IOMODE_READ) - (int)(iomode1 == IOMODE_READ);
+}
+
 static void
 pnfs_insert_layout(struct pnfs_layout_hdr *lo,
 		   struct pnfs_layout_segment *lseg)
 {
+	struct pnfs_layout_segment *lp;
+	int found = 0;
+
 	dprintk("%s:Begin\n", __func__);
 
 	assert_spin_locked(&lo->inode->i_lock);
@@ -352,19 +459,28 @@
 		list_add_tail(&lo->layouts, &clp->cl_layouts);
 		spin_unlock(&clp->cl_lock);
 	}
-	get_layout_hdr_locked(lo);
-	/* STUB - add the constructed lseg if necessary */
-	if (list_empty(&lo->segs)) {
-		list_add_tail(&lseg->fi_list, &lo->segs);
-		dprintk("%s: inserted lseg %p iomode %d at tail\n",
-			__func__, lseg, lseg->iomode);
-	} else {
-		/* There is no harm for the moment in calling this
-		 * with the lock held, and the call will be removed
-		 * with the STUB.
-		 */
-		put_lseg(lseg);
+	list_for_each_entry(lp, &lo->segs, fi_list) {
+		if (cmp_layout(lp->range.iomode, lseg->range.iomode) > 0)
+			continue;
+		list_add_tail(&lseg->fi_list, &lp->fi_list);
+		dprintk("%s: inserted lseg %p "
+			"iomode %d offset %llu length %llu before "
+			"lp %p iomode %d offset %llu length %llu\n",
+			__func__, lseg, lseg->range.iomode,
+			lseg->range.offset, lseg->range.length,
+			lp, lp->range.iomode, lp->range.offset,
+			lp->range.length);
+		found = 1;
+		break;
 	}
+	if (!found) {
+		list_add_tail(&lseg->fi_list, &lo->segs);
+		dprintk("%s: inserted lseg %p "
+			"iomode %d offset %llu length %llu at tail\n",
+			__func__, lseg, lseg->range.iomode,
+			lseg->range.offset, lseg->range.length);
+	}
+	get_layout_hdr_locked(lo);
 
 	dprintk("%s:Return\n", __func__);
 }
@@ -380,6 +496,7 @@
 	lo->refcount = 1;
 	INIT_LIST_HEAD(&lo->layouts);
 	INIT_LIST_HEAD(&lo->segs);
+	seqlock_init(&lo->seqlock);
 	lo->inode = ino;
 	return lo;
 }
@@ -407,11 +524,46 @@
 	return nfsi->layout;
 }
 
-/* STUB - LAYOUTGET never succeeds, so cache is empty */
+/*
+ * iomode matching rules:
+ * iomode	lseg	match
+ * -----	-----	-----
+ * ANY		READ	true
+ * ANY		RW	true
+ * RW		READ	false
+ * RW		RW	true
+ * READ		READ	true
+ * READ		RW	true
+ */
+static int
+is_matching_lseg(struct pnfs_layout_segment *lseg, u32 iomode)
+{
+	return (iomode != IOMODE_RW || lseg->range.iomode == IOMODE_RW);
+}
+
+/*
+ * lookup range in layout
+ */
 static struct pnfs_layout_segment *
 pnfs_has_layout(struct pnfs_layout_hdr *lo, u32 iomode)
 {
-	return NULL;
+	struct pnfs_layout_segment *lseg, *ret = NULL;
+
+	dprintk("%s:Begin\n", __func__);
+
+	assert_spin_locked(&lo->inode->i_lock);
+	list_for_each_entry(lseg, &lo->segs, fi_list) {
+		if (is_matching_lseg(lseg, iomode)) {
+			ret = lseg;
+			break;
+		}
+		if (cmp_layout(iomode, lseg->range.iomode) > 0)
+			break;
+	}
+
+	dprintk("%s:Return lseg %p ref %d\n",
+		__func__, ret, ret ? atomic_read(&ret->kref.refcount) : 0);
+	return ret;
 }
 
 /*
@@ -448,7 +600,7 @@
 	if (test_bit(lo_fail_bit(iomode), &nfsi->layout->state))
 		goto out_unlock;
 
-	get_layout_hdr_locked(lo);
+	get_layout_hdr_locked(lo); /* Matched in nfs4_layoutget_release */
 	spin_unlock(&ino->i_lock);
 
 	lseg = send_layoutget(lo, ctx, iomode);
@@ -460,3 +612,172 @@
 	spin_unlock(&ino->i_lock);
 	goto out;
 }
+
+int
+pnfs_layout_process(struct nfs4_layoutget *lgp)
+{
+	struct pnfs_layout_hdr *lo = NFS_I(lgp->args.inode)->layout;
+	struct nfs4_layoutget_res *res = &lgp->res;
+	struct pnfs_layout_segment *lseg;
+	struct inode *ino = lo->inode;
+	int status = 0;
+
+	/* Inject layout blob into I/O device driver */
+	lseg = NFS_SERVER(ino)->pnfs_curr_ld->alloc_lseg(lo, res);
+	if (!lseg || IS_ERR(lseg)) {
+		if (!lseg)
+			status = -ENOMEM;
+		else
+			status = PTR_ERR(lseg);
+		dprintk("%s: Could not allocate layout: error %d\n",
+		       __func__, status);
+		goto out;
+	}
+
+	spin_lock(&ino->i_lock);
+	init_lseg(lo, lseg);
+	lseg->range = res->range;
+	*lgp->lsegpp = lseg;
+	pnfs_insert_layout(lo, lseg);
+
+	/* Done processing layoutget. Set the layout stateid */
+	pnfs_set_layout_stateid(lo, &res->stateid);
+	spin_unlock(&ino->i_lock);
+out:
+	return status;
+}
+
+/*
+ * Device ID cache. Currently supports one layout type per struct nfs_client.
+ * Add layout type to the lookup key to expand to support multiple types.
+ */
+int
+pnfs_alloc_init_deviceid_cache(struct nfs_client *clp,
+			 void (*free_callback)(struct pnfs_deviceid_node *))
+{
+	struct pnfs_deviceid_cache *c;
+
+	c = kzalloc(sizeof(struct pnfs_deviceid_cache), GFP_KERNEL);
+	if (!c)
+		return -ENOMEM;
+	spin_lock(&clp->cl_lock);
+	if (clp->cl_devid_cache != NULL) {
+		atomic_inc(&clp->cl_devid_cache->dc_ref);
+		dprintk("%s [kref [%d]]\n", __func__,
+			atomic_read(&clp->cl_devid_cache->dc_ref));
+		kfree(c);
+	} else {
+		/* kzalloc initializes hlists */
+		spin_lock_init(&c->dc_lock);
+		atomic_set(&c->dc_ref, 1);
+		c->dc_free_callback = free_callback;
+		clp->cl_devid_cache = c;
+		dprintk("%s [new]\n", __func__);
+	}
+	spin_unlock(&clp->cl_lock);
+	return 0;
+}
+EXPORT_SYMBOL_GPL(pnfs_alloc_init_deviceid_cache);
+
+/*
+ * Called from pnfs_layoutdriver_type->free_lseg
+ * last layout segment reference frees deviceid
+ */
+void
+pnfs_put_deviceid(struct pnfs_deviceid_cache *c,
+		  struct pnfs_deviceid_node *devid)
+{
+	struct nfs4_deviceid *id = &devid->de_id;
+	struct pnfs_deviceid_node *d;
+	struct hlist_node *n;
+	long h = nfs4_deviceid_hash(id);
+
+	dprintk("%s [%d]\n", __func__, atomic_read(&devid->de_ref));
+	if (!atomic_dec_and_lock(&devid->de_ref, &c->dc_lock))
+		return;
+
+	hlist_for_each_entry_rcu(d, n, &c->dc_deviceids[h], de_node)
+		if (!memcmp(&d->de_id, id, sizeof(*id))) {
+			hlist_del_rcu(&d->de_node);
+			spin_unlock(&c->dc_lock);
+			synchronize_rcu();
+			c->dc_free_callback(devid);
+			return;
+		}
+	spin_unlock(&c->dc_lock);
+	/* Why wasn't it found in the list? */
+	BUG();
+}
+EXPORT_SYMBOL_GPL(pnfs_put_deviceid);
+
+/* Find and reference a deviceid */
+struct pnfs_deviceid_node *
+pnfs_find_get_deviceid(struct pnfs_deviceid_cache *c, struct nfs4_deviceid *id)
+{
+	struct pnfs_deviceid_node *d;
+	struct hlist_node *n;
+	long hash = nfs4_deviceid_hash(id);
+
+	dprintk("--> %s hash %ld\n", __func__, hash);
+	rcu_read_lock();
+	hlist_for_each_entry_rcu(d, n, &c->dc_deviceids[hash], de_node) {
+		if (!memcmp(&d->de_id, id, sizeof(*id))) {
+			if (!atomic_inc_not_zero(&d->de_ref))
+				goto fail;
+			rcu_read_unlock();
+			return d;
+		}
+	}
+fail:
+	rcu_read_unlock();
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(pnfs_find_get_deviceid);
+
+/*
+ * Add a deviceid to the cache.
+ * GETDEVICEINFOs for the same deviceid can race; if the deviceid is already
+ * cached, the new node is discarded and the existing one returned.
+ */
+struct pnfs_deviceid_node *
+pnfs_add_deviceid(struct pnfs_deviceid_cache *c, struct pnfs_deviceid_node *new)
+{
+	struct pnfs_deviceid_node *d;
+	long hash = nfs4_deviceid_hash(&new->de_id);
+
+	dprintk("--> %s hash %ld\n", __func__, hash);
+	spin_lock(&c->dc_lock);
+	d = pnfs_find_get_deviceid(c, &new->de_id);
+	if (d) {
+		spin_unlock(&c->dc_lock);
+		dprintk("%s [discard]\n", __func__);
+		c->dc_free_callback(new);
+		return d;
+	}
+	INIT_HLIST_NODE(&new->de_node);
+	atomic_set(&new->de_ref, 1);
+	hlist_add_head_rcu(&new->de_node, &c->dc_deviceids[hash]);
+	spin_unlock(&c->dc_lock);
+	dprintk("%s [new]\n", __func__);
+	return new;
+}
+EXPORT_SYMBOL_GPL(pnfs_add_deviceid);
+
+void
+pnfs_put_deviceid_cache(struct nfs_client *clp)
+{
+	struct pnfs_deviceid_cache *local = clp->cl_devid_cache;
+
+	dprintk("--> %s cl_devid_cache %p\n", __func__, clp->cl_devid_cache);
+	if (atomic_dec_and_lock(&local->dc_ref, &clp->cl_lock)) {
+		int i;
+		/* Verify cache is empty */
+		for (i = 0; i < NFS4_DEVICE_ID_HASH_SIZE; i++)
+			BUG_ON(!hlist_empty(&local->dc_deviceids[i]));
+		clp->cl_devid_cache = NULL;
+		spin_unlock(&clp->cl_lock);
+		kfree(local);
+	}
+}
+EXPORT_SYMBOL_GPL(pnfs_put_deviceid_cache);