ceph: factor out libceph from Ceph file system

This factors out protocol and low-level storage parts of ceph into a
separate libceph module living in net/ceph and include/linux/ceph.  This
is mostly a matter of moving files around.  However, a few key pieces
of the interface change as well:

 - ceph_client becomes ceph_fs_client and ceph_client, where the latter
   captures the mon and osd clients, and the fs_client gets the mds client
   and file system specific pieces.
 - Mount option parsing and debugfs setup is correspondingly broken into
   two pieces.
 - The mon client gets a generic handler callback for otherwise unknown
   messages (mds map, in this case).
 - The basic supported/required feature bits can be expanded (and are by
   ceph_fs_client).

No functional change, aside from some subtle error handling cases that got
cleaned up in the refactoring process.

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 9922628..d6e0e04 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -1,5 +1,5 @@
 
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
 
 #include <linux/backing-dev.h>
 #include <linux/ctype.h>
@@ -15,10 +15,13 @@
 #include <linux/statfs.h>
 #include <linux/string.h>
 
-#include "decode.h"
 #include "super.h"
-#include "mon_client.h"
-#include "auth.h"
+#include "mds_client.h"
+
+#include <linux/ceph/decode.h>
+#include <linux/ceph/mon_client.h>
+#include <linux/ceph/auth.h>
+#include <linux/ceph/debugfs.h>
 
 /*
  * Ceph superblock operations
@@ -26,36 +29,22 @@
  * Handle the basics of mounting, unmounting.
  */
 
