Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph updates from Sage Weil:
 "The big item here is support for inline data for CephFS and for
  message signatures from Zheng.  There are also several bug fixes,
  including interrupted flock request handling, 0-length xattrs, mksnap,
  cached readdir results, and a message version compat field.  Finally
  there are several cleanups from Ilya, Dan, and Markus.

  Note that there is another series coming soon that fixes some bugs in
  the RBD 'lingering' requests, but it isn't quite ready yet"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (27 commits)
  ceph: fix setting empty extended attribute
  ceph: fix mksnap crash
  ceph: do_sync is never initialized
  libceph: fixup includes in pagelist.h
  ceph: support inline data feature
  ceph: flush inline version
  ceph: convert inline data to normal data before data write
  ceph: sync read inline data
  ceph: fetch inline data when getting Fcr cap refs
  ceph: use getattr request to fetch inline data
  ceph: add inline data to pagecache
  ceph: parse inline data in MClientReply and MClientCaps
  libceph: specify position of extent operation
  libceph: add CREATE osd operation support
  libceph: add SETXATTR/CMPXATTR osd operations support
  rbd: don't treat CEPH_OSD_OP_DELETE as extent op
  ceph: remove unused stringification macros
  libceph: require cephx message signature by default
  ceph: introduce global empty snap context
  ceph: message versioning fixes
  ...
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 27b71a0..3ec85df 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -2370,8 +2370,12 @@
 		opcode = CEPH_OSD_OP_READ;
 	}
 
-	osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length,
-				0, 0);
+	if (opcode == CEPH_OSD_OP_DELETE)
+		osd_req_op_init(osd_request, num_ops, opcode);
+	else
+		osd_req_op_extent_init(osd_request, num_ops, opcode,
+				       offset, length, 0, 0);
+
 	if (obj_request->type == OBJ_REQUEST_BIO)
 		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
 					obj_request->bio_list, length);
@@ -3405,8 +3409,7 @@
 	if (result)
 		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
 			 obj_op_name(op_type), length, offset, result);
-	if (snapc)
-		ceph_put_snap_context(snapc);
+	ceph_put_snap_context(snapc);
 	blk_end_request_all(rq, result);
 }
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 18c06bb..f5013d9 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -192,17 +192,30 @@
 	struct ceph_osd_client *osdc =
 		&ceph_inode_to_client(inode)->client->osdc;
 	int err = 0;
+	u64 off = page_offset(page);
 	u64 len = PAGE_CACHE_SIZE;
 
-	err = ceph_readpage_from_fscache(inode, page);
+	if (off >= i_size_read(inode)) {
+		zero_user_segment(page, err, PAGE_CACHE_SIZE);
+		SetPageUptodate(page);
+		return 0;
+	}
 
+	/*
+	 * Uptodate inline data should have been added into page cache
+	 * while getting Fcr caps.
+	 */
+	if (ci->i_inline_version != CEPH_INLINE_NONE)
+		return -EINVAL;
+
+	err = ceph_readpage_from_fscache(inode, page);
 	if (err == 0)
 		goto out;
 
 	dout("readpage inode %p file %p page %p index %lu\n",
 	     inode, filp, page, page->index);
 	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-				  (u64) page_offset(page), &len,
+				  off, &len,
 				  ci->i_truncate_seq, ci->i_truncate_size,
 				  &page, 1, 0);
 	if (err == -ENOENT)
@@ -319,7 +332,7 @@
 	     off, len);
 	vino = ceph_vino(inode);
 	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
-				    1, CEPH_OSD_OP_READ,
+				    0, 1, CEPH_OSD_OP_READ,
 				    CEPH_OSD_FLAG_READ, NULL,
 				    ci->i_truncate_seq, ci->i_truncate_size,
 				    false);
@@ -384,6 +397,9 @@
 	int rc = 0;
 	int max = 0;
 
+	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
+		return -EINVAL;
+
 	rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
 					 &nr_pages);
 
@@ -673,7 +689,7 @@
 	int rc = 0;
 	unsigned wsize = 1 << inode->i_blkbits;
 	struct ceph_osd_request *req = NULL;
-	int do_sync;
+	int do_sync = 0;
 	u64 truncate_size, snap_size;
 	u32 truncate_seq;
 
@@ -750,7 +766,6 @@
 	last_snapc = snapc;
 
 	while (!done && index <= end) {
-		int num_ops = do_sync ? 2 : 1;
 		unsigned i;
 		int first;
 		pgoff_t next;
@@ -850,7 +865,8 @@
 				len = wsize;
 				req = ceph_osdc_new_request(&fsc->client->osdc,
 							&ci->i_layout, vino,
-							offset, &len, num_ops,
+							offset, &len, 0,
+							do_sync ? 2 : 1,
 							CEPH_OSD_OP_WRITE,
 							CEPH_OSD_FLAG_WRITE |
 							CEPH_OSD_FLAG_ONDISK,
@@ -862,6 +878,9 @@
 					break;
 				}
 
+				if (do_sync)
+					osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+
 				req->r_callback = writepages_finish;
 				req->r_inode = inode;
 
@@ -1204,6 +1223,7 @@
 	struct inode *inode = file_inode(vma->vm_file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_file_info *fi = vma->vm_file->private_data;
+	struct page *pinned_page = NULL;
 	loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
 	int want, got, ret;
 
@@ -1215,7 +1235,8 @@
 		want = CEPH_CAP_FILE_CACHE;
 	while (1) {
 		got = 0;
-		ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
+		ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want,
+				    -1, &got, &pinned_page);
 		if (ret == 0)
 			break;
 		if (ret != -ERESTARTSYS) {
@@ -1226,12 +1247,54 @@
 	dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
 	     inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got));
 
-	ret = filemap_fault(vma, vmf);
+	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
+	    ci->i_inline_version == CEPH_INLINE_NONE)
+		ret = filemap_fault(vma, vmf);
+	else
+		ret = -EAGAIN;
 
 	dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
 	     inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
+	if (pinned_page)
+		page_cache_release(pinned_page);
 	ceph_put_cap_refs(ci, got);
 
+	if (ret != -EAGAIN)
+		return ret;
+
+	/* read inline data */
+	if (off >= PAGE_CACHE_SIZE) {
+		/* does not support inline data > PAGE_SIZE */
+		ret = VM_FAULT_SIGBUS;
+	} else {
+		int ret1;
+		struct address_space *mapping = inode->i_mapping;
+		struct page *page = find_or_create_page(mapping, 0,
+						mapping_gfp_mask(mapping) &
+						~__GFP_FS);
+		if (!page) {
+			ret = VM_FAULT_OOM;
+			goto out;
+		}
+		ret1 = __ceph_do_getattr(inode, page,
+					 CEPH_STAT_CAP_INLINE_DATA, true);
+		if (ret1 < 0 || off >= i_size_read(inode)) {
+			unlock_page(page);
+			page_cache_release(page);
+			ret = VM_FAULT_SIGBUS;
+			goto out;
+		}
+		if (ret1 < PAGE_CACHE_SIZE)
+			zero_user_segment(page, ret1, PAGE_CACHE_SIZE);
+		else
+			flush_dcache_page(page);
+		SetPageUptodate(page);
+		vmf->page = page;
+		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
+	}
+out:
+	dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
+	     inode, off, (size_t)PAGE_CACHE_SIZE, ret);
 	return ret;
 }
 
@@ -1250,6 +1313,19 @@
 	size_t len;
 	int want, got, ret;
 
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		struct page *locked_page = NULL;
+		if (off == 0) {
+			lock_page(page);
+			locked_page = page;
+		}
+		ret = ceph_uninline_data(vma->vm_file, locked_page);
+		if (locked_page)
+			unlock_page(locked_page);
+		if (ret < 0)
+			return VM_FAULT_SIGBUS;
+	}
+
 	if (off + PAGE_CACHE_SIZE <= size)
 		len = PAGE_CACHE_SIZE;
 	else
@@ -1263,7 +1339,8 @@
 		want = CEPH_CAP_FILE_BUFFER;
 	while (1) {
 		got = 0;
-		ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len);
+		ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
+				    &got, NULL);
 		if (ret == 0)
 			break;
 		if (ret != -ERESTARTSYS) {
@@ -1297,11 +1374,13 @@
 			ret = VM_FAULT_SIGBUS;
 	}
 out:
