Browse Source

Merge tag 'ceph-for-4.20-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - a series that fixes some old memory allocation issues in libceph
     (myself). We no longer allocate memory in places where allocation
     failures cannot be handled and BUG when the allocation fails.

   - support for copy_file_range() syscall (Luis Henriques). If size and
     alignment conditions are met, it leverages RADOS copy-from
     operation. Otherwise, a local copy is performed.

   - a patch that reduces memory requirement of ceph_sync_read() from
     the size of the entire read to the size of one object (Zheng Yan).

   - fallocate() syscall is now restricted to FALLOC_FL_PUNCH_HOLE (Luis
     Henriques)"

* tag 'ceph-for-4.20-rc1' of git://github.com/ceph/ceph-client: (25 commits)
  ceph: new mount option to disable usage of copy-from op
  ceph: support copy_file_range file operation
  libceph: support the RADOS copy-from operation
  ceph: add non-blocking parameter to ceph_try_get_caps()
  libceph: check reply num_data_items in setup_request_data()
  libceph: preallocate message data items
  libceph, rbd, ceph: move ceph_osdc_alloc_messages() calls
  libceph: introduce alloc_watch_request()
  libceph: assign cookies in linger_submit()
  libceph: enable fallback to ceph_msg_new() in ceph_msgpool_get()
  ceph: num_ops is off by one in ceph_aio_retry_work()
  libceph: no need to call osd_req_opcode_valid() in osd_req_encode_op()
  ceph: set timeout conditionally in __cap_delay_requeue
  libceph: don't consume a ref on pagelist in ceph_msg_data_add_pagelist()
  libceph: introduce ceph_pagelist_alloc()
  libceph: osd_req_op_cls_init() doesn't need to take opcode
  libceph: bump CEPH_MSG_MAX_DATA_LEN
  ceph: only allow punch hole mode in fallocate
  ceph: refactor ceph_sync_read()
  ceph: check if LOOKUPNAME request was aborted when filling trace
  ...
Linus Torvalds 6 years ago
parent
commit
31990f0f53

+ 5 - 0
Documentation/filesystems/ceph.txt

@@ -151,6 +151,11 @@ Mount Options
         Report overall filesystem usage in statfs instead of using the root
         directory quota.
 
+  nocopyfrom
+        Don't use the RADOS 'copy-from' operation to perform remote object
+        copies.  Currently, it's only used in copy_file_range, which will revert
+        to the default VFS implementation if this option is used.
+
 More Information
 ================
 

+ 16 - 12
drivers/block/rbd.c

@@ -1500,9 +1500,6 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
 			rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
 		goto err_req;
 
-	if (ceph_osdc_alloc_messages(req, GFP_NOIO))
-		goto err_req;
-
 	return req;
 
 err_req:
@@ -1945,6 +1942,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
 		}
 		if (ret)
 			return ret;
+
+		ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+		if (ret)
+			return ret;
 	}
 
 	return 0;
@@ -2374,8 +2375,7 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
 	if (!obj_req->osd_req)
 		return -ENOMEM;
 
-	ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
-				  "copyup");
+	ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup");
 	if (ret)
 		return ret;
 
@@ -2405,6 +2405,10 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
 		rbd_assert(0);
 	}
 
+	ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
+	if (ret)
+		return ret;
+
 	rbd_obj_request_submit(obj_req);
 	return 0;
 }
@@ -3784,10 +3788,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = CEPH_OSD_FLAG_READ;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
-	if (ret)
-		goto out_req;
-
 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 	if (IS_ERR(pages)) {
 		ret = PTR_ERR(pages);
@@ -3798,6 +3798,10 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 	osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
 					 true);
 
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out_req;
+
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
 	if (ret >= 0)
@@ -6067,7 +6071,7 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus,
  * create control files in sysfs
  * /sys/bus/rbd/...
  */
-static int rbd_sysfs_init(void)
+static int __init rbd_sysfs_init(void)
 {
 	int ret;
 
@@ -6082,13 +6086,13 @@ static int rbd_sysfs_init(void)
 	return ret;
 }
 
-static void rbd_sysfs_cleanup(void)
+static void __exit rbd_sysfs_cleanup(void)
 {
 	bus_unregister(&rbd_bus_type);
 	device_unregister(&rbd_root_dev);
 }
 
-static int rbd_slab_init(void)
+static int __init rbd_slab_init(void)
 {
 	rbd_assert(!rbd_img_request_cache);
 	rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);

+ 6 - 7
fs/ceph/acl.c

@@ -104,6 +104,11 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
 	struct timespec64 old_ctime = inode->i_ctime;
 	umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
 
+	if (ceph_snap(inode) != CEPH_NOSNAP) {
+		ret = -EROFS;
+		goto out;
+	}
+
 	switch (type) {
 	case ACL_TYPE_ACCESS:
 		name = XATTR_NAME_POSIX_ACL_ACCESS;
@@ -138,11 +143,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type)
 			goto out_free;
 	}
 
-	if (ceph_snap(inode) != CEPH_NOSNAP) {
-		ret = -EROFS;
-		goto out_free;
-	}
-
 	if (new_mode != old_mode) {
 		newattrs.ia_ctime = current_time(inode);
 		newattrs.ia_mode = new_mode;
@@ -206,10 +206,9 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
 	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
 	if (!tmp_buf)
 		goto out_err;
-	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL);
+	pagelist = ceph_pagelist_alloc(GFP_KERNEL);
 	if (!pagelist)
 		goto out_err;
-	ceph_pagelist_init(pagelist);
 
 	err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
 	if (err)

+ 1 - 1
fs/ceph/addr.c

@@ -322,7 +322,7 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
 		/* caller of readpages does not hold buffer and read caps
 		 * (fadvise, madvise and readahead cases) */
 		int want = CEPH_CAP_FILE_CACHE;
-		ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
+		ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, true, &got);
 		if (ret < 0) {
 			dout("start_read %p, error getting cap\n", inode);
 		} else if (!(got & want)) {

+ 12 - 9
fs/ceph/caps.c

@@ -519,9 +519,9 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
  *    -> we take mdsc->cap_delay_lock
  */
 static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
-				struct ceph_inode_info *ci)
+				struct ceph_inode_info *ci,
+				bool set_timeout)
 {
-	__cap_set_timeouts(mdsc, ci);
 	dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
 	     ci->i_ceph_flags, ci->i_hold_caps_max);
 	if (!mdsc->stopping) {
@@ -531,6 +531,8 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
 				goto no_change;
 			list_del_init(&ci->i_cap_delay_list);
 		}
+		if (set_timeout)
+			__cap_set_timeouts(mdsc, ci);
 		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
 no_change:
 		spin_unlock(&mdsc->cap_delay_lock);
@@ -720,7 +722,7 @@ void ceph_add_cap(struct inode *inode,
 		dout(" issued %s, mds wanted %s, actual %s, queueing\n",
 		     ceph_cap_string(issued), ceph_cap_string(wanted),
 		     ceph_cap_string(actual_wanted));
-		__cap_delay_requeue(mdsc, ci);
+		__cap_delay_requeue(mdsc, ci, true);
 	}
 
 	if (flags & CEPH_CAP_FLAG_AUTH) {
@@ -1647,7 +1649,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
 	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
 	    (mask & CEPH_CAP_FILE_BUFFER))
 		dirty |= I_DIRTY_DATASYNC;
-	__cap_delay_requeue(mdsc, ci);
+	__cap_delay_requeue(mdsc, ci, true);
 	return dirty;
 }
 
@@ -2065,7 +2067,7 @@ ack:
 
 	/* Reschedule delayed caps release if we delayed anything */
 	if (delayed)
-		__cap_delay_requeue(mdsc, ci);
+		__cap_delay_requeue(mdsc, ci, false);
 
 	spin_unlock(&ci->i_ceph_lock);
 