-
-/*
- * find filename portion of a path (/foo/bar/baz -> baz)
- */
-const char *ceph_file_part(const char *s, int len)
-{
-	const char *e = s + len;
-
-	while (e != s && *(e-1) != '/')
-		e--;
-	return e;
-}
-
-
 /*
  * super ops
  */
 static void ceph_put_super(struct super_block *s)
 {
-	struct ceph_client *client = ceph_sb_to_client(s);
+	struct ceph_fs_client *fsc = ceph_sb_to_client(s);
 
 	dout("put_super\n");
-	ceph_mdsc_close_sessions(&client->mdsc);
+	ceph_mdsc_close_sessions(fsc->mdsc);
 
 	/*
 	 * ensure we release the bdi before put_anon_super releases
 	 * the device name.
 	 */
-	if (s->s_bdi == &client->backing_dev_info) {
-		bdi_unregister(&client->backing_dev_info);
+	if (s->s_bdi == &fsc->backing_dev_info) {
+		bdi_unregister(&fsc->backing_dev_info);
 		s->s_bdi = NULL;
 	}
 
@@ -64,14 +53,14 @@
 
 static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 {
-	struct ceph_client *client = ceph_inode_to_client(dentry->d_inode);
-	struct ceph_monmap *monmap = client->monc.monmap;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(dentry->d_inode);
+	struct ceph_monmap *monmap = fsc->client->monc.monmap;
 	struct ceph_statfs st;
 	u64 fsid;
 	int err;
 
 	dout("statfs\n");
-	err = ceph_monc_do_statfs(&client->monc, &st);
+	err = ceph_monc_do_statfs(&fsc->client->monc, &st);
 	if (err < 0)
 		return err;
 
@@ -104,49 +93,237 @@
 
 static int ceph_sync_fs(struct super_block *sb, int wait)
 {
-	struct ceph_client *client = ceph_sb_to_client(sb);
+	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
 
 	if (!wait) {
 		dout("sync_fs (non-blocking)\n");
-		ceph_flush_dirty_caps(&client->mdsc);
+		ceph_flush_dirty_caps(fsc->mdsc);
 		dout("sync_fs (non-blocking) done\n");
 		return 0;
 	}
 
 	dout("sync_fs (blocking)\n");
-	ceph_osdc_sync(&ceph_sb_to_client(sb)->osdc);
-	ceph_mdsc_sync(&ceph_sb_to_client(sb)->mdsc);
+	ceph_osdc_sync(&fsc->client->osdc);
+	ceph_mdsc_sync(fsc->mdsc);
 	dout("sync_fs (blocking) done\n");
 	return 0;
 }
 
-static int default_congestion_kb(void)
+/*
+ * mount options
+ */
+enum {
+	Opt_wsize,
+	Opt_rsize,
+	Opt_caps_wanted_delay_min,
+	Opt_caps_wanted_delay_max,
+	Opt_cap_release_safety,
+	Opt_readdir_max_entries,
+	Opt_readdir_max_bytes,
+	Opt_congestion_kb,
+	Opt_last_int,
+	/* int args above */
+	Opt_snapdirname,
+	Opt_last_string,
+	/* string args above */
+	Opt_dirstat,
+	Opt_nodirstat,
+	Opt_rbytes,
+	Opt_norbytes,
+	Opt_noasyncreaddir,
+};
+
+static match_table_t fsopt_tokens = {
+	{Opt_wsize, "wsize=%d"},
+	{Opt_rsize, "rsize=%d"},
+	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
+	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
+	{Opt_cap_release_safety, "cap_release_safety=%d"},
+	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
+	{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
+	{Opt_congestion_kb, "write_congestion_kb=%d"},
+	/* int args above */
+	{Opt_snapdirname, "snapdirname=%s"},
+	/* string args above */
+	{Opt_dirstat, "dirstat"},
+	{Opt_nodirstat, "nodirstat"},
+	{Opt_rbytes, "rbytes"},
+	{Opt_norbytes, "norbytes"},
+	{Opt_noasyncreaddir, "noasyncreaddir"},
+	{-1, NULL}
+};
+
+static int parse_fsopt_token(char *c, void *private)
 {
-	int congestion_kb;
+	struct ceph_mount_options *fsopt = private;
+	substring_t argstr[MAX_OPT_ARGS];
+	int token, intval, ret;
 
-	/*
-	 * Copied from NFS
-	 *
-	 * congestion size, scale with available memory.
-	 *
-	 *  64MB:    8192k
-	 * 128MB:   11585k
-	 * 256MB:   16384k
-	 * 512MB:   23170k
-	 *   1GB:   32768k
-	 *   2GB:   46340k
-	 *   4GB:   65536k
-	 *   8GB:   92681k
-	 *  16GB:  131072k
-	 *
-	 * This allows larger machines to have larger/more transfers.
-	 * Limit the default to 256M
-	 */
-	congestion_kb = (16*int_sqrt(totalram_pages)) << (PAGE_SHIFT-10);
-	if (congestion_kb > 256*1024)
-		congestion_kb = 256*1024;
+	token = match_token((char *)c, fsopt_tokens, argstr);
+	if (token < 0)
+		return -EINVAL;
 
-	return congestion_kb;
+	if (token < Opt_last_int) {
+		ret = match_int(&argstr[0], &intval);
+		if (ret < 0) {
+			pr_err("bad mount option arg (not int) "
+			       "at '%s'\n", c);
+			return ret;
+		}
+		dout("got int token %d val %d\n", token, intval);
+	} else if (token > Opt_last_int && token < Opt_last_string) {
+		dout("got string token %d val %s\n", token,
+		     argstr[0].from);
+	} else {
+		dout("got token %d\n", token);
+	}
+
+	switch (token) {
+	case Opt_snapdirname:
+		kfree(fsopt->snapdir_name);
+		fsopt->snapdir_name = kstrndup(argstr[0].from,
+					       argstr[0].to-argstr[0].from,
+					       GFP_KERNEL);
+		if (!fsopt->snapdir_name)
+			return -ENOMEM;
+		break;
+
+		/* misc */
+	case Opt_wsize:
+		fsopt->wsize = intval;
+		break;
+	case Opt_rsize:
+		fsopt->rsize = intval;
+		break;
+	case Opt_caps_wanted_delay_min:
+		fsopt->caps_wanted_delay_min = intval;
+		break;
+	case Opt_caps_wanted_delay_max:
+		fsopt->caps_wanted_delay_max = intval;
+		break;
+	case Opt_readdir_max_entries:
+		fsopt->max_readdir = intval;
+		break;
+	case Opt_readdir_max_bytes:
+		fsopt->max_readdir_bytes = intval;
+		break;
+	case Opt_congestion_kb:
+		fsopt->congestion_kb = intval;
+		break;
+	case Opt_dirstat:
+		fsopt->flags |= CEPH_MOUNT_OPT_DIRSTAT;
+		break;
+	case Opt_nodirstat:
+		fsopt->flags &= ~CEPH_MOUNT_OPT_DIRSTAT;
+		break;
+	case Opt_rbytes:
+		fsopt->flags |= CEPH_MOUNT_OPT_RBYTES;
+		break;
+	case Opt_norbytes:
+		fsopt->flags &= ~CEPH_MOUNT_OPT_RBYTES;
+		break;
+	case Opt_noasyncreaddir:
+		fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
+		break;
+	default:
+		BUG_ON(token);
+	}
+	return 0;
+}
+
+static void destroy_mount_options(struct ceph_mount_options *args)
+{
+	dout("destroy_mount_options %p\n", args);
+	kfree(args->snapdir_name);
+	kfree(args);
+}
+
+static int strcmp_null(const char *s1, const char *s2)
+{
+	if (!s1 && !s2)
+		return 0;
+	if (s1 && !s2)
+		return -1;
+	if (!s1 && s2)
+		return 1;
+	return strcmp(s1, s2);
+}
+
+static int compare_mount_options(struct ceph_mount_options *new_fsopt,
+				 struct ceph_options *new_opt,
+				 struct ceph_fs_client *fsc)
+{
+	struct ceph_mount_options *fsopt1 = new_fsopt;
+	struct ceph_mount_options *fsopt2 = fsc->mount_options;
+	int ofs = offsetof(struct ceph_mount_options, snapdir_name);
+	int ret;
+
+	ret = memcmp(fsopt1, fsopt2, ofs);
+	if (ret)
+		return ret;
+
+	ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
+	if (ret)
+		return ret;
+
+	return ceph_compare_options(new_opt, fsc->client);
+}
+
+static int parse_mount_options(struct ceph_mount_options **pfsopt,
+			       struct ceph_options **popt,
+			       int flags, char *options,
+			       const char *dev_name,
+			       const char **path)
+{
+	struct ceph_mount_options *fsopt;
+	const char *dev_name_end;
+	int err = -ENOMEM;
+
+	fsopt = kzalloc(sizeof(*fsopt), GFP_KERNEL);
+	if (!fsopt)
+		return -ENOMEM;
+
+	dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name);
+
+        fsopt->sb_flags = flags;
+        fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
+
+        fsopt->rsize = CEPH_MOUNT_RSIZE_DEFAULT;
+        fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
+        fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
+        fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
+        fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
+        fsopt->congestion_kb = default_congestion_kb();
+	
+        /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
+        err = -EINVAL;
+        if (!dev_name)
+                goto out;
+        *path = strstr(dev_name, ":/");
+        if (*path == NULL) {
+                pr_err("device name is missing path (no :/ in %s)\n",
+                       dev_name);
+                goto out;
+        }
+	dev_name_end = *path;
+	dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
+
+	/* path on server */
+	*path += 2;
+	dout("server path '%s'\n", *path);
+
+	err = ceph_parse_options(popt, options, dev_name, dev_name_end,
+				 parse_fsopt_token, (void *)fsopt);
+	if (err)
+		goto out;
+
+	/* success */
+	*pfsopt = fsopt;
+	return 0;
+
+out:
+	destroy_mount_options(fsopt);
+	return err;
 }
 
 /**
@@ -156,60 +333,176 @@
  */
 static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
 {
-	struct ceph_client *client = ceph_sb_to_client(mnt->mnt_sb);
-	struct ceph_mount_args *args = client->mount_args;
+	struct ceph_fs_client *fsc = ceph_sb_to_client(mnt->mnt_sb);
+	struct ceph_mount_options *fsopt = fsc->mount_options;
+	struct ceph_options *opt = fsc->client->options;
 
-	if (args->flags & CEPH_OPT_FSID)
-		seq_printf(m, ",fsid=%pU", &args->fsid);
-	if (args->flags & CEPH_OPT_NOSHARE)
+	if (opt->flags & CEPH_OPT_FSID)
+		seq_printf(m, ",fsid=%pU", &opt->fsid);
+	if (opt->flags & CEPH_OPT_NOSHARE)
 		seq_puts(m, ",noshare");
-	if (args->flags & CEPH_OPT_DIRSTAT)
-		seq_puts(m, ",dirstat");
-	if ((args->flags & CEPH_OPT_RBYTES) == 0)
-		seq_puts(m, ",norbytes");
-	if (args->flags & CEPH_OPT_NOCRC)
+	if (opt->flags & CEPH_OPT_NOCRC)
 		seq_puts(m, ",nocrc");
-	if (args->flags & CEPH_OPT_NOASYNCREADDIR)
+
+	if (opt->name)
+		seq_printf(m, ",name=%s", opt->name);
+	if (opt->secret)
+		seq_puts(m, ",secret=<hidden>");
+
+	if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT)
+		seq_printf(m, ",mount_timeout=%d", opt->mount_timeout);
+	if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
+		seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl);
+	if (opt->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
+		seq_printf(m, ",osdtimeout=%d", opt->osd_timeout);
+	if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
+		seq_printf(m, ",osdkeepalivetimeout=%d",
+			   opt->osd_keepalive_timeout);
+
+	if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
+		seq_puts(m, ",dirstat");
+	if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES) == 0)
+		seq_puts(m, ",norbytes");
+	if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
 		seq_puts(m, ",noasyncreaddir");
 
-	if (args->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT)
-		seq_printf(m, ",mount_timeout=%d", args->mount_timeout);
-	if (args->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
-		seq_printf(m, ",osd_idle_ttl=%d", args->osd_idle_ttl);
-	if (args->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
-		seq_printf(m, ",osdtimeout=%d", args->osd_timeout);
-	if (args->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
-		seq_printf(m, ",osdkeepalivetimeout=%d",
-			 args->osd_keepalive_timeout);
-	if (args->wsize)
-		seq_printf(m, ",wsize=%d", args->wsize);
-	if (args->rsize != CEPH_MOUNT_RSIZE_DEFAULT)
-		seq_printf(m, ",rsize=%d", args->rsize);
-	if (args->congestion_kb != default_congestion_kb())
-		seq_printf(m, ",write_congestion_kb=%d", args->congestion_kb);
-	if (args->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
+	if (fsopt->wsize)
+		seq_printf(m, ",wsize=%d", fsopt->wsize);
+	if (fsopt->rsize != CEPH_MOUNT_RSIZE_DEFAULT)
+		seq_printf(m, ",rsize=%d", fsopt->rsize);
+	if (fsopt->congestion_kb != default_congestion_kb())
+		seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
+	if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
 		seq_printf(m, ",caps_wanted_delay_min=%d",
-			 args->caps_wanted_delay_min);
-	if (args->caps_wanted_delay_max != CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT)
+			 fsopt->caps_wanted_delay_min);
+	if (fsopt->caps_wanted_delay_max != CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT)
 		seq_printf(m, ",caps_wanted_delay_max=%d",
-			   args->caps_wanted_delay_max);
-	if (args->cap_release_safety != CEPH_CAP_RELEASE_SAFETY_DEFAULT)
+			   fsopt->caps_wanted_delay_max);
+	if (fsopt->cap_release_safety != CEPH_CAP_RELEASE_SAFETY_DEFAULT)
 		seq_printf(m, ",cap_release_safety=%d",
-			   args->cap_release_safety);
-	if (args->max_readdir != CEPH_MAX_READDIR_DEFAULT)
-		seq_printf(m, ",readdir_max_entries=%d", args->max_readdir);
-	if (args->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT)
-		seq_printf(m, ",readdir_max_bytes=%d", args->max_readdir_bytes);
-	if (strcmp(args->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
-		seq_printf(m, ",snapdirname=%s", args->snapdir_name);
-	if (args->name)
-		seq_printf(m, ",name=%s", args->name);
-	if (args->secret)
-		seq_puts(m, ",secret=<hidden>");
+			   fsopt->cap_release_safety);
+	if (fsopt->max_readdir != CEPH_MAX_READDIR_DEFAULT)
+		seq_printf(m, ",readdir_max_entries=%d", fsopt->max_readdir);
+	if (fsopt->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT)
+		seq_printf(m, ",readdir_max_bytes=%d", fsopt->max_readdir_bytes);
+	if (strcmp(fsopt->snapdir_name, CEPH_SNAPDIRNAME_DEFAULT))
+		seq_printf(m, ",snapdirname=%s", fsopt->snapdir_name);
 	return 0;
 }
 
 /*
+ * handle any mon messages the standard library doesn't understand.
+ * return error if we don't either.
+ */
+static int extra_mon_dispatch(struct ceph_client *client, struct ceph_msg *msg)
+{
+	struct ceph_fs_client *fsc = client->private;
+	int type = le16_to_cpu(msg->hdr.type);
+
+	switch (type) {
+	case CEPH_MSG_MDS_MAP:
+		ceph_mdsc_handle_map(fsc->mdsc, msg);
+		return 0;
+
+	default:
+		return -1;
+	}
+}
+
+/*
+ * create a new fs client
+ */
+struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
+					struct ceph_options *opt)
+{
+	struct ceph_fs_client *fsc;
+	int err = -ENOMEM;
+
+	fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
+	if (!fsc)
+		return ERR_PTR(-ENOMEM);
+
+	fsc->client = ceph_create_client(opt, fsc);
+	if (IS_ERR(fsc->client)) {
+		err = PTR_ERR(fsc->client);
+		goto fail;
+	}
+	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
+	fsc->client->supported_features |= CEPH_FEATURE_FLOCK;
+	fsc->client->monc.want_mdsmap = 1;
+
+	fsc->mount_options = fsopt;
+
+	fsc->sb = NULL;
+	fsc->mount_state = CEPH_MOUNT_MOUNTING;
+
+	atomic_long_set(&fsc->writeback_count, 0);
+
+	err = bdi_init(&fsc->backing_dev_info);
+	if (err < 0)
+		goto fail_client;
+
+	err = -ENOMEM;
+	fsc->wb_wq = create_workqueue("ceph-writeback");
+	if (fsc->wb_wq == NULL)
+		goto fail_bdi;
+	fsc->pg_inv_wq = create_singlethread_workqueue("ceph-pg-invalid");
+	if (fsc->pg_inv_wq == NULL)
+		goto fail_wb_wq;
+	fsc->trunc_wq = create_singlethread_workqueue("ceph-trunc");
+	if (fsc->trunc_wq == NULL)
+		goto fail_pg_inv_wq;
+
+	/* set up mempools */
+	err = -ENOMEM;
+	fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10,
+			      fsc->mount_options->wsize >> PAGE_CACHE_SHIFT);
+	if (!fsc->wb_pagevec_pool)
+		goto fail_trunc_wq;
+
+	/* caps */
+	fsc->min_caps = fsopt->max_readdir;
+
+	return fsc;
+
+fail_trunc_wq:
+	destroy_workqueue(fsc->trunc_wq);
+fail_pg_inv_wq:
+	destroy_workqueue(fsc->pg_inv_wq);
+fail_wb_wq:
+	destroy_workqueue(fsc->wb_wq);
+fail_bdi:
+	bdi_destroy(&fsc->backing_dev_info);
+fail_client:
+	ceph_destroy_client(fsc->client);
+fail:
+	kfree(fsc);
+	return ERR_PTR(err);
+}
+
+void destroy_fs_client(struct ceph_fs_client *fsc)
+{
+	dout("destroy_fs_client %p\n", fsc);
+
+	destroy_workqueue(fsc->wb_wq);
+	destroy_workqueue(fsc->pg_inv_wq);
+	destroy_workqueue(fsc->trunc_wq);
+
+	bdi_destroy(&fsc->backing_dev_info);
+
+	mempool_destroy(fsc->wb_pagevec_pool);
+
+	destroy_mount_options(fsc->mount_options);
+
+	ceph_fs_debugfs_cleanup(fsc);
+
+	ceph_destroy_client(fsc->client);
+
+	kfree(fsc);
+	dout("destroy_fs_client %p done\n", fsc);
+}
+
+/*
  * caches
  */
 struct kmem_cache *ceph_inode_cachep;
@@ -274,12 +567,12 @@
  */
 static void ceph_umount_begin(struct super_block *sb)
 {
-	struct ceph_client *client = ceph_sb_to_client(sb);
+	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
 
 	dout("ceph_umount_begin - starting forced umount\n");
-	if (!client)
+	if (!fsc)
 		return;
-	client->mount_state = CEPH_MOUNT_SHUTDOWN;
+	fsc->mount_state = CEPH_MOUNT_SHUTDOWN;
 	return;
 }
 
@@ -294,483 +587,15 @@
 	.umount_begin   = ceph_umount_begin,
 };
 