-	if (ret != VM_FAULT_LOCKED) {
+	if (ret != VM_FAULT_LOCKED)
 		unlock_page(page);
-	} else {
+	if (ret == VM_FAULT_LOCKED ||
+	    ci->i_inline_version != CEPH_INLINE_NONE) {
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
+		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
@@ -1315,6 +1394,214 @@
 	return ret;
 }

+/*
+ * Copy inline file data supplied by the MDS into page 0 of @inode's
+ * page cache.
+ *
+ * If @locked_page is given, @data is copied into it and nothing else is
+ * done: the page is neither marked uptodate nor unlocked here, so the
+ * caller has to finish it.  Otherwise page 0 is looked up or created;
+ * the call does nothing if i_size is 0, if the page cannot be allocated,
+ * or if the page is already uptodate.  After the copy the rest of the
+ * page is zeroed and the page is marked uptodate, unlocked and released.
+ *
+ * @len is not checked against PAGE_CACHE_SIZE here; callers are assumed
+ * to pass at most one page of data.
+ */
+void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
+			   char	*data, size_t len)
+{
+	struct address_space *mapping = inode->i_mapping;
+	struct page *page;
+
+	if (locked_page) {
+		page = locked_page;
+	} else {
+		if (i_size_read(inode) == 0)
+			return;
+		page = find_or_create_page(mapping, 0,
+					   mapping_gfp_mask(mapping) & ~__GFP_FS);
+		if (!page)
+			return;
+		/* page 0 already holds valid data; leave it alone */
+		if (PageUptodate(page)) {
+			unlock_page(page);
+			page_cache_release(page);
+			return;
+		}
+	}
+
+	dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
+	     inode, ceph_vinop(inode), len, locked_page);
+
+	if (len > 0) {
+		void *kaddr = kmap_atomic(page);
+		memcpy(kaddr, data, len);
+		kunmap_atomic(kaddr);
+	}
+
+	if (page != locked_page) {
+		if (len < PAGE_CACHE_SIZE)
+			zero_user_segment(page, len, PAGE_CACHE_SIZE);
+		else
+			flush_dcache_page(page);
+
+		SetPageUptodate(page);
+		unlock_page(page);
+		page_cache_release(page);
+	}
+}
+
+/*
+ * Write the inline data of @filp's inode out to the OSDs at file offset
+ * 0, so that the file can then be treated as having normal data.
+ *
+ * Nothing is sent if i_inline_version is 1 (initial version, no data) or
+ * CEPH_INLINE_NONE.  The data is taken from @locked_page if one is given,
+ * else from an uptodate page 0 in the page cache when cache caps are
+ * issued, else it is fetched with a CEPH_STAT_CAP_INLINE_DATA getattr.
+ *
+ * i_inline_version is not updated here; that is left to the callers.
+ *
+ * Returns 0 on success or when there was nothing to write out, otherwise
+ * a negative error code.
+ */
+int ceph_uninline_data(struct file *filp, struct page *locked_page)
+{
+	struct inode *inode = file_inode(filp);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_osd_request *req;
+	struct page *page = NULL;
+	u64 len, inline_version;
+	int err = 0;
+	bool from_pagecache = false;
+
+	spin_lock(&ci->i_ceph_lock);
+	inline_version = ci->i_inline_version;
+	spin_unlock(&ci->i_ceph_lock);
+
+	dout("uninline_data %p %llx.%llx inline_version %llu\n",
+	     inode, ceph_vinop(inode), inline_version);
+
+	if (inline_version == 1 || /* initial version, no data */
+	    inline_version == CEPH_INLINE_NONE)
+		goto out;
+
+	if (locked_page) {
+		page = locked_page;
+		WARN_ON(!PageUptodate(page));
+	} else if (ceph_caps_issued(ci) &
+		   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
+		page = find_get_page(inode->i_mapping, 0);
+		if (page) {
+			if (PageUptodate(page)) {
+				from_pagecache = true;
+				lock_page(page);
+			} else {
+				page_cache_release(page);
+				page = NULL;
+			}
+		}
+	}
+
+	if (page) {
+		len = i_size_read(inode);
+		if (len > PAGE_CACHE_SIZE)
+			len = PAGE_CACHE_SIZE;
+	} else {
+		page = __page_cache_alloc(GFP_NOFS);
+		if (!page) {
+			err = -ENOMEM;
+			goto out;
+		}
+		err = __ceph_do_getattr(inode, page,
+					CEPH_STAT_CAP_INLINE_DATA, true);
+		if (err < 0) {
+			/* no inline data */
+			if (err == -ENODATA)
+				err = 0;
+			goto out;
+		}
+		len = err;
+	}
+
+	/* first request: a single CREATE op */
+	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+				    ceph_vino(inode), 0, &len, 0, 1,
+				    CEPH_OSD_OP_CREATE,
+				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+				    ci->i_snap_realm->cached_context,
+				    0, 0, false);
+	if (IS_ERR(req)) {
+		err = PTR_ERR(req);
+		goto out;
+	}
+
+	ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+	if (!err)
+		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+	ceph_osdc_put_request(req);
+	if (err < 0)
+		goto out;
+
+	/* second request: op 0 CMPXATTR, op 1 WRITE, op 2 SETXATTR */
+	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+				    ceph_vino(inode), 0, &len, 1, 3,
+				    CEPH_OSD_OP_WRITE,
+				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+				    ci->i_snap_realm->cached_context,
+				    ci->i_truncate_seq, ci->i_truncate_size,
+				    false);
+	if (IS_ERR(req)) {
+		err = PTR_ERR(req);
+		goto out;
+	}
+
+	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);
+
+	err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
+				    "inline_version", &inline_version,
+				    sizeof(inline_version),
+				    CEPH_OSD_CMPXATTR_OP_GT,
+				    CEPH_OSD_CMPXATTR_MODE_U64);
+	if (err)
+		goto out_put;
+
+	err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
+				    "inline_version", &inline_version,
+				    sizeof(inline_version), 0, 0);
+	if (err)
+		goto out_put;
+
+	ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime);
+	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+	if (!err)
+		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+out_put:
+	ceph_osdc_put_request(req);
+	/*
+	 * NOTE(review): -ECANCELED presumably comes from the CMPXATTR guard
+	 * when the data was already written out by someone else -- confirm.
+	 */
+	if (err == -ECANCELED)
+		err = 0;
+out:
+	/* @locked_page belongs to the caller; only drop pages taken here */
+	if (page && page != locked_page) {
+		if (from_pagecache) {
+			unlock_page(page);
+			page_cache_release(page);
+		} else
+			__free_pages(page, 0);
+	}
+
+	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
+	     inode, ceph_vinop(inode), inline_version, err);
+	return err;
+}
+
 static struct vm_operations_struct ceph_vmops = {
 	.fault		= ceph_filemap_fault,
 	.page_mkwrite	= ceph_page_mkwrite,
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index cefca66..b93c631 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -975,10 +975,12 @@
 			kuid_t uid, kgid_t gid, umode_t mode,
 			u64 xattr_version,
 			struct ceph_buffer *xattrs_buf,
-			u64 follows)
+			u64 follows, bool inline_data)
 {
 	struct ceph_mds_caps *fc;
 	struct ceph_msg *msg;
+	void *p;
+	size_t extra_len;
 
 	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
 	     " seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,7 +990,10 @@
 	     seq, issue_seq, mseq, follows, size, max_size,
 	     xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
+	/* flock buffer size + inline version + inline data size */
+	extra_len = 4 + 8 + 4;
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
+			   GFP_NOFS, false);
 	if (!msg)
 		return -ENOMEM;
 
@@ -1020,6 +1025,14 @@
 	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
 	fc->mode = cpu_to_le32(mode);
 
+	p = fc + 1;
+	/* flock buffer size */
+	ceph_encode_32(&p, 0);
+	/* inline version */
+	ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
+	/* inline data size */
+	ceph_encode_32(&p, 0);
+
 	fc->xattr_version = cpu_to_le64(xattr_version);
 	if (xattrs_buf) {
 		msg->middle = ceph_buffer_get(xattrs_buf);
@@ -1126,6 +1139,7 @@
 	u64 flush_tid = 0;
 	int i;
 	int ret;
+	bool inline_data;
 
 	held = cap->issued | cap->implemented;
 	revoking = cap->implemented & ~cap->issued;
@@ -1209,13 +1223,15 @@
 		xattr_version = ci->i_xattrs.version;
 	}
 
+	inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
 	spin_unlock(&ci->i_ceph_lock);
 
 	ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
 		op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
 		size, max_size, &mtime, &atime, time_warp_seq,
 		uid, gid, mode, xattr_version, xattr_blob,
-		follows);
+		follows, inline_data);
 	if (ret < 0) {
 		dout("error sending cap msg, must requeue %p\n", inode);
 		delayed = 1;
@@ -1336,7 +1352,7 @@
 			     capsnap->time_warp_seq,
 			     capsnap->uid, capsnap->gid, capsnap->mode,
 			     capsnap->xattr_version, capsnap->xattr_blob,
-			     capsnap->follows);
+			     capsnap->follows, capsnap->inline_data);
 
 		next_follows = capsnap->follows + 1;
 		ceph_put_cap_snap(capsnap);
@@ -2057,15 +2073,17 @@
  * requested from the MDS.
  */
 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-			    int *got, loff_t endoff, int *check_max, int *err)
+			    loff_t endoff, int *got, struct page **pinned_page,
+			    int *check_max, int *err)
 {
 	struct inode *inode = &ci->vfs_inode;
 	int ret = 0;
-	int have, implemented;
+	int have, implemented, _got = 0;
 	int file_wanted;
 
 	dout("get_cap_refs %p need %s want %s\n", inode,
 	     ceph_cap_string(need), ceph_cap_string(want));
+again:
 	spin_lock(&ci->i_ceph_lock);
 
 	/* make sure file is actually open */
@@ -2075,7 +2093,7 @@
 		     ceph_cap_string(need), ceph_cap_string(file_wanted));
 		*err = -EBADF;
 		ret = 1;
-		goto out;
+		goto out_unlock;
 	}
 
 	/* finish pending truncate */
@@ -2095,7 +2113,7 @@
 				*check_max = 1;
 				ret = 1;
 			}
-			goto out;
+			goto out_unlock;
 		}
 		/*
 		 * If a sync write is in progress, we must wait, so that we
@@ -2103,7 +2121,7 @@
 		 */
 		if (__ceph_have_pending_cap_snap(ci)) {
 			dout("get_cap_refs %p cap_snap_pending\n", inode);
-			goto out;
+			goto out_unlock;
 		}
 	}
 
@@ -2120,18 +2138,50 @@
 		     inode, ceph_cap_string(have), ceph_cap_string(not),
 		     ceph_cap_string(revoking));
 		if ((revoking & not) == 0) {
-			*got = need | (have & want);
-			__take_cap_refs(ci, *got);
+			_got = need | (have & want);
+			__take_cap_refs(ci, _got);
 			ret = 1;
 		}
 	} else {
 		dout("get_cap_refs %p have %s needed %s\n", inode,
 		     ceph_cap_string(have), ceph_cap_string(need));
 	}
-out:
+out_unlock:
 	spin_unlock(&ci->i_ceph_lock);
+
+	if (ci->i_inline_version != CEPH_INLINE_NONE &&
+	    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+	    i_size_read(inode) > 0) {
+		int ret1;
+		struct page *page = find_get_page(inode->i_mapping, 0);
+		if (page) {
+			if (PageUptodate(page)) {
+				*pinned_page = page;
+				goto out;
+			}
+			page_cache_release(page);
+		}
+		/*
+		 * drop cap refs first because getattr while holding
+		 * caps refs can cause deadlock.
+		 */
+		ceph_put_cap_refs(ci, _got);
+		_got = 0;
+
+		/* getattr request will bring inline data into page cache */
+		ret1 = __ceph_do_getattr(inode, NULL,
+					 CEPH_STAT_CAP_INLINE_DATA, true);
+		if (ret1 >= 0) {
+			ret = 0;
+			goto again;
+		}
+		*err = ret1;
+		ret = 1;
+	}
+out:
 	dout("get_cap_refs %p ret %d got %s\n", inode,
-	     ret, ceph_cap_string(*got));
+	     ret, ceph_cap_string(_got));
+	*got = _got;
 	return ret;
 }
 
@@ -2168,8 +2218,8 @@
  * due to a small max_size, make sure we check_max_size (and possibly
  * ask the mds) so we don't get hung up indefinitely.
  */