@@ -2125,7 +2127,7 @@ retry:
 
 		if (delayed) {
 			spin_lock(&ci->i_ceph_lock);
-			__cap_delay_requeue(mdsc, ci);
+			__cap_delay_requeue(mdsc, ci, true);
 			spin_unlock(&ci->i_ceph_lock);
 		}
 	} else {
@@ -2671,17 +2673,18 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
-int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
+		      bool nonblock, int *got)
 {
 	int ret, err = 0;
 
 	BUG_ON(need & ~CEPH_CAP_FILE_RD);
-	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
 	ret = ceph_pool_perm_check(ci, need);
 	if (ret < 0)
 		return ret;
 
-	ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
+	ret = try_get_cap_refs(ci, need, want, 0, nonblock, got, &err);
 	if (ret) {
 		if (err == -EAGAIN) {
 			ret = 0;

+ 417 - 156
fs/ceph/file.c

@@ -1,5 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/striper.h>
 
 #include <linux/module.h>
 #include <linux/sched.h>
@@ -556,91 +557,27 @@ enum {
 	READ_INLINE =  3,
 };
 
-/*
- * Read a range of bytes striped over one or more objects.  Iterate over
- * objects we stripe over.  (That's not atomic, but good enough for now.)
- *
- * If we get a short result from the OSD, check against i_size; we need to
- * only return a short read to the caller if we hit EOF.
- */
-static int striped_read(struct inode *inode,
-			u64 pos, u64 len,
-			struct page **pages, int num_pages,
-			int page_align, int *checkeof)
-{
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	u64 this_len;
-	loff_t i_size;
-	int page_idx;
-	int ret, read = 0;
-	bool hit_stripe, was_short;
-
-	/*
-	 * we may need to do multiple reads.  not atomic, unfortunately.
-	 */
-more:
-	this_len = len;
-	page_idx = (page_align + read) >> PAGE_SHIFT;
-	ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
-				  &ci->i_layout, pos, &this_len,
-				  ci->i_truncate_seq, ci->i_truncate_size,
-				  pages + page_idx, num_pages - page_idx,
-				  ((page_align + read) & ~PAGE_MASK));
-	if (ret == -ENOENT)
-		ret = 0;
-	hit_stripe = this_len < len;
-	was_short = ret >= 0 && ret < this_len;
-	dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
-	     ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
-
-	i_size = i_size_read(inode);
-	if (ret >= 0) {
-		if (was_short && (pos + ret < i_size)) {
-			int zlen = min(this_len - ret, i_size - pos - ret);
-			int zoff = page_align + read + ret;
-			dout(" zero gap %llu to %llu\n",
-			     pos + ret, pos + ret + zlen);
-			ceph_zero_page_vector_range(zoff, zlen, pages);
-			ret += zlen;
-		}
-
-		read += ret;
-		pos += ret;
-		len -= ret;
-
-		/* hit stripe and need continue*/
-		if (len && hit_stripe && pos < i_size)
-			goto more;
-	}
-
-	if (read > 0) {
-		ret = read;
-		/* did we bounce off eof? */
-		if (pos + len > i_size)
-			*checkeof = CHECK_EOF;
-	}
-
-	dout("striped_read returns %d\n", ret);
-	return ret;
-}
-
 /*
  * Completely synchronous read and write methods.  Direct from __user
  * buffer to osd, or directly to user pages (if O_DIRECT).
  *
- * If the read spans object boundary, just do multiple reads.
+ * If the read spans object boundary, just do multiple reads.  (That's not
+ * atomic, but good enough for now.)
+ *
+ * If we get a short result from the OSD, check against i_size; we need to
+ * only return a short read to the caller if we hit EOF.
  */
 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
-			      int *checkeof)
+			      int *retry_op)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
-	struct page **pages;
-	u64 off = iocb->ki_pos;
-	int num_pages;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_osd_client *osdc = &fsc->client->osdc;
 	ssize_t ret;
-	size_t len = iov_iter_count(to);
+	u64 off = iocb->ki_pos;
+	u64 len = iov_iter_count(to);
 
 	dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
 	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
@@ -653,61 +590,118 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 	 * but it will at least behave sensibly when they are
 	 * in sequence.
 	 */
-	ret = filemap_write_and_wait_range(inode->i_mapping, off,
-						off + len);
+	ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len);
 	if (ret < 0)
 		return ret;
 