-
-const char *ceph_msg_type_name(int type)
-{
-	switch (type) {
-	case CEPH_MSG_SHUTDOWN: return "shutdown";
-	case CEPH_MSG_PING: return "ping";
-	case CEPH_MSG_AUTH: return "auth";
-	case CEPH_MSG_AUTH_REPLY: return "auth_reply";
-	case CEPH_MSG_MON_MAP: return "mon_map";
-	case CEPH_MSG_MON_GET_MAP: return "mon_get_map";
-	case CEPH_MSG_MON_SUBSCRIBE: return "mon_subscribe";
-	case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";
-	case CEPH_MSG_STATFS: return "statfs";
-	case CEPH_MSG_STATFS_REPLY: return "statfs_reply";
-	case CEPH_MSG_MDS_MAP: return "mds_map";
-	case CEPH_MSG_CLIENT_SESSION: return "client_session";
-	case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect";
-	case CEPH_MSG_CLIENT_REQUEST: return "client_request";
-	case CEPH_MSG_CLIENT_REQUEST_FORWARD: return "client_request_forward";
-	case CEPH_MSG_CLIENT_REPLY: return "client_reply";
-	case CEPH_MSG_CLIENT_CAPS: return "client_caps";
-	case CEPH_MSG_CLIENT_CAPRELEASE: return "client_cap_release";
-	case CEPH_MSG_CLIENT_SNAP: return "client_snap";
-	case CEPH_MSG_CLIENT_LEASE: return "client_lease";
-	case CEPH_MSG_OSD_MAP: return "osd_map";
-	case CEPH_MSG_OSD_OP: return "osd_op";
-	case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
-	default: return "unknown";
-	}
-}
-
-
-/*
- * mount options
- */
-enum {
-	Opt_wsize,
-	Opt_rsize,
-	Opt_osdtimeout,
-	Opt_osdkeepalivetimeout,
-	Opt_mount_timeout,
-	Opt_osd_idle_ttl,
-	Opt_caps_wanted_delay_min,
-	Opt_caps_wanted_delay_max,
-	Opt_cap_release_safety,
-	Opt_readdir_max_entries,
-	Opt_readdir_max_bytes,
-	Opt_congestion_kb,
-	Opt_last_int,
-	/* int args above */
-	Opt_fsid,
-	Opt_snapdirname,
-	Opt_name,
-	Opt_secret,
-	Opt_last_string,
-	/* string args above */
-	Opt_ip,
-	Opt_noshare,
-	Opt_dirstat,
-	Opt_nodirstat,
-	Opt_rbytes,
-	Opt_norbytes,
-	Opt_nocrc,
-	Opt_noasyncreaddir,
-};
-
-static match_table_t arg_tokens = {
-	{Opt_wsize, "wsize=%d"},
-	{Opt_rsize, "rsize=%d"},
-	{Opt_osdtimeout, "osdtimeout=%d"},
-	{Opt_osdkeepalivetimeout, "osdkeepalive=%d"},
-	{Opt_mount_timeout, "mount_timeout=%d"},
-	{Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
-	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
-	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
-	{Opt_cap_release_safety, "cap_release_safety=%d"},
-	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
-	{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
-	{Opt_congestion_kb, "write_congestion_kb=%d"},
-	/* int args above */
-	{Opt_fsid, "fsid=%s"},
-	{Opt_snapdirname, "snapdirname=%s"},
-	{Opt_name, "name=%s"},
-	{Opt_secret, "secret=%s"},
-	/* string args above */
-	{Opt_ip, "ip=%s"},
-	{Opt_noshare, "noshare"},
-	{Opt_dirstat, "dirstat"},
-	{Opt_nodirstat, "nodirstat"},
-	{Opt_rbytes, "rbytes"},
-	{Opt_norbytes, "norbytes"},
-	{Opt_nocrc, "nocrc"},
-	{Opt_noasyncreaddir, "noasyncreaddir"},
-	{-1, NULL}
-};
-
-static int parse_fsid(const char *str, struct ceph_fsid *fsid)
-{
-	int i = 0;
-	char tmp[3];
-	int err = -EINVAL;
-	int d;
-
-	dout("parse_fsid '%s'\n", str);
-	tmp[2] = 0;
-	while (*str && i < 16) {
-		if (ispunct(*str)) {
-			str++;
-			continue;
-		}
-		if (!isxdigit(str[0]) || !isxdigit(str[1]))
-			break;
-		tmp[0] = str[0];
-		tmp[1] = str[1];
-		if (sscanf(tmp, "%x", &d) < 1)
-			break;
-		fsid->fsid[i] = d & 0xff;
-		i++;
-		str += 2;
-	}
-
-	if (i == 16)
-		err = 0;
-	dout("parse_fsid ret %d got fsid %pU", err, fsid);
-	return err;
-}
-
-static struct ceph_mount_args *parse_mount_args(int flags, char *options,
-						const char *dev_name,
-						const char **path)
-{
-	struct ceph_mount_args *args;
-	const char *c;
-	int err = -ENOMEM;
-	substring_t argstr[MAX_OPT_ARGS];
-
-	args = kzalloc(sizeof(*args), GFP_KERNEL);
-	if (!args)
-		return ERR_PTR(-ENOMEM);
-	args->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*args->mon_addr),
-				 GFP_KERNEL);
-	if (!args->mon_addr)
-		goto out;
-
-	dout("parse_mount_args %p, dev_name '%s'\n", args, dev_name);
-
-	/* start with defaults */
-	args->sb_flags = flags;
-	args->flags = CEPH_OPT_DEFAULT;
-	args->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
-	args->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
-	args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
-	args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;   /* seconds */
-	args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
-	args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
-	args->rsize = CEPH_MOUNT_RSIZE_DEFAULT;
-	args->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
-	args->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
-	args->max_readdir = CEPH_MAX_READDIR_DEFAULT;
-	args->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
-	args->congestion_kb = default_congestion_kb();
-
-	/* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
-	err = -EINVAL;
-	if (!dev_name)
-		goto out;
-	*path = strstr(dev_name, ":/");
-	if (*path == NULL) {
-		pr_err("device name is missing path (no :/ in %s)\n",
-		       dev_name);
-		goto out;
-	}
-
-	/* get mon ip(s) */
-	err = ceph_parse_ips(dev_name, *path, args->mon_addr,
-			     CEPH_MAX_MON, &args->num_mon);
-	if (err < 0)
-		goto out;
-
-	/* path on server */
-	*path += 2;
-	dout("server path '%s'\n", *path);
-
-	/* parse mount options */
-	while ((c = strsep(&options, ",")) != NULL) {
-		int token, intval, ret;
-		if (!*c)
-			continue;
-		err = -EINVAL;
-		token = match_token((char *)c, arg_tokens, argstr);
-		if (token < 0) {
-			pr_err("bad mount option at '%s'\n", c);
-			goto out;
-		}
-		if (token < Opt_last_int) {
-			ret = match_int(&argstr[0], &intval);
-			if (ret < 0) {
-				pr_err("bad mount option arg (not int) "
-				       "at '%s'\n", c);
-				continue;
-			}
-			dout("got int token %d val %d\n", token, intval);
-		} else if (token > Opt_last_int && token < Opt_last_string) {
-			dout("got string token %d val %s\n", token,
-			     argstr[0].from);
-		} else {
-			dout("got token %d\n", token);
-		}
-		switch (token) {
-		case Opt_ip:
-			err = ceph_parse_ips(argstr[0].from,
-					     argstr[0].to,
-					     &args->my_addr,
-					     1, NULL);
-			if (err < 0)
-				goto out;
-			args->flags |= CEPH_OPT_MYIP;
-			break;
-
-		case Opt_fsid:
-			err = parse_fsid(argstr[0].from, &args->fsid);
-			if (err == 0)
-				args->flags |= CEPH_OPT_FSID;
-			break;
-		case Opt_snapdirname:
-			kfree(args->snapdir_name);
-			args->snapdir_name = kstrndup(argstr[0].from,
-					      argstr[0].to-argstr[0].from,
-					      GFP_KERNEL);
-			break;
-		case Opt_name:
-			args->name = kstrndup(argstr[0].from,
-					      argstr[0].to-argstr[0].from,
-					      GFP_KERNEL);
-			break;
-		case Opt_secret:
-			args->secret = kstrndup(argstr[0].from,
-						argstr[0].to-argstr[0].from,
-						GFP_KERNEL);
-			break;
-
-			/* misc */
-		case Opt_wsize:
-			args->wsize = intval;
-			break;
-		case Opt_rsize:
-			args->rsize = intval;
-			break;
-		case Opt_osdtimeout:
-			args->osd_timeout = intval;
-			break;
-		case Opt_osdkeepalivetimeout:
-			args->osd_keepalive_timeout = intval;
-			break;
-		case Opt_osd_idle_ttl:
-			args->osd_idle_ttl = intval;
-			break;
-		case Opt_mount_timeout:
-			args->mount_timeout = intval;
-			break;
-		case Opt_caps_wanted_delay_min:
-			args->caps_wanted_delay_min = intval;
-			break;
-		case Opt_caps_wanted_delay_max:
-			args->caps_wanted_delay_max = intval;
-			break;
-		case Opt_readdir_max_entries:
-			args->max_readdir = intval;
-			break;
-		case Opt_readdir_max_bytes:
-			args->max_readdir_bytes = intval;
-			break;
-		case Opt_congestion_kb:
-			args->congestion_kb = intval;
-			break;
-
-		case Opt_noshare:
-			args->flags |= CEPH_OPT_NOSHARE;
-			break;
-
-		case Opt_dirstat:
-			args->flags |= CEPH_OPT_DIRSTAT;
-			break;
-		case Opt_nodirstat:
-			args->flags &= ~CEPH_OPT_DIRSTAT;
-			break;
-		case Opt_rbytes:
-			args->flags |= CEPH_OPT_RBYTES;
-			break;
-		case Opt_norbytes:
-			args->flags &= ~CEPH_OPT_RBYTES;
-			break;
-		case Opt_nocrc:
-			args->flags |= CEPH_OPT_NOCRC;
-			break;
-		case Opt_noasyncreaddir:
-			args->flags |= CEPH_OPT_NOASYNCREADDIR;
-			break;
-
-		default:
-			BUG_ON(token);
-		}
-	}
-	return args;
-
-out:
-	kfree(args->mon_addr);
-	kfree(args);
-	return ERR_PTR(err);
-}
-
-static void destroy_mount_args(struct ceph_mount_args *args)
-{
-	dout("destroy_mount_args %p\n", args);
-	kfree(args->snapdir_name);
-	args->snapdir_name = NULL;
-	kfree(args->name);
-	args->name = NULL;
-	kfree(args->secret);
-	args->secret = NULL;
-	kfree(args);
-}
-
-/*
- * create a fresh client instance
- */
-static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
-{
-	struct ceph_client *client;
-	int err = -ENOMEM;
-
-	client = kzalloc(sizeof(*client), GFP_KERNEL);
-	if (client == NULL)
-		return ERR_PTR(-ENOMEM);
-
-	mutex_init(&client->mount_mutex);
-
-	init_waitqueue_head(&client->auth_wq);
-
-	client->sb = NULL;
-	client->mount_state = CEPH_MOUNT_MOUNTING;
-	client->mount_args = args;
-
-	client->msgr = NULL;
-
-	client->auth_err = 0;
-	atomic_long_set(&client->writeback_count, 0);
-
-	err = bdi_init(&client->backing_dev_info);
-	if (err < 0)
-		goto fail;
-
-	err = -ENOMEM;
-	client->wb_wq = create_workqueue("ceph-writeback");
-	if (client->wb_wq == NULL)
-		goto fail_bdi;
-	client->pg_inv_wq = create_singlethread_workqueue("ceph-pg-invalid");
-	if (client->pg_inv_wq == NULL)
-		goto fail_wb_wq;
-	client->trunc_wq = create_singlethread_workqueue("ceph-trunc");
-	if (client->trunc_wq == NULL)
-		goto fail_pg_inv_wq;
-
-	/* set up mempools */
-	err = -ENOMEM;
-	client->wb_pagevec_pool = mempool_create_kmalloc_pool(10,
-			      client->mount_args->wsize >> PAGE_CACHE_SHIFT);
-	if (!client->wb_pagevec_pool)
-		goto fail_trunc_wq;
-
-	/* caps */
-	client->min_caps = args->max_readdir;
-
-	/* subsystems */
-	err = ceph_monc_init(&client->monc, client);
-	if (err < 0)
-		goto fail_mempool;
-	err = ceph_osdc_init(&client->osdc, client);
-	if (err < 0)
-		goto fail_monc;
-	err = ceph_mdsc_init(&client->mdsc, client);
-	if (err < 0)
-		goto fail_osdc;
-	return client;
-
-fail_osdc:
-	ceph_osdc_stop(&client->osdc);
-fail_monc:
-	ceph_monc_stop(&client->monc);
-fail_mempool:
-	mempool_destroy(client->wb_pagevec_pool);
-fail_trunc_wq:
-	destroy_workqueue(client->trunc_wq);
-fail_pg_inv_wq:
-	destroy_workqueue(client->pg_inv_wq);
-fail_wb_wq:
-	destroy_workqueue(client->wb_wq);
-fail_bdi:
-	bdi_destroy(&client->backing_dev_info);
-fail:
-	kfree(client);
-	return ERR_PTR(err);
-}
-
-static void ceph_destroy_client(struct ceph_client *client)
-{
-	dout("destroy_client %p\n", client);
-
-	/* unmount */
-	ceph_mdsc_stop(&client->mdsc);
-	ceph_osdc_stop(&client->osdc);
-
-	/*
-	 * make sure mds and osd connections close out before destroying
-	 * the auth module, which is needed to free those connections'
-	 * ceph_authorizers.
-	 */
-	ceph_msgr_flush();
-
-	ceph_monc_stop(&client->monc);
-
-	ceph_debugfs_client_cleanup(client);
-	destroy_workqueue(client->wb_wq);
-	destroy_workqueue(client->pg_inv_wq);
-	destroy_workqueue(client->trunc_wq);
-
-	bdi_destroy(&client->backing_dev_info);
-
-	if (client->msgr)
-		ceph_messenger_destroy(client->msgr);
-	mempool_destroy(client->wb_pagevec_pool);
-
-	destroy_mount_args(client->mount_args);
-
-	kfree(client);
-	dout("destroy_client %p done\n", client);
-}
-
-/*
- * Initially learn our fsid, or verify an fsid matches.
- */
-int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid)
-{
-	if (client->have_fsid) {
-		if (ceph_fsid_compare(&client->fsid, fsid)) {
-			pr_err("bad fsid, had %pU got %pU",
-			       &client->fsid, fsid);
-			return -1;
-		}
-	} else {
-		pr_info("client%lld fsid %pU\n", client->monc.auth->global_id,
-			fsid);
-		memcpy(&client->fsid, fsid, sizeof(*fsid));
-		ceph_debugfs_client_init(client);
-		client->have_fsid = true;
-	}
-	return 0;
-}
-
-/*
- * true if we have the mon map (and have thus joined the cluster)
- */
-static int have_mon_and_osd_map(struct ceph_client *client)
-{
-	return client->monc.monmap && client->monc.monmap->epoch &&
-	       client->osdc.osdmap && client->osdc.osdmap->epoch;
-}
-
 /*
  * Bootstrap mount by opening the root directory.  Note the mount
  * @started time from caller, and time out if this takes too long.
  */