-int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
-		  loff_t endoff)
+int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+		  loff_t endoff, int *got, struct page **pinned_page)
 {
 	int check_max, ret, err;
 
@@ -2179,8 +2229,8 @@
 	check_max = 0;
 	err = 0;
 	ret = wait_event_interruptible(ci->i_cap_wq,
-				       try_get_cap_refs(ci, need, want,
-							got, endoff,
+				       try_get_cap_refs(ci, need, want, endoff,
+							got, pinned_page,
 							&check_max, &err));
 	if (err)
 		ret = err;
@@ -2383,6 +2433,8 @@
 static void handle_cap_grant(struct ceph_mds_client *mdsc,
 			     struct inode *inode, struct ceph_mds_caps *grant,
 			     void *snaptrace, int snaptrace_len,
+			     u64 inline_version,
+			     void *inline_data, int inline_len,
 			     struct ceph_buffer *xattr_buf,
 			     struct ceph_mds_session *session,
 			     struct ceph_cap *cap, int issued)
@@ -2403,6 +2455,7 @@
 	bool queue_invalidate = false;
 	bool queue_revalidate = false;
 	bool deleted_inode = false;
+	bool fill_inline = false;
 
 	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
 	     inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2576,6 +2629,13 @@
 	}
 	BUG_ON(cap->issued & ~cap->implemented);
 
+	if (inline_version > 0 && inline_version >= ci->i_inline_version) {
+		ci->i_inline_version = inline_version;
+		if (ci->i_inline_version != CEPH_INLINE_NONE &&
+		    (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
+			fill_inline = true;
+	}
+
 	spin_unlock(&ci->i_ceph_lock);
 
 	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
@@ -2589,6 +2649,9 @@
 			wake = true;
 	}
 
+	if (fill_inline)
+		ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
+
 	if (queue_trunc) {
 		ceph_queue_vmtruncate(inode);
 		ceph_queue_revalidate(inode);
@@ -2996,11 +3059,12 @@
 	u64 cap_id;
 	u64 size, max_size;
 	u64 tid;
+	u64 inline_version = 0;
+	void *inline_data = NULL;
+	u32  inline_len = 0;
 	void *snaptrace;
 	size_t snaptrace_len;
-	void *flock;
-	void *end;
-	u32 flock_len;
+	void *p, *end;
 
 	dout("handle_caps from mds%d\n", mds);
 
@@ -3021,30 +3085,37 @@
 
 	snaptrace = h + 1;
 	snaptrace_len = le32_to_cpu(h->snap_trace_len);
+	p = snaptrace + snaptrace_len;
 
 	if (le16_to_cpu(msg->hdr.version) >= 2) {
-		void *p = snaptrace + snaptrace_len;
+		u32 flock_len;
 		ceph_decode_32_safe(&p, end, flock_len, bad);
 		if (p + flock_len > end)
 			goto bad;
-		flock = p;
-	} else {
-		flock = NULL;
-		flock_len = 0;
+		p += flock_len;
 	}
 
 	if (le16_to_cpu(msg->hdr.version) >= 3) {
 		if (op == CEPH_CAP_OP_IMPORT) {
-			void *p = flock + flock_len;
 			if (p + sizeof(*peer) > end)
 				goto bad;
 			peer = p;
+			p += sizeof(*peer);
 		} else if (op == CEPH_CAP_OP_EXPORT) {
 			/* recorded in unused fields */
 			peer = (void *)&h->size;
 		}
 	}
 
+	if (le16_to_cpu(msg->hdr.version) >= 4) {
+		ceph_decode_64_safe(&p, end, inline_version, bad);
+		ceph_decode_32_safe(&p, end, inline_len, bad);
+		if (p + inline_len > end)
+			goto bad;
+		inline_data = p;
+		p += inline_len;
+	}
+
 	/* lookup ino */
 	inode = ceph_find_inode(sb, vino);
 	ci = ceph_inode(inode);
@@ -3085,6 +3156,7 @@
 		handle_cap_import(mdsc, inode, h, peer, session,
 				  &cap, &issued);
 		handle_cap_grant(mdsc, inode, h,  snaptrace, snaptrace_len,
+				 inline_version, inline_data, inline_len,
 				 msg->middle, session, cap, issued);
 		goto done_unlocked;
 	}
@@ -3105,8 +3177,9 @@
 	case CEPH_CAP_OP_GRANT:
 		__ceph_caps_issued(ci, &issued);
 		issued |= __ceph_caps_dirty(ci);
-		handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
-				 session, cap, issued);
+		handle_cap_grant(mdsc, inode, h, NULL, 0,
+				 inline_version, inline_data, inline_len,
+				 msg->middle, session, cap, issued);
 		goto done_unlocked;
 
 	case CEPH_CAP_OP_FLUSH_ACK:
@@ -3137,8 +3210,7 @@
 done:
 	mutex_unlock(&session->s_mutex);
 done_unlocked:
-	if (inode)
-		iput(inode);
+	iput(inode);
 	return;
 
 bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 681a853..c241603 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -183,7 +183,7 @@
 	spin_unlock(&parent->d_lock);
 
 	/* make sure a dentry wasn't dropped while we didn't have parent lock */
-	if (!ceph_dir_is_complete(dir)) {
+	if (!ceph_dir_is_complete_ordered(dir)) {
 		dout(" lost dir complete on %p; falling back to mds\n", dir);
 		dput(dentry);
 		err = -EAGAIN;
@@ -261,10 +261,6 @@
 
 	/* always start with . and .. */
 	if (ctx->pos == 0) {
-		/* note dir version at start of readdir so we can tell
-		 * if any dentries get dropped */
-		fi->dir_release_count = atomic_read(&ci->i_release_count);
-
 		dout("readdir off 0 -> '.'\n");
 		if (!dir_emit(ctx, ".", 1, 
 			    ceph_translate_ino(inode->i_sb, inode->i_ino),
@@ -289,7 +285,7 @@
 	if ((ctx->pos == 2 || fi->dentry) &&
 	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
-	    __ceph_dir_is_complete(ci) &&
+	    __ceph_dir_is_complete_ordered(ci) &&
 	    __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
 		u32 shared_gen = ci->i_shared_gen;
 		spin_unlock(&ci->i_ceph_lock);
@@ -312,6 +308,13 @@
 
 	/* proceed with a normal readdir */
 
+	if (ctx->pos == 2) {
+		/* note dir version at start of readdir so we can tell
+		 * if any dentries get dropped */
+		fi->dir_release_count = atomic_read(&ci->i_release_count);
+		fi->dir_ordered_count = ci->i_ordered_count;
+	}
+
 more:
 	/* do we have the correct frag content buffered? */
 	if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -446,8 +449,12 @@
 	 */
 	spin_lock(&ci->i_ceph_lock);
 	if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
-		dout(" marking %p complete\n", inode);
-		__ceph_dir_set_complete(ci, fi->dir_release_count);
+		if (ci->i_ordered_count == fi->dir_ordered_count)
+			dout(" marking %p complete and ordered\n", inode);
+		else
+			dout(" marking %p complete\n", inode);
+		__ceph_dir_set_complete(ci, fi->dir_release_count,
+					fi->dir_ordered_count);
 	}
 	spin_unlock(&ci->i_ceph_lock);
 
@@ -805,7 +812,9 @@
 		acls.pagelist = NULL;
 	}
 	err = ceph_mdsc_do_request(mdsc, dir, req);
-	if (!err && !req->r_reply_info.head->is_dentry)
+	if (!err &&
+	    !req->r_reply_info.head->is_target &&
+	    !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
 	ceph_mdsc_put_request(req);
 out:
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 9f8e357..ce74b39 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -333,6 +333,11 @@
 	return 0;
 }
 
+enum {	/* values for the read path's retry_op, handled after caps are dropped */
+	CHECK_EOF = 1,		/* sync read bounced off EOF: re-stat, maybe read more */
+	READ_INLINE = 2,	/* inode has inline data: fetch it with a getattr */
+};
+
 /*
  * Read a range of bytes striped over one or more objects.  Iterate over
  * objects we stripe over.  (That's not atomic, but good enough for now.)
@@ -412,7 +417,7 @@
 		ret = read;
 		/* did we bounce off eof? */
 		if (pos + left > inode->i_size)
-			*checkeof = 1;
+			*checkeof = CHECK_EOF;
 	}
 
 	dout("striped_read returns %d\n", ret);
@@ -598,7 +603,7 @@
 		snapc = ci->i_snap_realm->cached_context;
 		vino = ceph_vino(inode);
 		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-					    vino, pos, &len,
+					    vino, pos, &len, 0,
 					    2,/*include a 'startsync' command*/
 					    CEPH_OSD_OP_WRITE, flags, snapc,
 					    ci->i_truncate_seq,
@@ -609,6 +614,8 @@
 			break;
 		}
 
+		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+
 		n = iov_iter_get_pages_alloc(from, &pages, len, &start);
 		if (unlikely(n < 0)) {
 			ret = n;
@@ -713,7 +720,7 @@
 		snapc = ci->i_snap_realm->cached_context;
 		vino = ceph_vino(inode);
 		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-					    vino, pos, &len, 1,
+					    vino, pos, &len, 0, 1,
 					    CEPH_OSD_OP_WRITE, flags, snapc,
 					    ci->i_truncate_seq,
 					    ci->i_truncate_size,
@@ -803,9 +810,10 @@
 	size_t len = iocb->ki_nbytes;
 	struct inode *inode = file_inode(filp);
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct page *pinned_page = NULL;
 	ssize_t ret;
 	int want, got = 0;
-	int checkeof = 0, read = 0;
+	int retry_op = 0, read = 0;
 
 again:
 	dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
@@ -815,7 +823,7 @@
 		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 	else
 		want = CEPH_CAP_FILE_CACHE;
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
+	ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
 	if (ret < 0)
 		return ret;
 
@@ -827,8 +835,12 @@
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
 		     ceph_cap_string(got));
 
-		/* hmm, this isn't really async... */
-		ret = ceph_sync_read(iocb, to, &checkeof);
+		if (ci->i_inline_version == CEPH_INLINE_NONE) {
+			/* hmm, this isn't really async... */
+			ret = ceph_sync_read(iocb, to, &retry_op);
+		} else {
+			retry_op = READ_INLINE;
+		}
 	} else {
 		dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
@@ -838,13 +850,55 @@
 	}
 	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
 	     inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
+	if (pinned_page) {
+		page_cache_release(pinned_page);
+		pinned_page = NULL;
+	}
 	ceph_put_cap_refs(ci, got);
+	if (retry_op && ret >= 0) {
+		int statret;
+		struct page *page = NULL;
+		loff_t i_size;
+		if (retry_op == READ_INLINE) {
+			page = __page_cache_alloc(GFP_NOFS);
+			if (!page)
+				return -ENOMEM;
+		}
 
-	if (checkeof && ret >= 0) {
-		int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
+		statret = __ceph_do_getattr(inode, page,
+					    CEPH_STAT_CAP_INLINE_DATA, !!page);
+		if (statret < 0) {
+			 __free_page(page);
+			if (statret == -ENODATA) {
+				BUG_ON(retry_op != READ_INLINE);
+				goto again;
+			}
+			return statret;
+		}
+
+		i_size = i_size_read(inode);
+		if (retry_op == READ_INLINE) {
+			/* does not support inline data > PAGE_SIZE */
+			if (i_size > PAGE_CACHE_SIZE) {
+				ret = -EIO;
+			} else if (iocb->ki_pos < i_size) {
+				loff_t end = min_t(loff_t, i_size,
+						   iocb->ki_pos + len);
+				if (statret < end)
+					zero_user_segment(page, statret, end);
+				ret = copy_page_to_iter(page,
+						iocb->ki_pos & ~PAGE_MASK,
+						end - iocb->ki_pos, to);
+				iocb->ki_pos += ret;
+			} else {
+				ret = 0;
+			}
+			__free_pages(page, 0);
+			return ret;
+		}
 
 		/* hit EOF or hole? */
-		if (statret == 0 && iocb->ki_pos < inode->i_size &&
+		if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
 			ret < len) {
 			dout("sync_read hit hole, ppos %lld < size %lld"
 			     ", reading more\n", iocb->ki_pos,
@@ -852,7 +906,7 @@
 
 			read += ret;
 			len -= ret;
-			checkeof = 0;
+			retry_op = 0;
 			goto again;
 		}
 	}
@@ -909,6 +963,12 @@
 	if (err)
 		goto out;
 
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		err = ceph_uninline_data(file, NULL);
+		if (err < 0)
+			goto out;
+	}
+
 retry_snap:
 	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
 		err = -ENOSPC;
@@ -922,7 +982,8 @@
 	else
 		want = CEPH_CAP_FILE_BUFFER;
 	got = 0;
-	err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count);
+	err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+			    &got, NULL);
 	if (err < 0)
 		goto out;
 
@@ -969,6 +1030,7 @@
 	if (written >= 0) {
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
+		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
@@ -1111,7 +1173,7 @@
 	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 					ceph_vino(inode),
 					offset, length,
-					1, op,
+					0, 1, op,
 					CEPH_OSD_FLAG_WRITE |
 					CEPH_OSD_FLAG_ONDISK,
 					NULL, 0, 0, false);
@@ -1214,6 +1276,12 @@
 		goto unlock;
 	}
 
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		ret = ceph_uninline_data(file, NULL);
+		if (ret < 0)
+			goto unlock;
+	}
+
 	size = i_size_read(inode);
 	if (!(mode & FALLOC_FL_KEEP_SIZE))
 		endoff = offset + length;