-	if (unlikely(to->type & ITER_PIPE)) {
+	ret = 0;
+	while ((len = iov_iter_count(to)) > 0) {
+		struct ceph_osd_request *req;
+		struct page **pages;
+		int num_pages;
 		size_t page_off;
-		ret = iov_iter_get_pages_alloc(to, &pages, len,
-					       &page_off);
-		if (ret <= 0)
-			return -ENOMEM;
-		num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+		u64 i_size;
+		bool more;
+
+		req = ceph_osdc_new_request(osdc, &ci->i_layout,
+					ci->i_vino, off, &len, 0, 1,
+					CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+					NULL, ci->i_truncate_seq,
+					ci->i_truncate_size, false);
+		if (IS_ERR(req)) {
+			ret = PTR_ERR(req);
+			break;
+		}
+
+		more = len < iov_iter_count(to);
 
-		ret = striped_read(inode, off, ret, pages, num_pages,
-				   page_off, checkeof);
-		if (ret > 0) {
-			iov_iter_advance(to, ret);
-			off += ret;
+		if (unlikely(to->type & ITER_PIPE)) {
+			ret = iov_iter_get_pages_alloc(to, &pages, len,
+						       &page_off);
+			if (ret <= 0) {
+				ceph_osdc_put_request(req);
+				ret = -ENOMEM;
+				break;
+			}
+			num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+			if (ret < len) {
+				len = ret;
+				osd_req_op_extent_update(req, 0, len);
+				more = false;
+			}
 		} else {
-			iov_iter_advance(to, 0);
+			num_pages = calc_pages_for(off, len);
+			page_off = off & ~PAGE_MASK;
+			pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+			if (IS_ERR(pages)) {
+				ceph_osdc_put_request(req);
+				ret = PTR_ERR(pages);
+				break;
+			}
 		}
-		ceph_put_page_vector(pages, num_pages, false);
-	} else {
-		num_pages = calc_pages_for(off, len);
-		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-		if (IS_ERR(pages))
-			return PTR_ERR(pages);
-
-		ret = striped_read(inode, off, len, pages, num_pages,
-				   (off & ~PAGE_MASK), checkeof);
-		if (ret > 0) {
-			int l, k = 0;
-			size_t left = ret;
-
-			while (left) {
-				size_t page_off = off & ~PAGE_MASK;
-				size_t copy = min_t(size_t, left,
-						    PAGE_SIZE - page_off);
-				l = copy_page_to_iter(pages[k++], page_off,
-						      copy, to);
-				off += l;
-				left -= l;
-				if (l < copy)
+
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off,
+						 false, false);
+		ret = ceph_osdc_start_request(osdc, req, false);
+		if (!ret)
+			ret = ceph_osdc_wait_request(osdc, req);
+		ceph_osdc_put_request(req);
+
+		i_size = i_size_read(inode);
+		dout("sync_read %llu~%llu got %zd i_size %llu%s\n",
+		     off, len, ret, i_size, (more ? " MORE" : ""));
+
+		if (ret == -ENOENT)
+			ret = 0;
+		if (ret >= 0 && ret < len && (off + ret < i_size)) {
+			int zlen = min(len - ret, i_size - off - ret);
+			int zoff = page_off + ret;
+			dout("sync_read zero gap %llu~%llu\n",
+                             off + ret, off + ret + zlen);
+			ceph_zero_page_vector_range(zoff, zlen, pages);
+			ret += zlen;
+		}
+
+		if (unlikely(to->type & ITER_PIPE)) {
+			if (ret > 0) {
+				iov_iter_advance(to, ret);
+				off += ret;
+			} else {
+				iov_iter_advance(to, 0);
+			}
+			ceph_put_page_vector(pages, num_pages, false);
+		} else {
+			int idx = 0;
+			size_t left = ret > 0 ? ret : 0;
+			while (left > 0) {
+				size_t len, copied;
+				page_off = off & ~PAGE_MASK;
+				len = min_t(size_t, left, PAGE_SIZE - page_off);
+				copied = copy_page_to_iter(pages[idx++],
+							   page_off, len, to);
+				off += copied;
+				left -= copied;
+				if (copied < len) {
+					ret = -EFAULT;
 					break;
+				}
 			}
+			ceph_release_page_vector(pages, num_pages);
 		}
-		ceph_release_page_vector(pages, num_pages);
+
+		if (ret <= 0 || off >= i_size || !more)
+			break;
 	}
 
 	if (off > iocb->ki_pos) {
+		if (ret >= 0 &&
+		    iov_iter_count(to) > 0 && off >= i_size_read(inode))
+			*retry_op = CHECK_EOF;
 		ret = off - iocb->ki_pos;
 		iocb->ki_pos = off;
 	}
 
-	dout("sync_read result %zd\n", ret);
+	dout("sync_read result %zd retry_op %d\n", ret, *retry_op);
 	return ret;
 }
 
@@ -865,7 +859,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	}
 	spin_unlock(&ci->i_ceph_lock);
 
-	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
 			false, GFP_NOFS);
 	if (!req) {
 		ret = -ENOMEM;
@@ -877,6 +871,11 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
 	ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
 
+	req->r_ops[0] = orig_req->r_ops[0];
+
+	req->r_mtime = aio_req->mtime;
+	req->r_data_offset = req->r_ops[0].extent.offset;
+
 	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
 	if (ret) {
 		ceph_osdc_put_request(req);
@@ -884,11 +883,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
 		goto out;
 	}
 
-	req->r_ops[0] = orig_req->r_ops[0];
-
-	req->r_mtime = aio_req->mtime;
-	req->r_data_offset = req->r_ops[0].extent.offset;
-
 	ceph_osdc_put_request(orig_req);
 
 	req->r_callback = ceph_aio_complete_req;
@@ -1735,7 +1729,6 @@ static long ceph_fallocate(struct file *file, int mode,
 	struct ceph_file_info *fi = file->private_data;
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_cap_flush *prealloc_cf;
 	int want, got = 0;
 	int dirty;
@@ -1743,10 +1736,7 @@ static long ceph_fallocate(struct file *file, int mode,
 	loff_t endoff = 0;
 	loff_t size;
 
-	if ((offset + length) > max(i_size_read(inode), fsc->max_file_size))
-		return -EFBIG;
-
-	if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
+	if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE))
 		return -EOPNOTSUPP;
 
 	if (!S_ISREG(inode->i_mode))
@@ -1763,18 +1753,6 @@ static long ceph_fallocate(struct file *file, int mode,
 		goto unlock;
 	}
 
-	if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) &&
-	    ceph_quota_is_max_bytes_exceeded(inode, offset + length)) {
-		ret = -EDQUOT;
-		goto unlock;
-	}
-
-	if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL) &&
-	    !(mode & FALLOC_FL_PUNCH_HOLE)) {
-		ret = -ENOSPC;
-		goto unlock;
-	}
-
 	if (ci->i_inline_version != CEPH_INLINE_NONE) {
 		ret = ceph_uninline_data(file, NULL);
 		if (ret < 0)
@@ -1782,12 +1760,12 @@ static long ceph_fallocate(struct file *file, int mode,
 	}
 
 	size = i_size_read(inode);
-	if (!(mode & FALLOC_FL_KEEP_SIZE)) {
-		endoff = offset + length;
-		ret = inode_newsize_ok(inode, endoff);
-		if (ret)
-			goto unlock;
-	}
+
+	/* Are we punching a hole beyond EOF? */
+	if (offset >= size)
+		goto unlock;
+	if ((offset + length) > size)
+		length = size - offset;
 
 	if (fi->fmode & CEPH_FILE_MODE_LAZY)
 		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
@@ -1798,16 +1776,8 @@ static long ceph_fallocate(struct file *file, int mode,
 	if (ret < 0)
 		goto unlock;
 
-	if (mode & FALLOC_FL_PUNCH_HOLE) {
-		if (offset < size)
-			ceph_zero_pagecache_range(inode, offset, length);
-		ret = ceph_zero_objects(inode, offset, length);
-	} else if (endoff > size) {
-		truncate_pagecache_range(inode, size, -1);
-		if (ceph_inode_set_size(inode, endoff))
-			ceph_check_caps(ceph_inode(inode),
-				CHECK_CAPS_AUTHONLY, NULL);
-	}
+	ceph_zero_pagecache_range(inode, offset, length);
+	ret = ceph_zero_objects(inode, offset, length);
 
 	if (!ret) {
 		spin_lock(&ci->i_ceph_lock);
@@ -1817,9 +1787,6 @@ static long ceph_fallocate(struct file *file, int mode,
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
-		if ((endoff > size) &&
-		    ceph_quota_is_max_bytes_approaching(inode, endoff))
-			ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL);
 	}
 
 	ceph_put_cap_refs(ci, got);
@@ -1829,6 +1796,300 @@ unlock:
 	return ret;
 }
 
+/*
+ * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
+ * src_ci.  Two attempts are made to obtain both caps, and an error is return if
+ * this fails; zero is returned on success.
+ */
+static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
+			  loff_t src_endoff, int *src_got,
+			  struct ceph_inode_info *dst_ci,
+			  loff_t dst_endoff, int *dst_got)
+{
+	int ret = 0;
+	bool retrying = false;
+
+retry_caps:
+	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+			    dst_endoff, dst_got, NULL);
+	if (ret < 0)
+		return ret;
+
+	/*
+	 * Since we're already holding the FILE_WR capability for the dst file,
+	 * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
+	 * retry dance instead to try to get both capabilities.
+	 */
+	ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+				false, src_got);
+	if (ret <= 0) {
+		/* Start by dropping dst_ci caps and getting src_ci caps */
+		ceph_put_cap_refs(dst_ci, *dst_got);
+		if (retrying) {
+			if (!ret)
+				/* ceph_try_get_caps masks EAGAIN */
+				ret = -EAGAIN;
+			return ret;
+		}
+		ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
+				    CEPH_CAP_FILE_SHARED, src_endoff,
+				    src_got, NULL);
+		if (ret < 0)
+			return ret;
+		/*... drop src_ci caps too, and retry */
+		ceph_put_cap_refs(src_ci, *src_got);
+		retrying = true;
+		goto retry_caps;
+	}
+	return ret;
+}
+
+static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
+			   struct ceph_inode_info *dst_ci, int dst_got)
+{
+	ceph_put_cap_refs(src_ci, src_got);
+	ceph_put_cap_refs(dst_ci, dst_got);
+}
+
+/*
+ * This function does several size-related checks, returning an error if:
+ *  - source file is smaller than off+len
+ *  - destination file size is not OK (inode_newsize_ok())
+ *  - max bytes quotas is exceeded
+ */
+static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
+			   loff_t src_off, loff_t dst_off, size_t len)
+{
+	loff_t size, endoff;
+
+	size = i_size_read(src_inode);
+	/*
+	 * Don't copy beyond source file EOF.  Instead of simply setting length
+	 * to (size - src_off), just drop to VFS default implementation, as the
+	 * local i_size may be stale due to other clients writing to the source
+	 * inode.
+	 */
+	if (src_off + len > size) {
+		dout("Copy beyond EOF (%llu + %zu > %llu)\n",
+		     src_off, len, size);
+		return -EOPNOTSUPP;
+	}
+	size = i_size_read(dst_inode);
+
+	endoff = dst_off + len;
+	if (inode_newsize_ok(dst_inode, endoff))
+		return -EOPNOTSUPP;
+
+	if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
+		return -EDQUOT;
+
+	return 0;
+}
+
+static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
+				    struct file *dst_file, loff_t dst_off,
+				    size_t len, unsigned int flags)
+{
+	struct inode *src_inode = file_inode(src_file);
+	struct inode *dst_inode = file_inode(dst_file);
+	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
+	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
+	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_object_locator src_oloc, dst_oloc;
+	struct ceph_object_id src_oid, dst_oid;
+	loff_t endoff = 0, size;
+	ssize_t ret = -EIO;
+	u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
+	u32 src_objlen, dst_objlen, object_size;
+	int src_got = 0, dst_got = 0, err, dirty;
+	bool do_final_copy = false;
+
+	if (src_inode == dst_inode)
+		return -EINVAL;
+	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
+		return -EROFS;
+
+	/*
+	 * Some of the checks below will return -EOPNOTSUPP, which will force a
+	 * fallback to the default VFS copy_file_range implementation.  This is
+	 * desirable in several cases (for ex, the 'len' is smaller than the
+	 * size of the objects, or in cases where that would be more
+	 * efficient).
+	 */
+
+	if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM))
+		return -EOPNOTSUPP;
+
+	if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
+	    (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
+	    (src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
+		return -EOPNOTSUPP;
+
+	if (len < src_ci->i_layout.object_size)
+		return -EOPNOTSUPP; /* no remote copy will be done */
+
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
+	/* Start by sync'ing the source file */
+	ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
+	if (ret < 0)
+		goto out;
+
+	/*
+	 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
+	 * clients may have dirty data in their caches.  And OSDs know nothing
+	 * about caps, so they can't safely do the remote object copies.
+	 */
+	err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
+			     dst_ci, (dst_off + len), &dst_got);
+	if (err < 0) {
+		dout("get_rd_wr_caps returned %d\n", err);
+		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
+	if (ret < 0)
+		goto out_caps;
+
+	size = i_size_read(dst_inode);
+	endoff = dst_off + len;
+
+	/* Drop dst file cached pages */
+	ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
+					    dst_off >> PAGE_SHIFT,
+					    endoff >> PAGE_SHIFT);
+	if (ret < 0) {
+		dout("Failed to invalidate inode pages (%zd)\n", ret);
+		ret = 0; /* XXX */
+	}
+	src_oloc.pool = src_ci->i_layout.pool_id;
+	src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
+	dst_oloc.pool = dst_ci->i_layout.pool_id;
+	dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
+
+	ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+				      src_ci->i_layout.object_size,
+				      &src_objnum, &src_objoff, &src_objlen);
+	ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+				      dst_ci->i_layout.object_size,
+				      &dst_objnum, &dst_objoff, &dst_objlen);
+	/* object-level offsets need to the same */
+	if (src_objoff != dst_objoff) {
+		ret = -EOPNOTSUPP;
+		goto out_caps;
+	}
+
+	/*
+	 * Do a manual copy if the object offset isn't object aligned.
+	 * 'src_objlen' contains the bytes left until the end of the object,
+	 * starting at the src_off
+	 */
+	if (src_objoff) {
+		/*
+		 * we need to temporarily drop all caps as we'll be calling
+		 * {read,write}_iter, which will get caps again.
+		 */
+		put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+		ret = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, src_objlen, flags);
+		if (ret < 0) {
+			dout("do_splice_direct returned %d\n", err);
+			goto out;
+		}
+		len -= ret;
+		err = get_rd_wr_caps(src_ci, (src_off + len),
+				     &src_got, dst_ci,
+				     (dst_off + len), &dst_got);
+		if (err < 0)
+			goto out;
+		err = is_file_size_ok(src_inode, dst_inode,
+				      src_off, dst_off, len);
+		if (err < 0)
+			goto out_caps;
+	}
+	object_size = src_ci->i_layout.object_size;
+	while (len >= object_size) {
+		ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
+					      object_size, &src_objnum,
+					      &src_objoff, &src_objlen);
+		ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
+					      object_size, &dst_objnum,
+					      &dst_objoff, &dst_objlen);
+		ceph_oid_init(&src_oid);
+		ceph_oid_printf(&src_oid, "%llx.%08llx",
+				src_ci->i_vino.ino, src_objnum);
+		ceph_oid_init(&dst_oid);
+		ceph_oid_printf(&dst_oid, "%llx.%08llx",
+				dst_ci->i_vino.ino, dst_objnum);
+		/* Do an object remote copy */
+		err = ceph_osdc_copy_from(
+			&ceph_inode_to_client(src_inode)->client->osdc,
+			src_ci->i_vino.snap, 0,
+			&src_oid, &src_oloc,
+			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+			CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
+			&dst_oid, &dst_oloc,
+			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
+			CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
+		if (err) {
+			dout("ceph_osdc_copy_from returned %d\n", err);
+			if (!ret)
+				ret = err;
+			goto out_caps;
+		}
+		len -= object_size;
+		src_off += object_size;
+		dst_off += object_size;
+		ret += object_size;
+	}
+
+	if (len)
+		/* We still need one final local copy */
+		do_final_copy = true;
+
+	file_update_time(dst_file);
+	if (endoff > size) {
+		int caps_flags = 0;
+
+		/* Let the MDS know about dst file size change */
+		if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_NODELAY;
+		if (ceph_inode_set_size(dst_inode, endoff))
+			caps_flags |= CHECK_CAPS_AUTHONLY;
+		if (caps_flags)
+			ceph_check_caps(dst_ci, caps_flags, NULL);
+	}
+	/* Mark Fw dirty */
+	spin_lock(&dst_ci->i_ceph_lock);
+	dst_ci->i_inline_version = CEPH_INLINE_NONE;
+	dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
+	spin_unlock(&dst_ci->i_ceph_lock);
+	if (dirty)
+		__mark_inode_dirty(dst_inode, dirty);
+
+out_caps:
+	put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
+
+	if (do_final_copy) {
+		err = do_splice_direct(src_file, &src_off, dst_file,
+				       &dst_off, len, flags);
+		if (err < 0) {
+			dout("do_splice_direct returned %d\n", err);
+			goto out;
+		}
+		len -= err;
+		ret += err;
+	}
+
+out:
+	ceph_free_cap_flush(prealloc_cf);
+
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -1844,5 +2105,5 @@ const struct file_operations ceph_file_fops = {
 	.unlocked_ioctl = ceph_ioctl,
 	.compat_ioctl	= ceph_ioctl,
 	.fallocate	= ceph_fallocate,
+	.copy_file_range = ceph_copy_file_range,
 };