-static struct dentry *open_root_dentry(struct ceph_client *client,
+static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
 				       const char *path,
 				       unsigned long started)
 {
-	struct ceph_mds_client *mdsc = &client->mdsc;
+	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req = NULL;
 	int err;
 	struct dentry *root;
@@ -784,14 +609,14 @@
 	req->r_ino1.ino = CEPH_INO_ROOT;
 	req->r_ino1.snap = CEPH_NOSNAP;
 	req->r_started = started;
-	req->r_timeout = client->mount_args->mount_timeout * HZ;
+	req->r_timeout = fsc->client->options->mount_timeout * HZ;
 	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
 	req->r_num_caps = 2;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	if (err == 0) {
 		dout("open_root_inode success\n");
 		if (ceph_ino(req->r_target_inode) == CEPH_INO_ROOT &&
-		    client->sb->s_root == NULL)
+		    fsc->sb->s_root == NULL)
 			root = d_alloc_root(req->r_target_inode);
 		else
 			root = d_obtain_alias(req->r_target_inode);
@@ -804,105 +629,86 @@
 	return root;
 }
 
+
+
+
 /*
  * mount: join the ceph cluster, and open root directory.
  */
-static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
+static int ceph_mount(struct ceph_fs_client *fsc, struct vfsmount *mnt,
 		      const char *path)
 {
-	struct ceph_entity_addr *myaddr = NULL;
 	int err;
-	unsigned long timeout = client->mount_args->mount_timeout * HZ;
 	unsigned long started = jiffies;  /* note the start time */
 	struct dentry *root;
+	int first = 0;   /* first vfsmount for this super_block */
 
 	dout("mount start\n");
-	mutex_lock(&client->mount_mutex);
+	mutex_lock(&fsc->client->mount_mutex);
 
-	/* initialize the messenger */
-	if (client->msgr == NULL) {
-		if (ceph_test_opt(client, MYIP))
-			myaddr = &client->mount_args->my_addr;
-		client->msgr = ceph_messenger_create(myaddr);
-		if (IS_ERR(client->msgr)) {
-			err = PTR_ERR(client->msgr);
-			client->msgr = NULL;
-			goto out;
-		}
-		client->msgr->nocrc = ceph_test_opt(client, NOCRC);
-	}
-
-	/* open session, and wait for mon, mds, and osd maps */
-	err = ceph_monc_open_session(&client->monc);
+	err = __ceph_open_session(fsc->client, started);
 	if (err < 0)
 		goto out;
 
-	while (!have_mon_and_osd_map(client)) {
-		err = -EIO;
-		if (timeout && time_after_eq(jiffies, started + timeout))
-			goto out;
-
-		/* wait */
-		dout("mount waiting for mon_map\n");
-		err = wait_event_interruptible_timeout(client->auth_wq,
-		       have_mon_and_osd_map(client) || (client->auth_err < 0),
-		       timeout);
-		if (err == -EINTR || err == -ERESTARTSYS)
-			goto out;
-		if (client->auth_err < 0) {
-			err = client->auth_err;
-			goto out;
-		}
-	}
-
 	dout("mount opening root\n");
-	root = open_root_dentry(client, "", started);
+	root = open_root_dentry(fsc, "", started);
 	if (IS_ERR(root)) {
 		err = PTR_ERR(root);
 		goto out;
 	}
-	if (client->sb->s_root)
+	if (fsc->sb->s_root) {
 		dput(root);
-	else
-		client->sb->s_root = root;
+	} else {
+		fsc->sb->s_root = root;
+		first = 1;
+
+		err = ceph_fs_debugfs_init(fsc);
+		if (err < 0)
+			goto fail;
+	}
 
 	if (path[0] == 0) {
 		dget(root);
 	} else {
 		dout("mount opening base mountpoint\n");
-		root = open_root_dentry(client, path, started);
+		root = open_root_dentry(fsc, path, started);
 		if (IS_ERR(root)) {
 			err = PTR_ERR(root);
-			dput(client->sb->s_root);
-			client->sb->s_root = NULL;
-			goto out;
+			goto fail;
 		}
 	}
 
 	mnt->mnt_root = root;
-	mnt->mnt_sb = client->sb;
+	mnt->mnt_sb = fsc->sb;
 
-	client->mount_state = CEPH_MOUNT_MOUNTED;
+	fsc->mount_state = CEPH_MOUNT_MOUNTED;
 	dout("mount success\n");
 	err = 0;
 
 out:
-	mutex_unlock(&client->mount_mutex);
+	mutex_unlock(&fsc->client->mount_mutex);
 	return err;
+
+fail:
+	if (first) {
+		dput(fsc->sb->s_root);
+		fsc->sb->s_root = NULL;
+	}
+	goto out;
 }
 
 static int ceph_set_super(struct super_block *s, void *data)
 {
-	struct ceph_client *client = data;
+	struct ceph_fs_client *fsc = data;
 	int ret;
 
 	dout("set_super %p data %p\n", s, data);
 
-	s->s_flags = client->mount_args->sb_flags;
+	s->s_flags = fsc->mount_options->sb_flags;
 	s->s_maxbytes = 1ULL << 40;  /* temp value until we get mdsmap */
 
-	s->s_fs_info = client;
-	client->sb = s;
+	s->s_fs_info = fsc;
+	fsc->sb = s;
 
 	s->s_op = &ceph_super_ops;
 	s->s_export_op = &ceph_export_ops;
@@ -917,7 +723,7 @@
 
 fail:
 	s->s_fs_info = NULL;
-	client->sb = NULL;
+	fsc->sb = NULL;
 	return ret;
 }
 