@@ -1223,7 +1291,7 @@
 	else
 		want = CEPH_CAP_FILE_BUFFER;
 
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
+	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
 	if (ret < 0)
 		goto unlock;
 
@@ -1240,6 +1308,7 @@
 
 	if (!ret) {
 		spin_lock(&ci->i_ceph_lock);
+		ci->i_inline_version = CEPH_INLINE_NONE;
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index a5593d5..f61a741 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -387,8 +387,10 @@
 	spin_lock_init(&ci->i_ceph_lock);
 
 	ci->i_version = 0;
+	ci->i_inline_version = 0;
 	ci->i_time_warp_seq = 0;
 	ci->i_ceph_flags = 0;
+	ci->i_ordered_count = 0;
 	atomic_set(&ci->i_release_count, 1);
 	atomic_set(&ci->i_complete_count, 0);
 	ci->i_symlink = NULL;
@@ -657,7 +659,7 @@
  * Populate an inode based on info from mds.  May be called on new or
  * existing inodes.
  */
-static int fill_inode(struct inode *inode,
+static int fill_inode(struct inode *inode, struct page *locked_page,
 		      struct ceph_mds_reply_info_in *iinfo,
 		      struct ceph_mds_reply_dirfrag *dirinfo,
 		      struct ceph_mds_session *session,
@@ -675,6 +677,7 @@
 	bool wake = false;
 	bool queue_trunc = false;
 	bool new_version = false;
+	bool fill_inline = false;
 
 	dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
 	     inode, ceph_vinop(inode), le64_to_cpu(info->version),
@@ -845,7 +848,8 @@
 	    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
 	    !__ceph_dir_is_complete(ci)) {
 		dout(" marking %p complete (empty)\n", inode);
-		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
+		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
+					ci->i_ordered_count);
 	}
 
 	/* were we issued a capability? */
@@ -873,8 +877,23 @@
 			   ceph_vinop(inode));
 		__ceph_get_fmode(ci, cap_fmode);
 	}
+
+	if (iinfo->inline_version > 0 &&
+	    iinfo->inline_version >= ci->i_inline_version) {
+		int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
+		ci->i_inline_version = iinfo->inline_version;
+		if (ci->i_inline_version != CEPH_INLINE_NONE &&
+		    (locked_page ||
+		     (le32_to_cpu(info->cap.caps) & cache_caps)))
+			fill_inline = true;
+	}
+
 	spin_unlock(&ci->i_ceph_lock);
 
+	if (fill_inline)
+		ceph_fill_inline_data(inode, locked_page,
+				      iinfo->inline_data, iinfo->inline_len);
+
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 
@@ -1062,7 +1081,8 @@
 		struct inode *dir = req->r_locked_dir;
 
 		if (dir) {
-			err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag,
+			err = fill_inode(dir, NULL,
+					 &rinfo->diri, rinfo->dirfrag,
 					 session, req->r_request_started, -1,
 					 &req->r_caps_reservation);
 			if (err < 0)
@@ -1132,7 +1152,7 @@
 		}
 		req->r_target_inode = in;
 
-		err = fill_inode(in, &rinfo->targeti, NULL,
+		err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL,
 				session, req->r_request_started,
 				(!req->r_aborted && rinfo->head->result == 0) ?
 				req->r_fmode : -1,
@@ -1204,8 +1224,8 @@
 			ceph_invalidate_dentry_lease(dn);
 
 			/* d_move screws up sibling dentries' offsets */
-			ceph_dir_clear_complete(dir);
-			ceph_dir_clear_complete(olddir);
+			ceph_dir_clear_ordered(dir);
+			ceph_dir_clear_ordered(olddir);
 
 			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
 			     ceph_dentry(req->r_old_dentry)->offset);
@@ -1217,6 +1237,7 @@
 		if (!rinfo->head->is_target) {
 			dout("fill_trace null dentry\n");
 			if (dn->d_inode) {
+				ceph_dir_clear_ordered(dir);
 				dout("d_delete %p\n", dn);
 				d_delete(dn);
 			} else {
@@ -1233,7 +1254,7 @@
 
 		/* attach proper inode */
 		if (!dn->d_inode) {
-			ceph_dir_clear_complete(dir);
+			ceph_dir_clear_ordered(dir);
 			ihold(in);
 			dn = splice_dentry(dn, in, &have_lease);
 			if (IS_ERR(dn)) {
@@ -1263,7 +1284,7 @@
 		BUG_ON(!dir);
 		BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
 		dout(" linking snapped dir %p to dn %p\n", in, dn);
-		ceph_dir_clear_complete(dir);
+		ceph_dir_clear_ordered(dir);
 		ihold(in);
 		dn = splice_dentry(dn, in, NULL);
 		if (IS_ERR(dn)) {
@@ -1300,7 +1321,7 @@
 			dout("new_inode badness got %d\n", err);
 			continue;
 		}
-		rc = fill_inode(in, &rinfo->dir_in[i], NULL, session,
+		rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
 				req->r_request_started, -1,
 				&req->r_caps_reservation);
 		if (rc < 0) {
@@ -1416,7 +1437,7 @@
 			}
 		}
 
-		if (fill_inode(in, &rinfo->dir_in[i], NULL, session,
+		if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
 			       req->r_request_started, -1,
 			       &req->r_caps_reservation) < 0) {
 			pr_err("fill_inode badness on %p\n", in);
@@ -1899,7 +1920,8 @@
  * Verify that we have a lease on the given mask.  If not,
  * do a getattr against an mds.
  */
-int ceph_do_getattr(struct inode *inode, int mask, bool force)
+int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
+		      int mask, bool force)
 {
 	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1911,7 +1933,8 @@
 		return 0;
 	}
 
-	dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
+	dout("do_getattr inode %p mask %s mode 0%o\n",
+	     inode, ceph_cap_string(mask), inode->i_mode);
 	if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
 		return 0;
 
@@ -1922,7 +1945,19 @@
 	ihold(inode);
 	req->r_num_caps = 1;
 	req->r_args.getattr.mask = cpu_to_le32(mask);
+	req->r_locked_page = locked_page;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
+	if (locked_page && err == 0) {
+		u64 inline_version = req->r_reply_info.targeti.inline_version;
+		if (inline_version == 0) {
+			/* the reply is supposed to contain inline data */
+			err = -EINVAL;
+		} else if (inline_version == CEPH_INLINE_NONE) {
+			err = -ENODATA;
+		} else {
+			err = req->r_reply_info.targeti.inline_len;
+		}
+	}
 	ceph_mdsc_put_request(req);
 	dout("do_getattr result=%d\n", err);
 	return err;
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index fbc39c4..c35c5c6 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -9,6 +9,8 @@
 #include <linux/ceph/pagelist.h>
 
 static u64 lock_secret;
+static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
+                                         struct ceph_mds_request *req);
 
 static inline u64 secure_addr(void *addr)
 {
@@ -40,6 +42,9 @@
 	u64 length = 0;
 	u64 owner;
 
+	if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
+		wait = 0;
+
 	req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
@@ -68,6 +73,9 @@
 	req->r_args.filelock_change.length = cpu_to_le64(length);
 	req->r_args.filelock_change.wait = wait;
 
+	if (wait)
+		req->r_wait_for_completion = ceph_lock_wait_for_completion;
+
 	err = ceph_mdsc_do_request(mdsc, inode, req);
 
 	if (operation == CEPH_MDS_OP_GETFILELOCK) {
@@ -96,6 +104,52 @@
 	return err;
 }
 
+static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc,
+                                         struct ceph_mds_request *req)
+{
+	struct ceph_mds_request *intr_req;
+	struct inode *inode = req->r_inode;
+	int err, lock_type;
+	/* Wait for @req; if a signal arrives, send an *_INTR unlock first. */
+	BUG_ON(req->r_op != CEPH_MDS_OP_SETFILELOCK);
+	if (req->r_args.filelock_change.rule == CEPH_LOCK_FCNTL)
+		lock_type = CEPH_LOCK_FCNTL_INTR;
+	else if (req->r_args.filelock_change.rule == CEPH_LOCK_FLOCK)
+		lock_type = CEPH_LOCK_FLOCK_INTR;
+	else
+		BUG_ON(1);
+	BUG_ON(req->r_args.filelock_change.type == CEPH_LOCK_UNLOCK);
+	/* Sleep until @req completes; 0 here means it was not interrupted. */
+	err = wait_for_completion_interruptible(&req->r_completion);
+	if (!err)
+		return 0;
+
+	dout("ceph_lock_wait_for_completion: request %llu was interrupted\n",
+	     req->r_tid);
+	/* Build a second SETFILELOCK request; failure to create is returned. */
+	intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK,
+					    USE_AUTH_MDS);
+	if (IS_ERR(intr_req))
+		return PTR_ERR(intr_req);
+
+	intr_req->r_inode = inode;
+	ihold(inode);
+	intr_req->r_num_caps = 1;
+	/* Same lock arguments as @req, but rule = *_INTR and type = UNLOCK. */
+	intr_req->r_args.filelock_change = req->r_args.filelock_change;
+	intr_req->r_args.filelock_change.rule = lock_type;
+	intr_req->r_args.filelock_change.type = CEPH_LOCK_UNLOCK;
+
+	err = ceph_mdsc_do_request(mdsc, inode, intr_req);
+	ceph_mdsc_put_request(intr_req);
+
+	if (err && err != -ERESTARTSYS)
+		return err;
+	/* Not interruptible: block until @req completes, then return 0. */
+	wait_for_completion(&req->r_completion);
+	return 0;
+}
+
 /**
  * Attempt to set an fcntl lock.
  * For now, this just goes away to the server. Later it may be more awesome.
@@ -143,11 +197,6 @@
 				     err);
 			}
 		}
-
-	} else if (err == -ERESTARTSYS) {
-		dout("undoing lock\n");
-		ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
-				  CEPH_LOCK_UNLOCK, 0, fl);
 	}
 	return err;
 }
@@ -186,11 +235,6 @@
 					  file, CEPH_LOCK_UNLOCK, 0, fl);
 			dout("got %d on flock_lock_file_wait, undid lock", err);
 		}
-	} else if (err == -ERESTARTSYS) {
-		dout("undoing lock\n");
-		ceph_lock_message(CEPH_LOCK_FLOCK,
-				  CEPH_MDS_OP_SETFILELOCK,
-				  file, CEPH_LOCK_UNLOCK, 0, fl);
 	}
 	return err;
 }
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a92d3f5..d2171f4 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -89,6 +89,16 @@
 	ceph_decode_need(p, end, info->xattr_len, bad);
 	info->xattr_data = *p;
 	*p += info->xattr_len;
+
+	if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
+		ceph_decode_64_safe(p, end, info->inline_version, bad);
+		ceph_decode_32_safe(p, end, info->inline_len, bad);
+		ceph_decode_need(p, end, info->inline_len, bad);
+		info->inline_data = *p;
+		*p += info->inline_len;
+	} else
+		info->inline_version = CEPH_INLINE_NONE;
+
 	return 0;
 bad:
 	return err;
@@ -524,8 +534,7 @@
 	}
 	if (req->r_locked_dir)
 		ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
-	if (req->r_target_inode)
-		iput(req->r_target_inode);
+	iput(req->r_target_inode);
 	if (req->r_dentry)
 		dput(req->r_dentry);
 	if (req->r_old_dentry)
@@ -861,8 +870,11 @@
 	/*
 	 * Serialize client metadata into waiting buffer space, using
 	 * the format that userspace expects for map<string, string>
+	 *
+	 * ClientSession messages with metadata are v2
 	 */
-	msg->hdr.version = 2;  /* ClientSession messages with metadata are v2 */
+	msg->hdr.version = cpu_to_le16(2);
+	msg->hdr.compat_version = cpu_to_le16(1);
 
 	/* The write pointer, following the session_head structure */
 	p = msg->front.iov_base + sizeof(*h);
@@ -1066,8 +1078,7 @@
 	session->s_cap_iterator = NULL;
 	spin_unlock(&session->s_cap_lock);
 
-	if (last_inode)
-		iput(last_inode);
+	iput(last_inode);
 	if (old_cap)
 		ceph_put_cap(session->s_mdsc, old_cap);
 
@@ -1874,7 +1885,7 @@
 		goto out_free2;
 	}
 