-

+ 9 - 4
fs/ceph/inode.c

@@ -1132,8 +1132,12 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
 	if (IS_ERR(realdn)) {
 		pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
 		       PTR_ERR(realdn), dn, in, ceph_vinop(in));
-		dput(dn);
-		dn = realdn; /* note realdn contains the error */
+		dn = realdn;
+		/*
+		 * Caller should release 'dn' in the case of error.
+		 * If 'req->r_dentry' is passed to this function,
+		 * caller should leave 'req->r_dentry' untouched.
+		 */
 		goto out;
 	} else if (realdn) {
 		dout("dn %p (%d) spliced with %p (%d) "
@@ -1196,7 +1200,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
 			WARN_ON_ONCE(1);
 		}
 
-		if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) {
+		if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME &&
+		    test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
+		    !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
 			struct qstr dname;
 			struct dentry *dn, *parent;
 
@@ -1677,7 +1683,6 @@ retry_lookup:
 			if (IS_ERR(realdn)) {
 				err = PTR_ERR(realdn);
 				d_drop(dn);
-				dn = NULL;
 				goto next_item;
 			}
 			dn = realdn;

+ 4 - 5
fs/ceph/mds_client.c

@@ -2071,7 +2071,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 	if (req->r_old_dentry_drop)
 		len += req->r_old_dentry->d_name.len;
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
+	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
 	if (!msg) {
 		msg = ERR_PTR(-ENOMEM);
 		goto out_free2;
@@ -2136,7 +2136,6 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 
 	if (req->r_pagelist) {
 		struct ceph_pagelist *pagelist = req->r_pagelist;
-		refcount_inc(&pagelist->refcnt);
 		ceph_msg_data_add_pagelist(msg, pagelist);
 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
 	} else {
@@ -3126,12 +3125,11 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 
 	pr_info("mds%d reconnect start\n", mds);
 
-	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+	pagelist = ceph_pagelist_alloc(GFP_NOFS);
 	if (!pagelist)
 		goto fail_nopagelist;
-	ceph_pagelist_init(pagelist);
 
-	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
+	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
 	if (!reply)
 		goto fail_nomsg;
 
@@ -3241,6 +3239,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	mutex_unlock(&mdsc->mutex);
 
 	up_read(&mdsc->snap_rwsem);
+	ceph_pagelist_release(pagelist);
 	return;
 
 fail:

+ 13 - 0
fs/ceph/super.c

@@ -165,6 +165,8 @@ enum {
 	Opt_noacl,
 	Opt_quotadf,
 	Opt_noquotadf,
+	Opt_copyfrom,
+	Opt_nocopyfrom,
 };
 
 static match_table_t fsopt_tokens = {
@@ -203,6 +205,8 @@ static match_table_t fsopt_tokens = {
 	{Opt_noacl, "noacl"},
 	{Opt_quotadf, "quotadf"},
 	{Opt_noquotadf, "noquotadf"},
+	{Opt_copyfrom, "copyfrom"},
+	{Opt_nocopyfrom, "nocopyfrom"},
 	{-1, NULL}
 };
 
@@ -355,6 +359,12 @@ static int parse_fsopt_token(char *c, void *private)
 	case Opt_noquotadf:
 		fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF;
 		break;
+	case Opt_copyfrom:
+		fsopt->flags &= ~CEPH_MOUNT_OPT_NOCOPYFROM;
+		break;
+	case Opt_nocopyfrom:
+		fsopt->flags |= CEPH_MOUNT_OPT_NOCOPYFROM;
+		break;
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 	case Opt_acl:
 		fsopt->sb_flags |= SB_POSIXACL;
@@ -553,6 +563,9 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_puts(m, ",noacl");
 #endif
 
+	if (fsopt->flags & CEPH_MOUNT_OPT_NOCOPYFROM)
+		seq_puts(m, ",nocopyfrom");
+
 	if (fsopt->mds_namespace)
 		seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
 	if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)

+ 2 - 1
fs/ceph/super.h

@@ -40,6 +40,7 @@
 #define CEPH_MOUNT_OPT_NOPOOLPERM      (1<<11) /* no pool permission check */
 #define CEPH_MOUNT_OPT_MOUNTWAIT       (1<<12) /* mount waits if no mds is up */
 #define CEPH_MOUNT_OPT_NOQUOTADF       (1<<13) /* no root dir quota in statfs */
+#define CEPH_MOUNT_OPT_NOCOPYFROM      (1<<14) /* don't use RADOS 'copy-from' op */
 
 #define CEPH_MOUNT_OPT_DEFAULT    CEPH_MOUNT_OPT_DCACHE
 
@@ -1008,7 +1009,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 			 loff_t endoff, int *got, struct page **pinned_page);
 extern int ceph_try_get_caps(struct ceph_inode_info *ci,
-			     int need, int want, int *got);
+			     int need, int want, bool nonblock, int *got);
 
 /* for counting open files by mode */
 extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);

+ 1 - 2
fs/ceph/xattr.c