@@ -926,30 +732,23 @@
  */
 static int ceph_compare_super(struct super_block *sb, void *data)
 {
-	struct ceph_client *new = data;
-	struct ceph_mount_args *args = new->mount_args;
-	struct ceph_client *other = ceph_sb_to_client(sb);
-	int i;
+	struct ceph_fs_client *new = data;
+	struct ceph_mount_options *fsopt = new->mount_options;
+	struct ceph_options *opt = new->client->options;
+	struct ceph_fs_client *other = ceph_sb_to_client(sb);
 
 	dout("ceph_compare_super %p\n", sb);
-	if (args->flags & CEPH_OPT_FSID) {
-		if (ceph_fsid_compare(&args->fsid, &other->fsid)) {
-			dout("fsid doesn't match\n");
-			return 0;
-		}
-	} else {
-		/* do we share (a) monitor? */
-		for (i = 0; i < new->monc.monmap->num_mon; i++)
-			if (ceph_monmap_contains(other->monc.monmap,
-					 &new->monc.monmap->mon_inst[i].addr))
-				break;
-		if (i == new->monc.monmap->num_mon) {
-			dout("mon ip not part of monmap\n");
-			return 0;
-		}
-		dout("mon ip matches existing sb %p\n", sb);
+
+	if (compare_mount_options(fsopt, opt, other)) {
+		dout("monitor(s)/mount options don't match\n");
+		return 0;
 	}
-	if (args->sb_flags != other->mount_args->sb_flags) {
+	if ((opt->flags & CEPH_OPT_FSID) &&
+	    ceph_fsid_compare(&opt->fsid, &other->client->fsid)) {
+		dout("fsid doesn't match\n");
+		return 0;
+	}
+	if (fsopt->sb_flags != other->mount_options->sb_flags) {
 		dout("flags differ\n");
 		return 0;
 	}
@@ -961,19 +760,20 @@
  */
 static atomic_long_t bdi_seq = ATOMIC_LONG_INIT(0);
 
-static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client)
+static int ceph_register_bdi(struct super_block *sb,
+			     struct ceph_fs_client *fsc)
 {
 	int err;
 
 	/* set ra_pages based on rsize mount option? */
-	if (client->mount_args->rsize >= PAGE_CACHE_SIZE)
-		client->backing_dev_info.ra_pages =
-			(client->mount_args->rsize + PAGE_CACHE_SIZE - 1)
+	if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
+		fsc->backing_dev_info.ra_pages =
+			(fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
 			>> PAGE_SHIFT;
-	err = bdi_register(&client->backing_dev_info, NULL, "ceph-%d",
+	err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d",
 			   atomic_long_inc_return(&bdi_seq));
 	if (!err)
-		sb->s_bdi = &client->backing_dev_info;
+		sb->s_bdi = &fsc->backing_dev_info;
 	return err;
 }
 
@@ -982,46 +782,52 @@
 		       struct vfsmount *mnt)
 {
 	struct super_block *sb;
-	struct ceph_client *client;
+	struct ceph_fs_client *fsc;
 	int err;
 	int (*compare_super)(struct super_block *, void *) = ceph_compare_super;
 	const char *path = NULL;
-	struct ceph_mount_args *args;
+	struct ceph_mount_options *fsopt = NULL;
+	struct ceph_options *opt = NULL;
 
 	dout("ceph_get_sb\n");
-	args = parse_mount_args(flags, data, dev_name, &path);
-	if (IS_ERR(args)) {
-		err = PTR_ERR(args);
+	err = parse_mount_options(&fsopt, &opt, flags, data, dev_name, &path);
+	if (err < 0)
 		goto out_final;
-	}
 
 	/* create client (which we may/may not use) */
-	client = ceph_create_client(args);
-	if (IS_ERR(client)) {
-		err = PTR_ERR(client);
+	fsc = create_fs_client(fsopt, opt);
+	if (IS_ERR(fsc)) {
+		err = PTR_ERR(fsc);
+		kfree(fsopt);
+		kfree(opt);
 		goto out_final;
 	}
 
-	if (client->mount_args->flags & CEPH_OPT_NOSHARE)
+	err = ceph_mdsc_init(fsc);
+	if (err < 0)
+		goto out;
+
+	if (ceph_test_opt(fsc->client, NOSHARE))
 		compare_super = NULL;
-	sb = sget(fs_type, compare_super, ceph_set_super, client);
+	sb = sget(fs_type, compare_super, ceph_set_super, fsc);
 	if (IS_ERR(sb)) {
 		err = PTR_ERR(sb);
 		goto out;
 	}
 
-	if (ceph_sb_to_client(sb) != client) {
-		ceph_destroy_client(client);
-		client = ceph_sb_to_client(sb);
-		dout("get_sb got existing client %p\n", client);
+	if (ceph_sb_to_client(sb) != fsc) {
+		ceph_mdsc_destroy(fsc);
+		destroy_fs_client(fsc);
+		fsc = ceph_sb_to_client(sb);
+		dout("get_sb got existing client %p\n", fsc);
 	} else {
-		dout("get_sb using new client %p\n", client);
-		err = ceph_register_bdi(sb, client);
+		dout("get_sb using new client %p\n", fsc);
+		err = ceph_register_bdi(sb, fsc);
 		if (err < 0)
 			goto out_splat;
 	}
 
-	err = ceph_mount(client, mnt, path);
+	err = ceph_mount(fsc, mnt, path);
 	if (err < 0)
 		goto out_splat;
 	dout("root %p inode %p ino %llx.%llx\n", mnt->mnt_root,
@@ -1029,12 +835,13 @@
 	return 0;
 
 out_splat:
-	ceph_mdsc_close_sessions(&client->mdsc);
+	ceph_mdsc_close_sessions(fsc->mdsc);
 	deactivate_locked_super(sb);
 	goto out_final;
 
 out:
-	ceph_destroy_client(client);
+	ceph_mdsc_destroy(fsc);
+	destroy_fs_client(fsc);
 out_final:
 	dout("ceph_get_sb fail %d\n", err);
 	return err;
@@ -1042,11 +849,12 @@
 
 static void ceph_kill_sb(struct super_block *s)
 {
-	struct ceph_client *client = ceph_sb_to_client(s);
+	struct ceph_fs_client *fsc = ceph_sb_to_client(s);
 	dout("kill_sb %p\n", s);
-	ceph_mdsc_pre_umount(&client->mdsc);
+	ceph_mdsc_pre_umount(fsc->mdsc);
 	kill_anon_super(s);    /* will call put_super after sb is r/o */
-	ceph_destroy_client(client);
+	ceph_mdsc_destroy(fsc);
+	destroy_fs_client(fsc);
 }
 
 static struct file_system_type ceph_fs_type = {
@@ -1062,36 +870,20 @@
 
 static int __init init_ceph(void)
 {
-	int ret = 0;
-
-	ret = ceph_debugfs_init();
-	if (ret < 0)
-		goto out;
-
-	ret = ceph_msgr_init();
-	if (ret < 0)
-		goto out_debugfs;
-
-	ret = init_caches();
+	int ret = init_caches();
 	if (ret)
-		goto out_msgr;
+		goto out;
 
 	ret = register_filesystem(&ceph_fs_type);
 	if (ret)
 		goto out_icache;
 
-	pr_info("loaded (mon/mds/osd proto %d/%d/%d, osdmap %d/%d %d/%d)\n",
-		CEPH_MONC_PROTOCOL, CEPH_MDSC_PROTOCOL, CEPH_OSDC_PROTOCOL,
-		CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT,
-		CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT);
+	pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
+
 	return 0;
 
 out_icache:
 	destroy_caches();
-out_msgr:
-	ceph_msgr_exit();
-out_debugfs:
-	ceph_debugfs_cleanup();
 out:
 	return ret;
 }
@@ -1101,8 +893,6 @@
 	dout("exit_ceph\n");
 	unregister_filesystem(&ceph_fs_type);
 	destroy_caches();
-	ceph_msgr_exit();
-	ceph_debugfs_cleanup();
 }
 
 module_init(init_ceph);