-	msg->hdr.version = 2;
+	msg->hdr.version = cpu_to_le16(2);
 	msg->hdr.tid = cpu_to_le64(req->r_tid);
 
 	head = msg->front.iov_base;
@@ -2208,6 +2219,8 @@
 			&req->r_completion, req->r_timeout);
 		if (err == 0)
 			err = -EIO;
+	} else if (req->r_wait_for_completion) {
+		err = req->r_wait_for_completion(mdsc, req);
 	} else {
 		err = wait_for_completion_killable(&req->r_completion);
 	}
@@ -3744,6 +3757,20 @@
 	return msg;
 }
 
+/* Connection op: sign @msg using the MDS session's auth handshake. */
+static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+{
+	struct ceph_mds_session *session = con->private;
+	return ceph_auth_sign_message(&session->s_auth, msg);
+}
+
+/* Connection op: verify @msg's signature via the session's handshake. */
+static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+{
+	struct ceph_mds_session *session = con->private;
+	return ceph_auth_check_message_signature(&session->s_auth, msg);
+}
+
 static const struct ceph_connection_operations mds_con_ops = {
 	.get = con_get,
 	.put = con_put,
@@ -3753,6 +3780,8 @@
 	.invalidate_authorizer = invalidate_authorizer,
 	.peer_reset = peer_reset,
 	.alloc_msg = mds_alloc_msg,
+	.sign_message = sign_message,
+	.check_message_signature = check_message_signature,
 };
 
 /* eof */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 3288359..e2817d0 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -41,6 +41,9 @@
 	char *symlink;
 	u32 xattr_len;
 	char *xattr_data;
+	u64 inline_version;
+	u32 inline_len;
+	char *inline_data;
 };
 
 /*
@@ -166,6 +169,11 @@
  */
 typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
 					     struct ceph_mds_request *req);
+/*
+ * wait for request completion callback
+ */
+typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc,
+						 struct ceph_mds_request *req);
 
 /*
  * an in-flight mds request
@@ -215,6 +223,7 @@
 	int r_request_release_offset;
 	struct ceph_msg  *r_reply;
 	struct ceph_mds_reply_info_parsed r_reply_info;
+	struct page *r_locked_page;
 	int r_err;
 	bool r_aborted;
 
@@ -239,6 +248,7 @@
 	struct completion r_completion;
 	struct completion r_safe_completion;
 	ceph_mds_request_callback_t r_callback;
+	ceph_mds_request_wait_callback_t r_wait_for_completion;
 	struct list_head  r_unsafe_item;  /* per-session unsafe list item */
 	bool		  r_got_unsafe, r_got_safe, r_got_result;
 
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index f01645a..ce35fbd 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -288,6 +288,9 @@
 	return 0;
 }
 
+
+static struct ceph_snap_context *empty_snapc;
+
 /*
  * build the snap context for a given realm.
  */
@@ -328,6 +331,12 @@
 		return 0;
 	}
 
+	if (num == 0 && realm->seq == empty_snapc->seq) {
+		ceph_get_snap_context(empty_snapc);
+		snapc = empty_snapc;
+		goto done;
+	}
+
 	/* alloc new snap context */
 	err = -ENOMEM;
 	if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64))
@@ -365,8 +374,8 @@
 	     realm->ino, realm, snapc, snapc->seq,
 	     (unsigned int) snapc->num_snaps);
 
-	if (realm->cached_context)
-		ceph_put_snap_context(realm->cached_context);
+done:
+	ceph_put_snap_context(realm->cached_context);
 	realm->cached_context = snapc;
 	return 0;
 
@@ -466,6 +475,9 @@
 		   cap_snap.  lucky us. */
 		dout("queue_cap_snap %p already pending\n", inode);
 		kfree(capsnap);
+	} else if (ci->i_snap_realm->cached_context == empty_snapc) {
+		dout("queue_cap_snap %p empty snapc\n", inode);
+		kfree(capsnap);
 	} else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
 			    CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
 		struct ceph_snap_context *snapc = ci->i_head_snapc;
@@ -504,6 +516,8 @@
 			capsnap->xattr_version = 0;
 		}
 
+		capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
 		/* dirty page count moved from _head to this cap_snap;
 		   all subsequent writes page dirties occur _after_ this
 		   snapshot. */
@@ -590,15 +604,13 @@
 		if (!inode)
 			continue;
 		spin_unlock(&realm->inodes_with_caps_lock);
-		if (lastinode)
-			iput(lastinode);
+		iput(lastinode);
 		lastinode = inode;
 		ceph_queue_cap_snap(ci);
 		spin_lock(&realm->inodes_with_caps_lock);
 	}
 	spin_unlock(&realm->inodes_with_caps_lock);
-	if (lastinode)
-		iput(lastinode);
+	iput(lastinode);
 
 	list_for_each_entry(child, &realm->children, child_item) {
 		dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
@@ -928,5 +940,16 @@
 	return;
 }
 
+/* Allocate the file-wide empty snap context; returns 0 or -ENOMEM. */
+int __init ceph_snap_init(void)
+{
+	empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
+	if (empty_snapc)
+		empty_snapc->seq = 1;
+	return empty_snapc ? 0 : -ENOMEM;
+}
 
-
+void ceph_snap_exit(void)
+{
+	ceph_put_snap_context(empty_snapc);	/* drop the init-time ref */
+}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f6e1237..50f06cd 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -515,7 +515,8 @@
 	struct ceph_fs_client *fsc;
 	const u64 supported_features =
 		CEPH_FEATURE_FLOCK |
-		CEPH_FEATURE_DIRLAYOUTHASH;
+		CEPH_FEATURE_DIRLAYOUTHASH |
+		CEPH_FEATURE_MDS_INLINE_DATA;
 	const u64 required_features = 0;
 	int page_count;
 	size_t size;
@@ -1017,9 +1018,6 @@
 };
 MODULE_ALIAS_FS("ceph");
 
-#define _STRINGIFY(x) #x
-#define STRINGIFY(x) _STRINGIFY(x)
-
 static int __init init_ceph(void)
 {
 	int ret = init_caches();
@@ -1028,15 +1026,20 @@
 
 	ceph_flock_init();
 	ceph_xattr_init();
+	ret = ceph_snap_init();
+	if (ret)
+		goto out_xattr;
 	ret = register_filesystem(&ceph_fs_type);
 	if (ret)
-		goto out_icache;
+		goto out_snap;
 
 	pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
 
 	return 0;
 
-out_icache:
+out_snap:
+	ceph_snap_exit();
+out_xattr:
 	ceph_xattr_exit();
 	destroy_caches();
 out:
@@ -1047,6 +1050,7 @@
 {
 	dout("exit_ceph\n");
 	unregister_filesystem(&ceph_fs_type);
+	ceph_snap_exit();
 	ceph_xattr_exit();
 	destroy_caches();
 }
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b82f507..e1aa32d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -161,6 +161,7 @@
 	u64 time_warp_seq;
 	int writing;   /* a sync write is still in progress */
 	int dirty_pages;     /* dirty pages awaiting writeback */
+	bool inline_data;
 };
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
@@ -253,9 +254,11 @@
 	spinlock_t i_ceph_lock;
 
 	u64 i_version;