@@ -951,11 +951,10 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,
 
 	if (size > 0) {
 		/* copy value into pagelist */
-		pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+		pagelist = ceph_pagelist_alloc(GFP_NOFS);
 		if (!pagelist)
 			return -ENOMEM;
 
-		ceph_pagelist_init(pagelist);
 		err = ceph_pagelist_append(pagelist, value, size);
 		if (err)
 			goto out;

+ 7 - 1
include/linux/ceph/libceph.h

@@ -81,7 +81,13 @@ struct ceph_options {
 
 #define CEPH_MSG_MAX_FRONT_LEN	(16*1024*1024)
 #define CEPH_MSG_MAX_MIDDLE_LEN	(16*1024*1024)
-#define CEPH_MSG_MAX_DATA_LEN	(16*1024*1024)
+
+/*
+ * Handle the largest possible rbd object in one message.
+ * There is no limit on the size of cephfs objects, but it has to obey
+ * rsize and wsize mount options anyway.
+ */
+#define CEPH_MSG_MAX_DATA_LEN	(32*1024*1024)
 
 #define CEPH_AUTH_NAME_DEFAULT   "guest"
 

+ 5 - 19
include/linux/ceph/messenger.h

@@ -82,22 +82,6 @@ enum ceph_msg_data_type {
 	CEPH_MSG_DATA_BVECS,	/* data source/destination is a bio_vec array */
 };
 
-static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
-{
-	switch (type) {
-	case CEPH_MSG_DATA_NONE:
-	case CEPH_MSG_DATA_PAGES:
-	case CEPH_MSG_DATA_PAGELIST:
-#ifdef CONFIG_BLOCK
-	case CEPH_MSG_DATA_BIO:
-#endif /* CONFIG_BLOCK */
-	case CEPH_MSG_DATA_BVECS:
-		return true;
-	default:
-		return false;
-	}
-}
-
 #ifdef CONFIG_BLOCK
 
 struct ceph_bio_iter {
@@ -181,7 +165,6 @@ struct ceph_bvec_iter {
 } while (0)
 
 struct ceph_msg_data {
-	struct list_head		links;	/* ceph_msg->data */
 	enum ceph_msg_data_type		type;
 	union {
 #ifdef CONFIG_BLOCK
@@ -202,7 +185,6 @@ struct ceph_msg_data {
 
 struct ceph_msg_data_cursor {
 	size_t			total_resid;	/* across all data items */
-	struct list_head	*data_head;	/* = &ceph_msg->data */
 
 	struct ceph_msg_data	*data;		/* current data item */
 	size_t			resid;		/* bytes not yet consumed */
@@ -240,7 +222,9 @@ struct ceph_msg {
 	struct ceph_buffer *middle;
 
 	size_t				data_length;
-	struct list_head		data;
+	struct ceph_msg_data		*data;
+	int				num_data_items;
+	int				max_data_items;
 	struct ceph_msg_data_cursor	cursor;
 
 	struct ceph_connection *con;
@@ -381,6 +365,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
 void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
 			     struct ceph_bvec_iter *bvec_pos);
 
+struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
+			       gfp_t flags, bool can_fail);
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
 

+ 6 - 5
include/linux/ceph/msgpool.h

@@ -13,14 +13,15 @@ struct ceph_msgpool {
 	mempool_t *pool;
 	int type;               /* preallocated message type */
 	int front_len;          /* preallocated payload size */
+	int max_data_items;
 };
 
-extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
-			     int front_len, int size, bool blocking,
-			     const char *name);
+int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
+		      int front_len, int max_data_items, int size,
+		      const char *name);
 extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
-extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *,
-					 int front_len);
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
+				  int max_data_items);
 extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);
 
 #endif

+ 19 - 3
include/linux/ceph/osd_client.h

@@ -136,6 +136,13 @@ struct ceph_osd_req_op {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
+		struct {
+			u64 snapid;
+			u64 src_version;
+			u8 flags;
+			u32 src_fadvise_flags;
+			struct ceph_osd_data osd_data;
+		} copy_from;
 	};
 };
 
@@ -444,9 +451,8 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
-extern int osd_req_op_cls_init(struct ceph_osd_request *osd_req,
-					unsigned int which, u16 opcode,
-					const char *class, const char *method);
+int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
+			const char *class, const char *method);
 extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 				 u16 opcode, const char *name, const void *value,
 				 size_t size, u8 cmp_op, u8 cmp_mode);
@@ -511,6 +517,16 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
 				struct timespec64 *mtime,
 				struct page **pages, int nr_pages);
 
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+			u64 src_snapid, u64 src_version,
+			struct ceph_object_id *src_oid,
+			struct ceph_object_locator *src_oloc,
+			u32 src_fadvise_flags,
+			struct ceph_object_id *dst_oid,
+			struct ceph_object_locator *dst_oloc,
+			u32 dst_fadvise_flags,
+			u8 copy_from_flags);
+
 /* watch/notify */
 struct ceph_osd_linger_request *
 ceph_osdc_watch(struct ceph_osd_client *osdc,

+ 1 - 10
include/linux/ceph/pagelist.h

@@ -23,16 +23,7 @@ struct ceph_pagelist_cursor {
 	size_t room;		    /* room remaining to reset to */
 };
 
-static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
-{
-	INIT_LIST_HEAD(&pl->head);
-	pl->mapped_tail = NULL;
-	pl->length = 0;
-	pl->room = 0;
-	INIT_LIST_HEAD(&pl->free_list);
-	pl->num_pages_free = 0;
-	refcount_set(&pl->refcnt, 1);
-}
+struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags);
 
 extern void ceph_pagelist_release(struct ceph_pagelist *pl);
 

+ 28 - 0
include/linux/ceph/rados.h

@@ -410,6 +410,14 @@ enum {
 enum {
 	CEPH_OSD_OP_FLAG_EXCL = 1,      /* EXCL object create */
 	CEPH_OSD_OP_FLAG_FAILOK = 2,    /* continue despite failure */
+	CEPH_OSD_OP_FLAG_FADVISE_RANDOM	    = 0x4, /* the op is random */
+	CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL = 0x8, /* the op is sequential */
+	CEPH_OSD_OP_FLAG_FADVISE_WILLNEED   = 0x10,/* data will be accessed in
+						      the near future */
+	CEPH_OSD_OP_FLAG_FADVISE_DONTNEED   = 0x20,/* data will not be accessed
+						      in the near future */
+	CEPH_OSD_OP_FLAG_FADVISE_NOCACHE    = 0x40,/* data will be accessed only
+						      once by this client */
 };
 
 #define EOLDSNAPC    ERESTART  /* ORDERSNAP flag set; writer has old snapc*/
@@ -431,6 +439,15 @@ enum {
 	CEPH_OSD_CMPXATTR_MODE_U64    = 2
 };
 
+enum {
+	CEPH_OSD_COPY_FROM_FLAG_FLUSH = 1,       /* part of a flush operation */
+	CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY = 2, /* ignore pool overlay */
+	CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE = 4,   /* ignore osd cache logic */
+	CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE = 8, /* map snap direct to
+						     * cloneid */
+	CEPH_OSD_COPY_FROM_FLAG_RWORDERED = 16,     /* order with write */
+};
+
 enum {
 	CEPH_OSD_WATCH_OP_UNWATCH = 0,
 	CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
@@ -497,6 +514,17 @@ struct ceph_osd_op {
 			__le64 expected_object_size;
 			__le64 expected_write_size;
 		} __attribute__ ((packed)) alloc_hint;
+		struct {
+			__le64 snapid;
+			__le64 src_version;
+			__u8 flags; /* CEPH_OSD_COPY_FROM_FLAG_* */
+			/*
+			 * CEPH_OSD_OP_FLAG_FADVISE_*: fadvise flags
+			 * for src object, flags for dest object are in
+			 * ceph_osd_op::flags.
+			 */
+			__le32 src_fadvise_flags;
+		} __attribute__ ((packed)) copy_from;
 	};
 	__le32 payload_len;
 } __attribute__ ((packed));

+ 40 - 67
net/ceph/messenger.c

@@ -156,7 +156,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con,
 /* Slab caches for frequently-allocated structures */
 
 static struct kmem_cache	*ceph_msg_cache;
-static struct kmem_cache	*ceph_msg_data_cache;
 
 /* static tag bytes (protocol control messages) */
 static char tag_msg = CEPH_MSGR_TAG_MSG;
@@ -235,23 +234,11 @@ static int ceph_msgr_slab_init(void)
 	if (!ceph_msg_cache)
 		return -ENOMEM;
 
-	BUG_ON(ceph_msg_data_cache);
-	ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0);
-	if (ceph_msg_data_cache)
-		return 0;
-
-	kmem_cache_destroy(ceph_msg_cache);
-	ceph_msg_cache = NULL;
-
-	return -ENOMEM;
+	return 0;
 }
 
 static void ceph_msgr_slab_exit(void)
 {
-	BUG_ON(!ceph_msg_data_cache);
-	kmem_cache_destroy(ceph_msg_data_cache);
-	ceph_msg_data_cache = NULL;
-
 	BUG_ON(!ceph_msg_cache);
 	kmem_cache_destroy(ceph_msg_cache);
 	ceph_msg_cache = NULL;
@@ -1141,16 +1128,13 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
 static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
 {
 	struct ceph_msg_data_cursor *cursor = &msg->cursor;
-	struct ceph_msg_data *data;
 
 	BUG_ON(!length);
 	BUG_ON(length > msg->data_length);
-	BUG_ON(list_empty(&msg->data));
+	BUG_ON(!msg->num_data_items);
 
-	cursor->data_head = &msg->data;
 	cursor->total_resid = length;
-	data = list_first_entry(&msg->data, struct ceph_msg_data, links);
-	cursor->data = data;
+	cursor->data = msg->data;
 
 	__ceph_msg_data_cursor_init(cursor);
 }
@@ -1231,8 +1215,7 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
 
 	if (!cursor->resid && cursor->total_resid) {
 		WARN_ON(!cursor->last_piece);
-		BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
-		cursor->data = list_next_entry(cursor->data, links);
+		cursor->data++;
 		__ceph_msg_data_cursor_init(cursor);
 		new_piece = true;
 	}
@@ -1248,9 +1231,6 @@ static size_t sizeof_footer(struct ceph_connection *con)
 
 static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 {
-	BUG_ON(!msg);
-	BUG_ON(!data_len);
-
 	/* Initialize data cursor */
 
 	ceph_msg_data_cursor_init(msg, (size_t)data_len);
@@ -1590,7 +1570,7 @@ static int write_partial_message_data(struct ceph_connection *con)
 
 	dout("%s %p msg %p\n", __func__, con, msg);
 
-	if (list_empty(&msg->data))
+	if (!msg->num_data_items)
 		return -EINVAL;
 
 	/*
@@ -2347,8 +2327,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 	u32 crc = 0;
 	int ret;
 
-	BUG_ON(!msg);
-	if (list_empty(&msg->data))
+	if (!msg->num_data_items)
 		return -EIO;
 
 	if (do_datacrc)
@@ -3256,32 +3235,16 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con,
 	return false;
 }
 
-static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
+static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
 {
-	struct ceph_msg_data *data;
-
-	if (WARN_ON(!ceph_msg_data_type_valid(type)))
-		return NULL;
-
-	data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS);
-	if (!data)
-		return NULL;
-
-	data->type = type;
-	INIT_LIST_HEAD(&data->links);
-
-	return data;
+	BUG_ON(msg->num_data_items >= msg->max_data_items);
+	return &msg->data[msg->num_data_items++];
 }
 
 static void ceph_msg_data_destroy(struct ceph_msg_data *data)
 {
-	if (!data)
-		return;
-
-	WARN_ON(!list_empty(&data->links));
 	if (data->type == CEPH_MSG_DATA_PAGELIST)
 		ceph_pagelist_release(data->pagelist);
-	kmem_cache_free(ceph_msg_data_cache, data);
 }
 
 void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
@@ -3292,13 +3255,12 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
 	BUG_ON(!pages);
 	BUG_ON(!length);
 
-	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
-	BUG_ON(!data);
+	data = ceph_msg_data_add(msg);
+	data->type = CEPH_MSG_DATA_PAGES;
 	data->pages = pages;
 	data->length = length;
 	data->alignment = alignment & ~PAGE_MASK;
 
-	list_add_tail(&data->links, &msg->data);
 	msg->data_length += length;
 }
 EXPORT_SYMBOL(ceph_msg_data_add_pages);
@@ -3311,11 +3273,11 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 	BUG_ON(!pagelist);
 	BUG_ON(!pagelist->length);
 
-	data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
-	BUG_ON(!data);
+	data = ceph_msg_data_add(msg);
+	data->type = CEPH_MSG_DATA_PAGELIST;
+	refcount_inc(&pagelist->refcnt);
 	data->pagelist = pagelist;
 
-	list_add_tail(&data->links, &msg->data);
 	msg->data_length += pagelist->length;
 }
 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
@@ -3326,12 +3288,11 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
 {
 	struct ceph_msg_data *data;
 
-	data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
-	BUG_ON(!data);
+	data = ceph_msg_data_add(msg);
+	data->type = CEPH_MSG_DATA_BIO;
 	data->bio_pos = *bio_pos;
 	data->bio_length = length;
 
-	list_add_tail(&data->links, &msg->data);
 	msg->data_length += length;
 }
 EXPORT_SYMBOL(ceph_msg_data_add_bio);
@@ -3342,11 +3303,10 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
 {
 	struct ceph_msg_data *data;
 
-	data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS);
-	BUG_ON(!data);
+	data = ceph_msg_data_add(msg);
+	data->type = CEPH_MSG_DATA_BVECS;
 	data->bvec_pos = *bvec_pos;
 
-	list_add_tail(&data->links, &msg->data);
 	msg->data_length += bvec_pos->iter.bi_size;
 }
 EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
@@ -3355,8 +3315,8 @@ EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
  */
-struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
-			      bool can_fail)
+struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
+			       gfp_t flags, bool can_fail)
 {
 	struct ceph_msg *m;
 
@@ -3370,7 +3330,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 
 	INIT_LIST_HEAD(&m->list_head);
 	kref_init(&m->kref);
-	INIT_LIST_HEAD(&m->data);
 
 	/* front */
 	if (front_len) {
@@ -3385,6 +3344,15 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 	}
 	m->front_alloc_len = m->front.iov_len = front_len;
 
+	if (max_data_items) {
+		m->data = kmalloc_array(max_data_items, sizeof(*m->data),
+					flags);
+		if (!m->data)
+			goto out2;
+
+		m->max_data_items = max_data_items;
+	}
+
 	dout("ceph_msg_new %p front %d\n", m, front_len);
 	return m;
 
@@ -3401,6 +3369,13 @@ out:
 	}
 	return NULL;
 }
+EXPORT_SYMBOL(ceph_msg_new2);
+
+struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
+			      bool can_fail)
+{
+	return ceph_msg_new2(type, front_len, 0, flags, can_fail);
+}
 EXPORT_SYMBOL(ceph_msg_new);
 
 /*
@@ -3496,13 +3471,14 @@ static void ceph_msg_free(struct ceph_msg *m)
 {
 	dout("%s %p\n", __func__, m);
 	kvfree(m->front.iov_base);
+	kfree(m->data);
 	kmem_cache_free(ceph_msg_cache, m);
 }
 
 static void ceph_msg_release(struct kref *kref)
 {
 	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
-	struct ceph_msg_data *data, *next;
+	int i;
 
 	dout("%s %p\n", __func__, m);
 	WARN_ON(!list_empty(&m->list_head));
@@ -3515,11 +3491,8 @@ static void ceph_msg_release(struct kref *kref)
 		m->middle = NULL;
 	}
 
-	list_for_each_entry_safe(data, next, &m->data, links) {
-		list_del_init(&data->links);
-		ceph_msg_data_destroy(data);
-	}
-	m->data_length = 0;
+	for (i = 0; i < m->num_data_items; i++)
+		ceph_msg_data_destroy(&m->data[i]);
 
 	if (m->pool)
 		ceph_msgpool_put(m->pool, m);

+ 18 - 9
net/ceph/msgpool.c

@@ -14,7 +14,8 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
 	struct ceph_msgpool *pool = arg;
 	struct ceph_msg *msg;
 
-	msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true);
+	msg = ceph_msg_new2(pool->type, pool->front_len, pool->max_data_items,
+			    gfp_mask, true);
 	if (!msg) {
 		dout("msgpool_alloc %s failed\n", pool->name);
 	} else {
@@ -35,11 +36,13 @@ static void msgpool_free(void *element, void *arg)
 }
 
 int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
-		      int front_len, int size, bool blocking, const char *name)
+		      int front_len, int max_data_items, int size,
+		      const char *name)
 {
 	dout("msgpool %s init\n", name);
 	pool->type = type;
 	pool->front_len = front_len;
+	pool->max_data_items = max_data_items;
 	pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
 	if (!pool->pool)
 		return -ENOMEM;
@@ -53,18 +56,21 @@ void ceph_msgpool_destroy(struct ceph_msgpool *pool)
 	mempool_destroy(pool->pool);
 }
 
-struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool,
-				  int front_len)
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len,
+				  int max_data_items)
 {
 	struct ceph_msg *msg;
 
-	if (front_len > pool->front_len) {
-		dout("msgpool_get %s need front %d, pool size is %d\n",
-		       pool->name, front_len, pool->front_len);
-		WARN_ON(1);
+	if (front_len > pool->front_len ||
+	    max_data_items > pool->max_data_items) {
+		pr_warn_ratelimited("%s need %d/%d, pool %s has %d/%d\n",
+		    __func__, front_len, max_data_items, pool->name,
+		    pool->front_len, pool->max_data_items);
+		WARN_ON_ONCE(1);
 
 		/* try to alloc a fresh message */
-		return ceph_msg_new(pool->type, front_len, GFP_NOFS, false);
+		return ceph_msg_new2(pool->type, front_len, max_data_items,
+				     GFP_NOFS, false);
 	}
 
 	msg = mempool_alloc(pool->pool, GFP_NOFS);
@@ -80,6 +86,9 @@ void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg)
 	msg->front.iov_len = pool->front_len;
 	msg->hdr.front_len = cpu_to_le32(pool->front_len);
 
+	msg->data_length = 0;
+	msg->num_data_items = 0;
+
 	kref_init(&msg->kref);  /* retake single ref */
 	mempool_free(msg, pool->pool);
 }

+ 270 - 93
net/ceph/osd_client.c

@@ -126,6 +126,9 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
 	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
 }
 
+/*
+ * Consumes @pages if @own_pages is true.
+ */
 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
 			struct page **pages, u64 length, u32 alignment,
 			bool pages_from_pool, bool own_pages)
@@ -138,6 +141,9 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
 	osd_data->own_pages = own_pages;
 }
 
+/*
+ * Consumes a ref on @pagelist.
+ */
 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
 			struct ceph_pagelist *pagelist)
 {
@@ -362,6 +368,8 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
 		num_pages = calc_pages_for((u64)osd_data->alignment,
 						(u64)osd_data->length);
 		ceph_release_page_vector(osd_data->pages, num_pages);
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
+		ceph_pagelist_release(osd_data->pagelist);
 	}
 	ceph_osd_data_init(osd_data);
 }
@@ -402,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
 	case CEPH_OSD_OP_LIST_WATCHERS:
 		ceph_osd_data_release(&op->list_watchers.response_data);
 		break;
+	case CEPH_OSD_OP_COPY_FROM:
+		ceph_osd_data_release(&op->copy_from.osd_data);
+		break;
 	default:
 		break;
 	}
@@ -606,12 +617,15 @@ static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
 	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
 }
 