+	u64 i_inline_version;
 	u32 i_time_warp_seq;
 
 	unsigned i_ceph_flags;
+	int i_ordered_count;
 	atomic_t i_release_count;
 	atomic_t i_complete_count;
 
@@ -434,14 +437,19 @@
 /*
  * Ceph inode.
  */
-#define CEPH_I_NODELAY   4  /* do not delay cap release */
-#define CEPH_I_FLUSH     8  /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH  16  /* do not flush dirty caps */
+#define CEPH_I_DIR_ORDERED	1  /* dentries in dir are ordered */
+#define CEPH_I_NODELAY		4  /* do not delay cap release */
+#define CEPH_I_FLUSH		8  /* do not delay flush of dirty metadata */
+#define CEPH_I_NOFLUSH		16 /* do not flush dirty caps */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
-					   int release_count)
+					   int release_count, int ordered_count)
 {
 	atomic_set(&ci->i_complete_count, release_count);
+	if (ci->i_ordered_count == ordered_count)
+		ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
+	else
+		ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
 }
 
 static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
@@ -455,16 +463,35 @@
 		atomic_read(&ci->i_release_count);
 }
 
+static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
+{
+	return __ceph_dir_is_complete(ci) &&	/* takes no lock itself */
+		(ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+}
+
 static inline void ceph_dir_clear_complete(struct inode *inode)
 {
 	__ceph_dir_clear_complete(ceph_inode(inode));
 }
 
-static inline bool ceph_dir_is_complete(struct inode *inode)
+static inline void ceph_dir_clear_ordered(struct inode *inode)
 {
-	return __ceph_dir_is_complete(ceph_inode(inode));
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	spin_lock(&ci->i_ceph_lock);
+	ci->i_ordered_count++;
+	ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+	spin_unlock(&ci->i_ceph_lock);
 }
 
+static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	bool ret;
+	spin_lock(&ci->i_ceph_lock);	/* check is done under i_ceph_lock */
+	ret = __ceph_dir_is_complete_ordered(ci);
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
+}
 
 /* find a specific frag @f */
 extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
@@ -580,6 +607,7 @@
 	char *last_name;       /* last entry in previous chunk */
 	struct dentry *dentry; /* next dentry (for dcache readdir) */
 	int dir_release_count;
+	int dir_ordered_count;
 
 	/* used for -o dirstat read() on directory thing */
 	char *dir_info;
@@ -673,6 +701,8 @@
 extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
 				  struct ceph_cap_snap *capsnap);
 extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
+extern int ceph_snap_init(void);
+extern void ceph_snap_exit(void);
 
 /*
  * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -715,7 +745,12 @@
 extern void ceph_queue_invalidate(struct inode *inode);
 extern void ceph_queue_writeback(struct inode *inode);
 
-extern int ceph_do_getattr(struct inode *inode, int mask, bool force);
+extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
+			     int mask, bool force);
+static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
+{
+	return __ceph_do_getattr(inode, NULL, mask, force); /* no locked_page */
+}
 extern int ceph_permission(struct inode *inode, int mask);
 extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
 extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -830,7 +865,7 @@
 				      int mds, int drop, int unless);
 
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
-			 int *got, loff_t endoff);
+			 loff_t endoff, int *got, struct page **pinned_page);
 
 /* for counting open files by mode */
 static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
@@ -852,7 +887,9 @@
 			    struct file *file, unsigned flags, umode_t mode,
 			    int *opened);
 extern int ceph_release(struct inode *inode, struct file *filp);
-
+extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
+				  char *data, size_t len);
+int ceph_uninline_data(struct file *filp, struct page *locked_page);
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;
 extern const struct inode_operations ceph_dir_iops;
diff --git a/fs/ceph/super.h.rej b/fs/ceph/super.h.rej
new file mode 100644
index 0000000..88fe3df
--- /dev/null
+++ b/fs/ceph/super.h.rej
@@ -0,0 +1,10 @@
+--- fs/ceph/super.h
++++ fs/ceph/super.h
+@@ -254,6 +255,7 @@
+	spinlock_t i_ceph_lock;
+
+	u64 i_version;
++	u64 i_inline_version;
+	u32 i_time_warp_seq;
+
+	unsigned i_ceph_flags;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 678b0d2..5a492ca 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -854,7 +854,7 @@
 	struct ceph_pagelist *pagelist = NULL;
 	int err;
 
-	if (value) {
+	if (size > 0) {
 		/* copy value into pagelist */
 		pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
 		if (!pagelist)
@@ -864,7 +864,7 @@
 		err = ceph_pagelist_append(pagelist, value, size);
 		if (err)
 			goto out;
-	} else {
+	} else if (!value) {
 		flags |= CEPH_XATTR_REMOVE;
 	}
 
@@ -1001,6 +1001,9 @@
 	if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN))
 		return generic_setxattr(dentry, name, value, size, flags);
 
+	if (size == 0)
+		value = "";  /* empty EA, do not remove */
+
 	return __ceph_setxattr(dentry, name, value, size, flags);
 }
 
diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
index 5f338684..260d78b 100644
--- a/include/linux/ceph/auth.h
+++ b/include/linux/ceph/auth.h
@@ -13,6 +13,7 @@
 
 struct ceph_auth_client;
 struct ceph_authorizer;
+struct ceph_msg;
 
 struct ceph_auth_handshake {
 	struct ceph_authorizer *authorizer;
@@ -20,6 +21,10 @@
 	size_t authorizer_buf_len;
 	void *authorizer_reply_buf;
 	size_t authorizer_reply_buf_len;
+	int (*sign_message)(struct ceph_auth_handshake *auth,
+			    struct ceph_msg *msg);
+	int (*check_message_signature)(struct ceph_auth_handshake *auth,
+				       struct ceph_msg *msg);
 };
 
 struct ceph_auth_client_ops {
@@ -66,6 +71,11 @@
 	void (*reset)(struct ceph_auth_client *ac);
 
 	void (*destroy)(struct ceph_auth_client *ac);
+
+	int (*sign_message)(struct ceph_auth_handshake *auth,
+			    struct ceph_msg *msg);
+	int (*check_message_signature)(struct ceph_auth_handshake *auth,
+				       struct ceph_msg *msg);
 };
 
 struct ceph_auth_client {
@@ -113,4 +123,20 @@
 extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac,
 					    int peer_type);
 
+/* Sign @msg with @auth's hook; a handshake without one yields 0. */
+static inline int ceph_auth_sign_message(struct ceph_auth_handshake *auth,
+					 struct ceph_msg *msg)
+{
+	return auth->sign_message ?
+		auth->sign_message(auth, msg) : 0;
+}
+
+/* Verify @msg with @auth's hook; a handshake without one yields 0. */
+static inline
+int ceph_auth_check_message_signature(struct ceph_auth_handshake *auth,
+				      struct ceph_msg *msg)
+{
+	return auth->check_message_signature ?
+		auth->check_message_signature(auth, msg) : 0;
+}
 #endif
diff --git a/include/linux/ceph/buffer.h b/include/linux/ceph/buffer.h
index 07ad423..07ca15e 100644
--- a/include/linux/ceph/buffer.h
+++ b/include/linux/ceph/buffer.h
@@ -10,8 +10,7 @@
 /*
  * a simple reference counted buffer.
  *
- * use kmalloc for small sizes (<= one page), vmalloc for larger
- * sizes.
+ * use kmalloc for smaller sizes, vmalloc for larger sizes.
  */
 struct ceph_buffer {
 	struct kref kref;
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
index d12659c..71e05bb 100644
--- a/include/linux/ceph/ceph_features.h
+++ b/include/linux/ceph/ceph_features.h
@@ -84,6 +84,7 @@
 	 CEPH_FEATURE_PGPOOL3 |			\
 	 CEPH_FEATURE_OSDENC |			\
 	 CEPH_FEATURE_CRUSH_TUNABLES |		\
+	 CEPH_FEATURE_MSG_AUTH |		\
 	 CEPH_FEATURE_CRUSH_TUNABLES2 |		\
 	 CEPH_FEATURE_REPLY_CREATE_INODE |	\
 	 CEPH_FEATURE_OSDHASHPSPOOL |		\
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 3c97d5e..c0dadaa 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -522,8 +522,11 @@
 	__le32 dist[];
 } __attribute__ ((packed));
 
-#define CEPH_LOCK_FCNTL    1
-#define CEPH_LOCK_FLOCK    2
+#define CEPH_LOCK_FCNTL		1
+#define CEPH_LOCK_FLOCK		2
+#define CEPH_LOCK_FCNTL_INTR    3
+#define CEPH_LOCK_FLOCK_INTR    4
+
 
 #define CEPH_LOCK_SHARED   1
 #define CEPH_LOCK_EXCL     2
@@ -549,6 +552,7 @@
 
 int ceph_flags_to_mode(int flags);
 
+#define CEPH_INLINE_NONE	((__u64)-1)
 
 /* capability bits */
 #define CEPH_CAP_PIN         1  /* no specific capabilities beyond the pin */
@@ -613,6 +617,8 @@
 				 CEPH_CAP_LINK_SHARED |	\
 				 CEPH_CAP_FILE_SHARED |	\
 				 CEPH_CAP_XATTR_SHARED)
+#define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \
+				   CEPH_CAP_FILE_RD)
 
 #define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED |			\
 			      CEPH_CAP_LINK_SHARED |			\
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 07bc359..8b11a79 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -29,6 +29,7 @@
 #define CEPH_OPT_NOSHARE          (1<<1) /* don't share client with other sbs */
 #define CEPH_OPT_MYIP             (1<<2) /* specified my ip */
 #define CEPH_OPT_NOCRC            (1<<3) /* no data crc on writes */
+#define CEPH_OPT_NOMSGAUTH	  (1<<4) /* not require cephx message signature */
 
 #define CEPH_OPT_DEFAULT   (0)
 
@@ -184,7 +185,6 @@
 extern const char *ceph_msg_type_name(int type);
 extern int ceph_check_fsid(struct ceph_client *client, struct ceph_fsid *fsid);
 extern void *ceph_kvmalloc(size_t size, gfp_t flags);
-extern void ceph_kvfree(const void *ptr);
 
 extern struct ceph_options *ceph_parse_options(char *options,
 			      const char *dev_name, const char *dev_name_end,
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 40ae58e..d9d396c 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -42,6 +42,10 @@
 	struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
 					struct ceph_msg_header *hdr,
 					int *skip);
+	int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg);
+
+	int (*check_message_signature) (struct ceph_connection *con,
+					struct ceph_msg *msg);
 };
 
 /* use format string %s%d */