-int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
+				      int num_request_data_items,
+				      int num_reply_data_items)
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
 	struct ceph_msg *msg;
 	int msg_size;
 
+	WARN_ON(req->r_request || req->r_reply);
 	WARN_ON(ceph_oid_empty(&req->r_base_oid));
 	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 
@@ -633,9 +647,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 	msg_size += 4 + 8; /* retry_attempt, features */
 
 	if (req->r_mempool)
-		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
+		msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
+				       num_request_data_items);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
+		msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
+				    num_request_data_items, gfp, true);
 	if (!msg)
 		return -ENOMEM;
 
@@ -648,9 +664,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
 
 	if (req->r_mempool)
-		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
+				       num_reply_data_items);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
+		msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
+				    num_reply_data_items, gfp, true);
 	if (!msg)
 		return -ENOMEM;
 
@@ -658,7 +676,6 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 
 	return 0;
 }
-EXPORT_SYMBOL(ceph_osdc_alloc_messages);
 
 static bool osd_req_opcode_valid(u16 opcode)
 {
@@ -671,6 +688,65 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
 	}
 }
 
+static void get_num_data_items(struct ceph_osd_request *req,
+			       int *num_request_data_items,
+			       int *num_reply_data_items)
+{
+	struct ceph_osd_req_op *op;
+
+	*num_request_data_items = 0;
+	*num_reply_data_items = 0;
+
+	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
+		switch (op->op) {
+		/* request */
+		case CEPH_OSD_OP_WRITE:
+		case CEPH_OSD_OP_WRITEFULL:
+		case CEPH_OSD_OP_SETXATTR:
+		case CEPH_OSD_OP_CMPXATTR:
+		case CEPH_OSD_OP_NOTIFY_ACK:
+		case CEPH_OSD_OP_COPY_FROM:
+			*num_request_data_items += 1;
+			break;
+
+		/* reply */
+		case CEPH_OSD_OP_STAT:
+		case CEPH_OSD_OP_READ:
+		case CEPH_OSD_OP_LIST_WATCHERS:
+			*num_reply_data_items += 1;
+			break;
+
+		/* both */
+		case CEPH_OSD_OP_NOTIFY:
+			*num_request_data_items += 1;
+			*num_reply_data_items += 1;
+			break;
+		case CEPH_OSD_OP_CALL:
+			*num_request_data_items += 2;
+			*num_reply_data_items += 1;
+			break;
+
+		default:
+			WARN_ON(!osd_req_opcode_valid(op->op));
+			break;
+		}
+	}
+}
+
+/*
+ * oid, oloc and OSD op opcode(s) must be filled in before this function
+ * is called.
+ */
+int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
+{
+	int num_request_data_items, num_reply_data_items;
+
+	get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
+	return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
+					  num_reply_data_items);
+}
+EXPORT_SYMBOL(ceph_osdc_alloc_messages);
+
 /*
  * This is an osd op init function for opcodes that have no data or
  * other information associated with them.  It also serves as a
@@ -767,22 +843,19 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
 
 int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
-			u16 opcode, const char *class, const char *method)
+			const char *class, const char *method)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-						      opcode, 0);
+	struct ceph_osd_req_op *op;
 	struct ceph_pagelist *pagelist;
 	size_t payload_len = 0;
 	size_t size;
 
-	BUG_ON(opcode != CEPH_OSD_OP_CALL);
+	op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
 
-	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
+	pagelist = ceph_pagelist_alloc(GFP_NOFS);
 	if (!pagelist)
 		return -ENOMEM;
 
-	ceph_pagelist_init(pagelist);
-
 	op->cls.class_name = class;
 	size = strlen(class);
 	BUG_ON(size > (size_t) U8_MAX);
@@ -815,12 +888,10 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 
 	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
 
-	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+	pagelist = ceph_pagelist_alloc(GFP_NOFS);
 	if (!pagelist)
 		return -ENOMEM;
 
-	ceph_pagelist_init(pagelist);
-
 	payload_len = strlen(name);
 	op->xattr.name_len = payload_len;
 	ceph_pagelist_append(pagelist, name, payload_len);
@@ -900,12 +971,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 			     const struct ceph_osd_req_op *src)
 {
-	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
-		pr_err("unrecognized osd opcode %d\n", src->op);
-
-		return 0;
-	}
-
 	switch (src->op) {
 	case CEPH_OSD_OP_STAT:
 		break;
@@ -955,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 	case CEPH_OSD_OP_CREATE:
 	case CEPH_OSD_OP_DELETE:
 		break;
+	case CEPH_OSD_OP_COPY_FROM:
+		dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
+		dst->copy_from.src_version =
+			cpu_to_le64(src->copy_from.src_version);
+		dst->copy_from.flags = src->copy_from.flags;
+		dst->copy_from.src_fadvise_flags =
+			cpu_to_le32(src->copy_from.src_fadvise_flags);
+		break;
 	default:
 		pr_err("unsupported osd opcode %s\n",
 			ceph_osd_op_name(src->op));
@@ -1038,7 +1111,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	if (flags & CEPH_OSD_FLAG_WRITE)
 		req->r_data_offset = off;
 
-	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
+	if (num_ops > 1)
+		/*
+		 * This is a special case for ceph_writepages_start(), but it
+		 * also covers ceph_uninline_data().  If more multi-op request
+		 * use cases emerge, we will need a separate helper.
+		 */
+		r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
+	else
+		r = ceph_osdc_alloc_messages(req, GFP_NOFS);
 	if (r)
 		goto fail;
 
@@ -1845,48 +1926,55 @@ static bool should_plug_request(struct ceph_osd_request *req)
 	return true;
 }
 
-static void setup_request_data(struct ceph_osd_request *req,
-			       struct ceph_msg *msg)
+/*
+ * Keep get_num_data_items() in sync with this function.
+ */
+static void setup_request_data(struct ceph_osd_request *req)
 {
-	u32 data_len = 0;
-	int i;
+	struct ceph_msg *request_msg = req->r_request;
+	struct ceph_msg *reply_msg = req->r_reply;
+	struct ceph_osd_req_op *op;
 
-	if (!list_empty(&msg->data))
+	if (req->r_request->num_data_items || req->r_reply->num_data_items)
 		return;
 
-	WARN_ON(msg->data_length);
-	for (i = 0; i < req->r_num_ops; i++) {
-		struct ceph_osd_req_op *op = &req->r_ops[i];
-
+	WARN_ON(request_msg->data_length || reply_msg->data_length);
+	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
 		switch (op->op) {
 		/* request */
 		case CEPH_OSD_OP_WRITE:
 		case CEPH_OSD_OP_WRITEFULL:
 			WARN_ON(op->indata_len != op->extent.length);
-			ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->extent.osd_data);
 			break;
 		case CEPH_OSD_OP_SETXATTR:
 		case CEPH_OSD_OP_CMPXATTR:
 			WARN_ON(op->indata_len != op->xattr.name_len +
 						  op->xattr.value_len);
-			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->xattr.osd_data);
 			break;
 		case CEPH_OSD_OP_NOTIFY_ACK:
-			ceph_osdc_msg_data_add(msg,
+			ceph_osdc_msg_data_add(request_msg,
 					       &op->notify_ack.request_data);
 			break;
+		case CEPH_OSD_OP_COPY_FROM:
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->copy_from.osd_data);
+			break;
 
 		/* reply */
 		case CEPH_OSD_OP_STAT:
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->raw_data_in);
 			break;
 		case CEPH_OSD_OP_READ:
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->extent.osd_data);
 			break;
 		case CEPH_OSD_OP_LIST_WATCHERS:
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->list_watchers.response_data);
 			break;
 
@@ -1895,25 +1983,23 @@ static void setup_request_data(struct ceph_osd_request *req,
 			WARN_ON(op->indata_len != op->cls.class_len +
 						  op->cls.method_len +
 						  op->cls.indata_len);
-			ceph_osdc_msg_data_add(msg, &op->cls.request_info);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->cls.request_info);
 			/* optional, can be NONE */
-			ceph_osdc_msg_data_add(msg, &op->cls.request_data);
+			ceph_osdc_msg_data_add(request_msg,
+					       &op->cls.request_data);
 			/* optional, can be NONE */
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->cls.response_data);
 			break;
 		case CEPH_OSD_OP_NOTIFY:
-			ceph_osdc_msg_data_add(msg,
+			ceph_osdc_msg_data_add(request_msg,
 					       &op->notify.request_data);
-			ceph_osdc_msg_data_add(req->r_reply,
+			ceph_osdc_msg_data_add(reply_msg,
 					       &op->notify.response_data);
 			break;
 		}
-
-		data_len += op->indata_len;
 	}
-
-	WARN_ON(data_len != msg->data_length);
 }
 
 static void encode_pgid(void **p, const struct ceph_pg *pgid)
@@ -1961,7 +2047,7 @@ static void encode_request_partial(struct ceph_osd_request *req,
 			req->r_data_offset || req->r_snapc);
 	}
 
-	setup_request_data(req, msg);
+	setup_request_data(req);
 
 	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
 	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
@@ -3001,11 +3087,21 @@ static void linger_submit(struct ceph_osd_linger_request *lreq)
 	struct ceph_osd_client *osdc = lreq->osdc;
 	struct ceph_osd *osd;
 
+	down_write(&osdc->lock);
+	linger_register(lreq);
+	if (lreq->is_watch) {
+		lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id;
+		lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id;
+	} else {
+		lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id;
+	}
+
 	calc_target(osdc, &lreq->t, NULL, false);
 	osd = lookup_create_osd(osdc, lreq->t.osd, true);
 	link_linger(osd, lreq);
 
 	send_linger(lreq);
+	up_write(&osdc->lock);
 }
 
 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
@@ -4318,9 +4414,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
 			     lreq->notify_id, notify_id);
 		} else if (!completion_done(&lreq->notify_finish_wait)) {
 			struct ceph_msg_data *data =
-			    list_first_entry_or_null(&msg->data,
-						     struct ceph_msg_data,
-						     links);
+			    msg->num_data_items ? &msg->data[0] : NULL;
 
 			if (data) {
 				if (lreq->preply_pages) {
@@ -4476,6 +4570,23 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq)
 
 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
+	return req;
+}
+
+static struct ceph_osd_request *
+alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
+{
+	struct ceph_osd_request *req;
+
+	req = alloc_linger_request(lreq);
+	if (!req)
+		return NULL;
+
+	/*
+	 * Pass 0 for cookie because we don't know it yet, it will be
+	 * filled in by linger_submit().
+	 */
+	osd_req_op_watch_init(req, 0, 0, watch_opcode);
 
 	if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
 		ceph_osdc_put_request(req);
@@ -4514,27 +4625,19 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
 	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
 	ktime_get_real_ts64(&lreq->mtime);
 
-	lreq->reg_req = alloc_linger_request(lreq);
+	lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH);
 	if (!lreq->reg_req) {
 		ret = -ENOMEM;
 		goto err_put_lreq;
 	}
 
-	lreq->ping_req = alloc_linger_request(lreq);
+	lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING);
 	if (!lreq->ping_req) {
 		ret = -ENOMEM;
 		goto err_put_lreq;
 	}
 
-	down_write(&osdc->lock);
-	linger_register(lreq); /* before osd_req_op_* */
-	osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
-			      CEPH_OSD_WATCH_OP_WATCH);
-	osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
-			      CEPH_OSD_WATCH_OP_PING);
 	linger_submit(lreq);
-	up_write(&osdc->lock);
-
 	ret = linger_reg_commit_wait(lreq);
 	if (ret) {
 		linger_cancel(lreq);
@@ -4599,11 +4702,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
 
 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
 
-	pl = kmalloc(sizeof(*pl), GFP_NOIO);
+	pl = ceph_pagelist_alloc(GFP_NOIO);
 	if (!pl)
 		return -ENOMEM;
 
-	ceph_pagelist_init(pl);
 	ret = ceph_pagelist_encode_64(pl, notify_id);
 	ret |= ceph_pagelist_encode_64(pl, cookie);
 	if (payload) {
@@ -4641,12 +4743,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = CEPH_OSD_FLAG_READ;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
+					 payload_len);
 	if (ret)
 		goto out_put_req;
 
-	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
-					 payload_len);
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
 	if (ret)
 		goto out_put_req;
 
@@ -4670,11 +4772,10 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
 	op->notify.cookie = cookie;
 
-	pl = kmalloc(sizeof(*pl), GFP_NOIO);
+	pl = ceph_pagelist_alloc(GFP_NOIO);
 	if (!pl)
 		return -ENOMEM;
 
-	ceph_pagelist_init(pl);
 	ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
 	ret |= ceph_pagelist_encode_32(pl, timeout);
 	ret |= ceph_pagelist_encode_32(pl, payload_len);
@@ -4733,29 +4834,30 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
 		goto out_put_lreq;
 	}
 
+	/*
+	 * Pass 0 for cookie because we don't know it yet, it will be
+	 * filled in by linger_submit().
+	 */
+	ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout,
+				     payload, payload_len);
+	if (ret)
+		goto out_put_lreq;
+
 	/* for notify_id */
 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
 	if (IS_ERR(pages)) {
 		ret = PTR_ERR(pages);
 		goto out_put_lreq;
 	}
-
-	down_write(&osdc->lock);
-	linger_register(lreq); /* before osd_req_op_* */
-	ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
-				     timeout, payload, payload_len);
-	if (ret) {
-		linger_unregister(lreq);
-		up_write(&osdc->lock);
-		ceph_release_page_vector(pages, 1);
-		goto out_put_lreq;
-	}
 	ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
 						 response_data),
 				 pages, PAGE_SIZE, 0, false, true);
-	linger_submit(lreq);
-	up_write(&osdc->lock);
 
+	ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
+	if (ret)
+		goto out_put_lreq;
+
+	linger_submit(lreq);
 	ret = linger_reg_commit_wait(lreq);
 	if (!ret)
 		ret = linger_notify_finish_wait(lreq);
@@ -4881,10 +4983,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = CEPH_OSD_FLAG_READ;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-	if (ret)
-		goto out_put_req;
-
 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
 	if (IS_ERR(pages)) {
 		ret = PTR_ERR(pages);
@@ -4896,6 +4994,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
 						 response_data),
 				 pages, PAGE_SIZE, 0, false, true);
 
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
 	if (ret >= 0) {
@@ -4958,11 +5060,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 	ceph_oloc_copy(&req->r_base_oloc, oloc);
 	req->r_flags = flags;
 
-	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
-	if (ret)
-		goto out_put_req;
-
-	ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+	ret = osd_req_op_cls_init(req, 0, class, method);
 	if (ret)
 		goto out_put_req;
 
@@ -4973,6 +5071,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
 						   *resp_len, 0, false, false);
 
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
 	if (ret >= 0) {
@@ -5021,11 +5123,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 		goto out_map;
 
 	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
-				PAGE_SIZE, 10, true, "osd_op");
+				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
 	if (err < 0)
 		goto out_mempool;
 	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
-				PAGE_SIZE, 10, true, "osd_op_reply");
+				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
+				"osd_op_reply");
 	if (err < 0)
 		goto out_msgpool;
 
@@ -5168,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 }
 EXPORT_SYMBOL(ceph_osdc_writepages);
 
+static int osd_req_op_copy_from_init(struct ceph_osd_request *req,
+				     u64 src_snapid, u64 src_version,
+				     struct ceph_object_id *src_oid,
+				     struct ceph_object_locator *src_oloc,
+				     u32 src_fadvise_flags,
+				     u32 dst_fadvise_flags,
+				     u8 copy_from_flags)
+{
+	struct ceph_osd_req_op *op;
+	struct page **pages;
+	void *p, *end;
+
+	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+
+	op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags);
+	op->copy_from.snapid = src_snapid;
+	op->copy_from.src_version = src_version;
+	op->copy_from.flags = copy_from_flags;
+	op->copy_from.src_fadvise_flags = src_fadvise_flags;
+
+	p = page_address(pages[0]);
+	end = p + PAGE_SIZE;
+	ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
+	encode_oloc(&p, end, src_oloc);
+	op->indata_len = PAGE_SIZE - (end - p);
+
+	ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
+				 op->indata_len, 0, false, true);
+	return 0;
+}
+
+int ceph_osdc_copy_from(struct ceph_osd_client *osdc,
+			u64 src_snapid, u64 src_version,
+			struct ceph_object_id *src_oid,
+			struct ceph_object_locator *src_oloc,
+			u32 src_fadvise_flags,
+			struct ceph_object_id *dst_oid,
+			struct ceph_object_locator *dst_oloc,
+			u32 dst_fadvise_flags,
+			u8 copy_from_flags)
+{
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
+
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
+
+	ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc);
+	ceph_oid_copy(&req->r_t.base_oid, dst_oid);
+
+	ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid,
+					src_oloc, src_fadvise_flags,
+					dst_fadvise_flags, copy_from_flags);
+	if (ret)
+		goto out;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
+	if (ret)
+		goto out;
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+
+out:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_copy_from);
+
 int __init ceph_osdc_setup(void)
 {
 	size_t size = sizeof(struct ceph_osd_request) +
@@ -5295,7 +5472,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
 	u32 front_len = le32_to_cpu(hdr->front_len);
 	u32 data_len = le32_to_cpu(hdr->data_len);
 
-	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
+	m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
 	if (!m)
 		return NULL;
 

+ 20 - 0
net/ceph/pagelist.c

@@ -6,6 +6,26 @@
 #include <linux/highmem.h>
 #include <linux/ceph/pagelist.h>
 
+struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags)
+{
+	struct ceph_pagelist *pl;
+
+	pl = kmalloc(sizeof(*pl), gfp_flags);
+	if (!pl)
+		return NULL;
+
+	INIT_LIST_HEAD(&pl->head);
+	pl->mapped_tail = NULL;
+	pl->length = 0;
+	pl->room = 0;
+	INIT_LIST_HEAD(&pl->free_list);
+	pl->num_pages_free = 0;
+	refcount_set(&pl->refcnt, 1);
+
+	return pl;
+}
+EXPORT_SYMBOL(ceph_pagelist_alloc);
+
 static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
 {
 	if (pl->mapped_tail) {