@@ -142,7 +146,10 @@
  */
 struct ceph_msg {
 	struct ceph_msg_header hdr;	/* header */
-	struct ceph_msg_footer footer;	/* footer */
+	union {
+		struct ceph_msg_footer footer;		/* footer */
+		struct ceph_msg_footer_old old_footer;	/* old format footer */
+	};
 	struct kvec front;              /* unaligned blobs of message */
 	struct ceph_buffer *middle;
 
diff --git a/include/linux/ceph/msgr.h b/include/linux/ceph/msgr.h
index 3d94a73..1c18872 100644
--- a/include/linux/ceph/msgr.h
+++ b/include/linux/ceph/msgr.h
@@ -152,7 +152,8 @@
 			     receiver: mask against ~PAGE_MASK */
 
 	struct ceph_entity_name src;
-	__le32 reserved;
+	__le16 compat_version;
+	__le16 reserved;
 	__le32 crc;       /* header crc32c */
 } __attribute__ ((packed));
 
@@ -164,13 +165,21 @@
 /*
  * follows data payload
  */
+struct ceph_msg_footer_old {
+	__le32 front_crc, middle_crc, data_crc;
+	__u8 flags;
+} __attribute__ ((packed));
+
 struct ceph_msg_footer {
 	__le32 front_crc, middle_crc, data_crc;
+	// sig holds the 64 bits of the digital signature for the message PLR
+	__le64  sig;
 	__u8 flags;
 } __attribute__ ((packed));
 
 #define CEPH_MSG_FOOTER_COMPLETE  (1<<0)   /* msg wasn't aborted */
 #define CEPH_MSG_FOOTER_NOCRC     (1<<1)   /* no data crc */
+#define CEPH_MSG_FOOTER_SIGNED	  (1<<2)   /* msg was signed */
 
 
 #endif
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 03aeb27..5d86416 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -87,6 +87,13 @@
 			struct ceph_osd_data osd_data;
 		} extent;
 		struct {
+			__le32 name_len;
+			__le32 value_len;
+			__u8 cmp_op;       /* CEPH_OSD_CMPXATTR_OP_* */
+			__u8 cmp_mode;     /* CEPH_OSD_CMPXATTR_MODE_* */
+			struct ceph_osd_data osd_data;
+		} xattr;
+		struct {
 			const char *class_name;
 			const char *method_name;
 			struct ceph_osd_data request_info;
@@ -295,6 +302,9 @@
 extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					const char *class, const char *method);
+extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
+				 u16 opcode, const char *name, const void *value,
+				 size_t size, u8 cmp_op, u8 cmp_mode);
 extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					u64 cookie, u64 version, int flag);
@@ -318,7 +328,8 @@
 				      struct ceph_file_layout *layout,
 				      struct ceph_vino vino,
 				      u64 offset, u64 *len,
-				      int num_ops, int opcode, int flags,
+				      unsigned int which, int num_ops,
+				      int opcode, int flags,
 				      struct ceph_snap_context *snapc,
 				      u32 truncate_seq, u64 truncate_size,
 				      bool use_mempool);
diff --git a/include/linux/ceph/pagelist.h b/include/linux/ceph/pagelist.h
index 5f871d8..13d71fe 100644
--- a/include/linux/ceph/pagelist.h
+++ b/include/linux/ceph/pagelist.h
@@ -1,8 +1,10 @@
 #ifndef __FS_CEPH_PAGELIST_H
 #define __FS_CEPH_PAGELIST_H
 
-#include <linux/list.h>
+#include <asm/byteorder.h>
 #include <linux/atomic.h>
+#include <linux/list.h>
+#include <linux/types.h>
 
 struct ceph_pagelist {
 	struct list_head head;
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index 7e38b72..1584581 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -8,6 +8,7 @@
 
 #include <linux/ceph/decode.h>
 #include <linux/ceph/auth.h>
+#include <linux/ceph/messenger.h>
 
 #include "crypto.h"
 #include "auth_x.h"
@@ -293,6 +294,11 @@
 	dout("build_authorizer for %s %p\n",
 	     ceph_entity_type_name(th->service), au);
 
+	ceph_crypto_key_destroy(&au->session_key);
+	ret = ceph_crypto_key_clone(&au->session_key, &th->session_key);
+	if (ret)
+		return ret;
+
 	maxlen = sizeof(*msg_a) + sizeof(msg_b) +
 		ceph_x_encrypt_buflen(ticket_blob_len);
 	dout("  need len %d\n", maxlen);
@@ -302,8 +308,10 @@
 	}
 	if (!au->buf) {
 		au->buf = ceph_buffer_new(maxlen, GFP_NOFS);
-		if (!au->buf)
+		if (!au->buf) {
+			ceph_crypto_key_destroy(&au->session_key);
 			return -ENOMEM;
+		}
 	}
 	au->service = th->service;
 	au->secret_id = th->secret_id;
@@ -329,7 +337,7 @@
 	get_random_bytes(&au->nonce, sizeof(au->nonce));
 	msg_b.struct_v = 1;
 	msg_b.nonce = cpu_to_le64(au->nonce);
-	ret = ceph_x_encrypt(&th->session_key, &msg_b, sizeof(msg_b),
+	ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
 			     p, end - p);
 	if (ret < 0)
 		goto out_buf;
@@ -560,6 +568,8 @@
 	auth->authorizer_buf_len = au->buf->vec.iov_len;
 	auth->authorizer_reply_buf = au->reply_buf;
 	auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
+	auth->sign_message = ac->ops->sign_message;
+	auth->check_message_signature = ac->ops->check_message_signature;
 
 	return 0;
 }
@@ -588,17 +598,13 @@
 					  struct ceph_authorizer *a, size_t len)
 {
 	struct ceph_x_authorizer *au = (void *)a;
-	struct ceph_x_ticket_handler *th;
 	int ret = 0;
 	struct ceph_x_authorize_reply reply;
 	void *preply = &reply;
 	void *p = au->reply_buf;
 	void *end = p + sizeof(au->reply_buf);
 
-	th = get_ticket_handler(ac, au->service);
-	if (IS_ERR(th))
-		return PTR_ERR(th);
-	ret = ceph_x_decrypt(&th->session_key, &p, end, &preply, sizeof(reply));
+	ret = ceph_x_decrypt(&au->session_key, &p, end, &preply, sizeof(reply));
 	if (ret < 0)
 		return ret;
 	if (ret != sizeof(reply))
@@ -618,6 +624,7 @@
 {
 	struct ceph_x_authorizer *au = (void *)a;
 
+	ceph_crypto_key_destroy(&au->session_key);
 	ceph_buffer_put(au->buf);
 	kfree(au);
 }
@@ -663,6 +670,59 @@
 		memset(&th->validity, 0, sizeof(th->validity));
 }
 
+/* Compute *sig for @msg: its CRCs encrypted with au->session_key. */
+static int calcu_signature(struct ceph_x_authorizer *au,
+			   struct ceph_msg *msg, __le64 *sig)
+{
+	char tmp_enc[40];
+	__le32 tmp[5] = {	/* 16 must be little-endian like the CRCs */
+		cpu_to_le32(16), msg->hdr.crc, msg->footer.front_crc,
+		msg->footer.middle_crc, msg->footer.data_crc,
+	};
+	int ret = ceph_x_encrypt(&au->session_key, &tmp, sizeof(tmp),
+				 tmp_enc, sizeof(tmp_enc));
+	if (ret < 0)
+		return ret;
+	*sig = *(__le64*)(tmp_enc + 4);	/* *sig = 8 bytes at offset 4 */
+	return 0;
+}
+
+static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
+			       struct ceph_msg *msg)
+{
+	int ret;	/* 0, or the negative error from calcu_signature() */
+	if (!auth->authorizer)
+		return 0;	/* no authorizer: footer is left unsigned */
+	ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
+			      msg, &msg->footer.sig);
+	if (ret < 0)
+		return ret;
+	msg->footer.flags |= CEPH_MSG_FOOTER_SIGNED; /* sig is now valid */
+	return 0;
+}
+
+static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
+					  struct ceph_msg *msg)
+{
+	__le64 sig_check;
+	int ret;
+	/* Recompute the signature and compare it with msg->footer.sig. */
+	if (!auth->authorizer)
+		return 0;	/* no authorizer: nothing to verify against */
+	ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
+			      msg, &sig_check);
+	if (ret < 0)
+		return ret;
+	if (sig_check == msg->footer.sig)
+		return 0;
+	if (msg->footer.flags & CEPH_MSG_FOOTER_SIGNED)
+		dout("ceph_x_check_message_signature %p has signature %llx "
+		     "expect %llx\n", msg, msg->footer.sig, sig_check);
+	else
+		dout("ceph_x_check_message_signature %p sender did not set "
+		     "CEPH_MSG_FOOTER_SIGNED\n", msg);
+	return -EBADMSG;	/* mismatch, whether or not SIGNED was set */
+}
 
 static const struct ceph_auth_client_ops ceph_x_ops = {
 	.name = "x",
@@ -677,6 +737,8 @@
 	.invalidate_authorizer = ceph_x_invalidate_authorizer,
 	.reset =  ceph_x_reset,
 	.destroy = ceph_x_destroy,
+	.sign_message = ceph_x_sign_message,
+	.check_message_signature = ceph_x_check_message_signature,
 };
 
 
diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h
index 65ee720..e8b7c69 100644
--- a/net/ceph/auth_x.h
+++ b/net/ceph/auth_x.h
@@ -26,6 +26,7 @@
 
 
 struct ceph_x_authorizer {
+	struct ceph_crypto_key session_key;
 	struct ceph_buffer *buf;
 	unsigned int service;
 	u64 nonce;
diff --git a/net/ceph/buffer.c b/net/ceph/buffer.c
index 621b5f65..add5f92 100644
--- a/net/ceph/buffer.c
+++ b/net/ceph/buffer.c
@@ -6,7 +6,7 @@
 
 #include <linux/ceph/buffer.h>
 #include <linux/ceph/decode.h>
-#include <linux/ceph/libceph.h> /* for ceph_kv{malloc,free} */
+#include <linux/ceph/libceph.h> /* for ceph_kvmalloc */
 
 struct ceph_buffer *ceph_buffer_new(size_t len, gfp_t gfp)
 {
@@ -35,7 +35,7 @@
 	struct ceph_buffer *b = container_of(kref, struct ceph_buffer, kref);
 
 	dout("buffer_release %p\n", b);
-	ceph_kvfree(b->vec.iov_base);
+	kvfree(b->vec.iov_base);
 	kfree(b);
 }
 EXPORT_SYMBOL(ceph_buffer_release);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 58fbfe1..5d5ab67 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -184,14 +184,6 @@
 	return __vmalloc(size, flags | __GFP_HIGHMEM, PAGE_KERNEL);
 }
 
-void ceph_kvfree(const void *ptr)
-{
-	if (is_vmalloc_addr(ptr))
-		vfree(ptr);
-	else
-		kfree(ptr);
-}
-
 
 static int parse_fsid(const char *str, struct ceph_fsid *fsid)
 {
@@ -245,6 +237,8 @@
 	Opt_noshare,
 	Opt_crc,
 	Opt_nocrc,
+	Opt_cephx_require_signatures,
+	Opt_nocephx_require_signatures,
 };
 
 static match_table_t opt_tokens = {
@@ -263,6 +257,8 @@
 	{Opt_noshare, "noshare"},
 	{Opt_crc, "crc"},
 	{Opt_nocrc, "nocrc"},
+	{Opt_cephx_require_signatures, "cephx_require_signatures"},
+	{Opt_nocephx_require_signatures, "nocephx_require_signatures"},
 	{-1, NULL}
 };
 
@@ -461,6 +457,12 @@
 		case Opt_nocrc:
 			opt->flags |= CEPH_OPT_NOCRC;
 			break;
+		case Opt_cephx_require_signatures:
+			opt->flags &= ~CEPH_OPT_NOMSGAUTH;
+			break;
+		case Opt_nocephx_require_signatures:
+			opt->flags |= CEPH_OPT_NOMSGAUTH;
+			break;
 
 		default:
 			BUG_ON(token);
@@ -504,6 +506,9 @@
 	init_waitqueue_head(&client->auth_wq);
 	client->auth_err = 0;
 
+	if (!ceph_test_opt(client, NOMSGAUTH))
+		required_features |= CEPH_FEATURE_MSG_AUTH;
+
 	client->extra_mon_dispatch = NULL;
 	client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
 		supported_features;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 8d1653c..33a2f201e 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1196,8 +1196,18 @@
 	dout("prepare_write_message_footer %p\n", con);
 	con->out_kvec_is_msg = true;
 	con->out_kvec[v].iov_base = &m->footer;
-	con->out_kvec[v].iov_len = sizeof(m->footer);
-	con->out_kvec_bytes += sizeof(m->footer);
+	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
+		if (con->ops->sign_message)
+			con->ops->sign_message(con, m);
+		else
+			m->footer.sig = 0;
+		con->out_kvec[v].iov_len = sizeof(m->footer);
+		con->out_kvec_bytes += sizeof(m->footer);
+	} else {
+		m->old_footer.flags = m->footer.flags;
+		con->out_kvec[v].iov_len = sizeof(m->old_footer);
+		con->out_kvec_bytes += sizeof(m->old_footer);
+	}
 	con->out_kvec_left++;
 	con->out_more = m->more_to_follow;
 	con->out_msg_done = true;
@@ -2249,6 +2259,7 @@
 	int ret;
 	unsigned int front_len, middle_len, data_len;
 	bool do_datacrc = !con->msgr->nocrc;
+	bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
 	u64 seq;
 	u32 crc;
 
@@ -2361,12 +2372,21 @@
 	}
 
 	/* footer */
-	size = sizeof (m->footer);
+	if (need_sign)
+		size = sizeof(m->footer);
+	else
+		size = sizeof(m->old_footer);
+
 	end += size;
 	ret = read_partial(con, end, size, &m->footer);
 	if (ret <= 0)
 		return ret;
 
+	if (!need_sign) {
+		m->footer.flags = m->old_footer.flags;
+		m->footer.sig = 0;
+	}
+
 	dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
 	     m, front_len, m->footer.front_crc, middle_len,
 	     m->footer.middle_crc, data_len, m->footer.data_crc);
@@ -2390,6 +2410,12 @@
 		return -EBADMSG;
 	}
 
+	if (need_sign && con->ops->check_message_signature &&
+	    con->ops->check_message_signature(con, m)) {
+		pr_err("read_partial_message %p signature check failed\n", m);
+		return -EBADMSG;
+	}
+
 	return 1; /* done! */
 }
 
@@ -3288,7 +3314,7 @@
 static void ceph_msg_free(struct ceph_msg *m)
 {
 	dout("%s %p\n", __func__, m);
-	ceph_kvfree(m->front.iov_base);
+	kvfree(m->front.iov_base);
 	kmem_cache_free(ceph_msg_cache, m);
 }
 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 6f16428..53299c7 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -292,6 +292,10 @@
 		ceph_osd_data_release(&op->cls.request_data);
 		ceph_osd_data_release(&op->cls.response_data);
 		break;
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+		ceph_osd_data_release(&op->xattr.osd_data);
+		break;
 	default:
 		break;
 	}
@@ -476,8 +480,7 @@
 	size_t payload_len = 0;
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
-	       opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
-	       opcode != CEPH_OSD_OP_TRUNCATE);
+	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE);
 
 	op->extent.offset = offset;
 	op->extent.length = length;
@@ -545,6 +548,39 @@
 }
 EXPORT_SYMBOL(osd_req_op_cls_init);
 
+/* Set up op @which as SETXATTR/CMPXATTR carrying @name then @value in a
+ * pagelist.  Returns 0, or -ENOMEM if the pagelist cannot be built. */
+int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
+			  u16 opcode, const char *name, const void *value,
+			  size_t size, u8 cmp_op, u8 cmp_mode)
+{
+	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+	struct ceph_pagelist *pagelist;
+	size_t name_len = strlen(name);
+
+	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
+
+	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+	if (!pagelist)
+		return -ENOMEM;
+	ceph_pagelist_init(pagelist);
+
+	/* a failed append would otherwise send a truncated payload */
+	if (ceph_pagelist_append(pagelist, name, name_len) ||
+	    ceph_pagelist_append(pagelist, value, size)) {
+		ceph_pagelist_release(pagelist);
+		return -ENOMEM;
+	}
+	op->xattr.name_len = name_len;
+	op->xattr.value_len = size;
+	op->xattr.cmp_op = cmp_op;
+	op->xattr.cmp_mode = cmp_mode;
+	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
+	op->payload_len = name_len + size;
+	return 0;
+}
+EXPORT_SYMBOL(osd_req_op_xattr_init);
+
 void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
 				unsigned int which, u16 opcode,
 				u64 cookie, u64 version, int flag)
@@ -626,7 +662,6 @@
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
 	case CEPH_OSD_OP_ZERO:
-	case CEPH_OSD_OP_DELETE:
 	case CEPH_OSD_OP_TRUNCATE:
 		if (src->op == CEPH_OSD_OP_WRITE)
 			request_data_len = src->extent.length;
@@ -676,6 +711,19 @@
 		dst->alloc_hint.expected_write_size =
 		    cpu_to_le64(src->alloc_hint.expected_write_size);
 		break;
+	case CEPH_OSD_OP_SETXATTR:
+	case CEPH_OSD_OP_CMPXATTR:
+		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
+		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
+		dst->xattr.cmp_op = src->xattr.cmp_op;
+		dst->xattr.cmp_mode = src->xattr.cmp_mode;
+		osd_data = &src->xattr.osd_data;
+		ceph_osdc_msg_data_add(req->r_request, osd_data);
+		request_data_len = osd_data->pagelist->length;
+		break;
+	case CEPH_OSD_OP_CREATE:
+	case CEPH_OSD_OP_DELETE:
+		break;
 	default:
 		pr_err("unsupported osd opcode %s\n",
 			ceph_osd_op_name(src->op));
@@ -705,7 +753,8 @@
 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 					       struct ceph_file_layout *layout,
 					       struct ceph_vino vino,
-					       u64 off, u64 *plen, int num_ops,
+					       u64 off, u64 *plen,
+					       unsigned int which, int num_ops,
 					       int opcode, int flags,
 					       struct ceph_snap_context *snapc,
 					       u32 truncate_seq,
@@ -716,13 +765,11 @@
 	u64 objnum = 0;
 	u64 objoff = 0;
 	u64 objlen = 0;
-	u32 object_size;
-	u64 object_base;
 	int r;
 
 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
-	       opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
-	       opcode != CEPH_OSD_OP_TRUNCATE);
+	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
+	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 					GFP_NOFS);
@@ -738,29 +785,24 @@
 		return ERR_PTR(r);
 	}
 
-	object_size = le32_to_cpu(layout->fl_object_size);
-	object_base = off - objoff;
-	if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
-		if (truncate_size <= object_base) {
-			truncate_size = 0;
-		} else {
-			truncate_size -= object_base;
-			if (truncate_size > object_size)
-				truncate_size = object_size;
+	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
+		osd_req_op_init(req, which, opcode);
+	} else {
+		u32 object_size = le32_to_cpu(layout->fl_object_size);
+		u64 object_base = off - objoff;	/* u64 operands: keep width */
+		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
+			if (truncate_size <= object_base) {
+				truncate_size = 0;
+			} else {
+				truncate_size -= object_base;
+				if (truncate_size > object_size)
+					truncate_size = object_size;
+			}
 		}
+		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
+				       truncate_size, truncate_seq);
 	}
 
-	osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
-				truncate_size, truncate_seq);
-
-	/*
-	 * A second op in the ops array means the caller wants to
-	 * also issue a include a 'startsync' command so that the
-	 * osd will flush data quickly.
-	 */
-	if (num_ops > 1)
-		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
-
 	req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
 
 	snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
@@ -2626,7 +2668,7 @@
 
 	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
 	     vino.snap, off, *plen);
-	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1,
+	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 				    NULL, truncate_seq, truncate_size,
 				    false);
@@ -2669,7 +2711,7 @@
 	int page_align = off & ~PAGE_MASK;
 
 	BUG_ON(vino.snap != CEPH_NOSNAP);	/* snapshots aren't writeable */
-	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1,
+	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
 				    CEPH_OSD_OP_WRITE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
 				    snapc, truncate_seq, truncate_size,
@@ -2920,6 +2962,20 @@
 	return ceph_monc_validate_auth(&osdc->client->monc);
 }
 
+/* osd_con_ops hook: sign @msg with the o_auth of the osd in con->private. */
+static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+{
+	struct ceph_osd *osd = con->private;
+	return ceph_auth_sign_message(&osd->o_auth, msg);
+}
+
+/* osd_con_ops hook: verify @msg against the o_auth of con->private's osd. */
+static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+{
+	struct ceph_osd *osd = con->private;
+	return ceph_auth_check_message_signature(&osd->o_auth, msg);
+}
+
 static const struct ceph_connection_operations osd_con_ops = {
 	.get = get_osd_con,
 	.put = put_osd_con,
@@ -2928,5 +2984,7 @@
 	.verify_authorizer_reply = verify_authorizer_reply,
 	.invalidate_authorizer = invalidate_authorizer,
 	.alloc_msg = alloc_msg,
+	.sign_message = sign_message,
+	.check_message_signature = check_message_signature,
 	.fault = osd_reset,
 };