Browse Source

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph fixes from Sage Weil:
 "Yes, this is a much larger pull than I would like after -rc1.  There
  are a few things included:

   - a few fixes for leaks and incorrect assertions
   - a few patches fixing behavior when mapped images are resized
   - handling for cloned/layered images that are flattened out from
     underneath the client

  The last bit was non-trivial, and there is some code movement and
  associated cleanup mixed in.  This was ready and was meant to go in
  last week but I missed the boat on Friday.  My only excuse is that I
  was waiting for an all clear from the testing and there were many
  other shiny things to distract me.

  Strictly speaking, handling the flatten case isn't a regression and
  could wait, so if you like we can try to pull the series apart, but
  Alex and I would much prefer to have it all in as it is a case real
  users will hit with 3.10."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (33 commits)
  rbd: re-submit flattened write request (part 2)
  rbd: re-submit write request for flattened clone
  rbd: re-submit read request for flattened clone
  rbd: detect when clone image is flattened
  rbd: reference count parent requests
  rbd: define parent image request routines
  rbd: define rbd_dev_unparent()
  rbd: don't release write request until necessary
  rbd: get parent info on refresh
  rbd: ignore zero-overlap parent
  rbd: support reading parent page data for writes
  rbd: fix parent request size assumption
  libceph: init sent and completed when starting
  rbd: kill rbd_img_request_get()
  rbd: only set up watch for mapped images
  rbd: set mapping read-only flag in rbd_add()
  rbd: support reading parent page data
  rbd: fix an incorrect assertion condition
  rbd: define rbd_dev_v2_header_info()
  rbd: get rid of trivial v1 header wrappers
  ...
Linus Torvalds 12 years ago
parent
commit
109c3c0292
2 changed files with 553 additions and 387 deletions
  1. 549 386
      drivers/block/rbd.c
  2. 4 1
      net/ceph/osd_client.c

+ 549 - 386
drivers/block/rbd.c

@@ -55,6 +55,39 @@
 #define	SECTOR_SHIFT	9
 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
 
+/*
+ * Increment the given counter and return its updated value.
+ * If the counter is already 0 it will not be incremented.
+ * If the counter is already at its maximum value returns
+ * -EINVAL without updating it.
+ */
+static int atomic_inc_return_safe(atomic_t *v)
+{
+	unsigned int counter;
+
+	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
+	if (counter <= (unsigned int)INT_MAX)
+		return (int)counter;
+
+	atomic_dec(v);
+
+	return -EINVAL;
+}
+
+/* Decrement the counter.  Return the resulting value, or -EINVAL */
+static int atomic_dec_return_safe(atomic_t *v)
+{
+	int counter;
+
+	counter = atomic_dec_return(v);
+	if (counter >= 0)
+		return counter;
+
+	atomic_inc(v);
+
+	return -EINVAL;
+}
+
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
 
@@ -100,21 +133,20 @@
  * block device image metadata (in-memory version)
  */
 struct rbd_image_header {
-	/* These four fields never change for a given rbd image */
+	/* These six fields never change for a given rbd image */
 	char *object_prefix;
-	u64 features;
 	__u8 obj_order;
 	__u8 crypt_type;
 	__u8 comp_type;
+	u64 stripe_unit;
+	u64 stripe_count;
+	u64 features;		/* Might be changeable someday? */
 
 	/* The remaining fields need to be updated occasionally */
 	u64 image_size;
 	struct ceph_snap_context *snapc;
-	char *snap_names;
-	u64 *snap_sizes;
-
-	u64 stripe_unit;
-	u64 stripe_count;
+	char *snap_names;	/* format 1 only */
+	u64 *snap_sizes;	/* format 1 only */
 };
 
 /*
@@ -225,6 +257,7 @@ struct rbd_obj_request {
 		};
 	};
 	struct page		**copyup_pages;
+	u32			copyup_page_count;
 
 	struct ceph_osd_request	*osd_req;
 
@@ -257,6 +290,7 @@ struct rbd_img_request {
 		struct rbd_obj_request	*obj_request;	/* obj req initiator */
 	};
 	struct page		**copyup_pages;
+	u32			copyup_page_count;
 	spinlock_t		completion_lock;/* protects next_completion */
 	u32			next_completion;
 	rbd_img_callback_t	callback;
@@ -311,6 +345,7 @@ struct rbd_device {
 
 	struct rbd_spec		*parent_spec;
 	u64			parent_overlap;
+	atomic_t		parent_ref;
 	struct rbd_device	*parent;
 
 	/* protects updating the header */
@@ -359,7 +394,8 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 		       size_t count);
 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
 			  size_t count);
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev);
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
+static void rbd_spec_put(struct rbd_spec *spec);
 
 static struct bus_attribute rbd_bus_attrs[] = {
 	__ATTR(add, S_IWUSR, NULL, rbd_add),
@@ -426,7 +462,8 @@ static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 					u64 snap_id);
 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
@@ -726,88 +763,123 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 }
 
 /*
- * Create a new header structure, translate header format from the on-disk
- * header.
+ * Fill an rbd image header with information from the given format 1
+ * on-disk header.
  */
-static int rbd_header_from_disk(struct rbd_image_header *header,
+static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 				 struct rbd_image_header_ondisk *ondisk)
 {
+	struct rbd_image_header *header = &rbd_dev->header;
+	bool first_time = header->object_prefix == NULL;
+	struct ceph_snap_context *snapc;
+	char *object_prefix = NULL;
+	char *snap_names = NULL;
+	u64 *snap_sizes = NULL;
 	u32 snap_count;
-	size_t len;
 	size_t size;
+	int ret = -ENOMEM;
 	u32 i;
 
-	memset(header, 0, sizeof (*header));
+	/* Allocate this now to avoid having to handle failure below */
 
-	snap_count = le32_to_cpu(ondisk->snap_count);
+	if (first_time) {
+		size_t len;
 
-	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
-	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
-	if (!header->object_prefix)
-		return -ENOMEM;
-	memcpy(header->object_prefix, ondisk->object_prefix, len);
-	header->object_prefix[len] = '\0';
+		len = strnlen(ondisk->object_prefix,
+				sizeof (ondisk->object_prefix));
+		object_prefix = kmalloc(len + 1, GFP_KERNEL);
+		if (!object_prefix)
+			return -ENOMEM;
+		memcpy(object_prefix, ondisk->object_prefix, len);
+		object_prefix[len] = '\0';
+	}
 
+	/* Allocate the snapshot context and fill it in */
+
+	snap_count = le32_to_cpu(ondisk->snap_count);
+	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
+	if (!snapc)
+		goto out_err;
+	snapc->seq = le64_to_cpu(ondisk->snap_seq);
 	if (snap_count) {
+		struct rbd_image_snap_ondisk *snaps;
 		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 
-		/* Save a copy of the snapshot names */
+		/* We'll keep a copy of the snapshot names... */
 
-		if (snap_names_len > (u64) SIZE_MAX)
-			return -EIO;
-		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
-		if (!header->snap_names)
+		if (snap_names_len > (u64)SIZE_MAX)
+			goto out_2big;
+		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
+		if (!snap_names)
 			goto out_err;
+
+		/* ...as well as the array of their sizes. */
+
+		size = snap_count * sizeof (*header->snap_sizes);
+		snap_sizes = kmalloc(size, GFP_KERNEL);
+		if (!snap_sizes)
+			goto out_err;
+
 		/*
-		 * Note that rbd_dev_v1_header_read() guarantees
-		 * the ondisk buffer we're working with has
+		 * Copy the names, and fill in each snapshot's id
+		 * and size.
+		 *
+		 * Note that rbd_dev_v1_header_info() guarantees the
+		 * ondisk buffer we're working with has
 		 * snap_names_len bytes beyond the end of the
 		 * snapshot id array, this memcpy() is safe.
 		 */
-		memcpy(header->snap_names, &ondisk->snaps[snap_count],
-			snap_names_len);
+		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
+		snaps = ondisk->snaps;
+		for (i = 0; i < snap_count; i++) {
+			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
+			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
+		}
+	}
 
-		/* Record each snapshot's size */
+	/* We won't fail any more, fill in the header */
 
-		size = snap_count * sizeof (*header->snap_sizes);
-		header->snap_sizes = kmalloc(size, GFP_KERNEL);
-		if (!header->snap_sizes)
-			goto out_err;
-		for (i = 0; i < snap_count; i++)
-			header->snap_sizes[i] =
-				le64_to_cpu(ondisk->snaps[i].image_size);
+	down_write(&rbd_dev->header_rwsem);
+	if (first_time) {
+		header->object_prefix = object_prefix;
+		header->obj_order = ondisk->options.order;
+		header->crypt_type = ondisk->options.crypt_type;
+		header->comp_type = ondisk->options.comp_type;
+		/* The rest aren't used for format 1 images */
+		header->stripe_unit = 0;
+		header->stripe_count = 0;
+		header->features = 0;
 	} else {
-		header->snap_names = NULL;
-		header->snap_sizes = NULL;
+		ceph_put_snap_context(header->snapc);
+		kfree(header->snap_names);
+		kfree(header->snap_sizes);
 	}
 
-	header->features = 0;	/* No features support in v1 images */
-	header->obj_order = ondisk->options.order;
-	header->crypt_type = ondisk->options.crypt_type;
-	header->comp_type = ondisk->options.comp_type;
-
-	/* Allocate and fill in the snapshot context */
+	/* The remaining fields always get updated (when we refresh) */
 
 	header->image_size = le64_to_cpu(ondisk->image_size);
+	header->snapc = snapc;
+	header->snap_names = snap_names;
+	header->snap_sizes = snap_sizes;
 
-	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
-	if (!header->snapc)
-		goto out_err;
-	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
-	for (i = 0; i < snap_count; i++)
-		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);
+	/* Make sure mapping size is consistent with header info */
 
-	return 0;
+	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
+		if (rbd_dev->mapping.size != header->image_size)
+			rbd_dev->mapping.size = header->image_size;
+
+	up_write(&rbd_dev->header_rwsem);
 
+	return 0;
+out_2big:
+	ret = -EIO;
 out_err:
-	kfree(header->snap_sizes);
-	header->snap_sizes = NULL;
-	kfree(header->snap_names);
-	header->snap_names = NULL;
-	kfree(header->object_prefix);
-	header->object_prefix = NULL;
+	kfree(snap_sizes);
+	kfree(snap_names);
+	ceph_put_snap_context(snapc);
+	kfree(object_prefix);
 
-	return -ENOMEM;
+	return ret;
 }
 
 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
@@ -934,20 +1006,11 @@ static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 
 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 {
-	const char *snap_name = rbd_dev->spec->snap_name;
-	u64 snap_id;
+	u64 snap_id = rbd_dev->spec->snap_id;
 	u64 size = 0;
 	u64 features = 0;
 	int ret;
 
-	if (strcmp(snap_name, RBD_SNAP_HEAD_NAME)) {
-		snap_id = rbd_snap_id_by_name(rbd_dev, snap_name);
-		if (snap_id == CEPH_NOSNAP)
-			return -ENOENT;
-	} else {
-		snap_id = CEPH_NOSNAP;
-	}
-
 	ret = rbd_snap_size(rbd_dev, snap_id, &size);
 	if (ret)
 		return ret;
@@ -958,11 +1021,6 @@ static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
 	rbd_dev->mapping.size = size;
 	rbd_dev->mapping.features = features;
 
-	/* If we are mapping a snapshot it must be marked read-only */
-
-	if (snap_id != CEPH_NOSNAP)
-		rbd_dev->mapping.read_only = true;
-
 	return 0;
 }
 
@@ -970,14 +1028,6 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 {
 	rbd_dev->mapping.size = 0;
 	rbd_dev->mapping.features = 0;
-	rbd_dev->mapping.read_only = true;
-}
-
-static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
-{
-	rbd_dev->mapping.size = 0;
-	rbd_dev->mapping.features = 0;
-	rbd_dev->mapping.read_only = true;
 }
 
 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
@@ -1342,20 +1392,18 @@ static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
 	kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
-static void rbd_img_request_get(struct rbd_img_request *img_request)
-{
-	dout("%s: img %p (was %d)\n", __func__, img_request,
-		atomic_read(&img_request->kref.refcount));
-	kref_get(&img_request->kref);
-}
-
+static bool img_request_child_test(struct rbd_img_request *img_request);
+static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
 	rbd_assert(img_request != NULL);
 	dout("%s: img %p (was %d)\n", __func__, img_request,
 		atomic_read(&img_request->kref.refcount));
-	kref_put(&img_request->kref, rbd_img_request_destroy);
+	if (img_request_child_test(img_request))
+		kref_put(&img_request->kref, rbd_parent_request_destroy);
+	else
+		kref_put(&img_request->kref, rbd_img_request_destroy);
 }
 
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
@@ -1472,6 +1520,12 @@ static void img_request_child_set(struct rbd_img_request *img_request)
 	smp_mb();
 }
 
+static void img_request_child_clear(struct rbd_img_request *img_request)
+{
+	clear_bit(IMG_REQ_CHILD, &img_request->flags);
+	smp_mb();
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request)
 {
 	smp_mb();
@@ -1484,6 +1538,12 @@ static void img_request_layered_set(struct rbd_img_request *img_request)
 	smp_mb();
 }
 
+static void img_request_layered_clear(struct rbd_img_request *img_request)
+{
+	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
+	smp_mb();
+}
+
 static bool img_request_layered_test(struct rbd_img_request *img_request)
 {
 	smp_mb();
@@ -1827,6 +1887,74 @@ static void rbd_obj_request_destroy(struct kref *kref)
 	kmem_cache_free(rbd_obj_request_cache, obj_request);
 }
 
+/* It's OK to call this for a device with no parent */
+
+static void rbd_spec_put(struct rbd_spec *spec);
+static void rbd_dev_unparent(struct rbd_device *rbd_dev)
+{
+	rbd_dev_remove_parent(rbd_dev);
+	rbd_spec_put(rbd_dev->parent_spec);
+	rbd_dev->parent_spec = NULL;
+	rbd_dev->parent_overlap = 0;
+}
+
+/*
+ * Parent image reference counting is used to determine when an
+ * image's parent fields can be safely torn down--after there are no
+ * more in-flight requests to the parent image.  When the last
+ * reference is dropped, cleaning them up is safe.
+ */
+static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
+{
+	int counter;
+
+	if (!rbd_dev->parent_spec)
+		return;
+
+	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
+	if (counter > 0)
+		return;
+
+	/* Last reference; clean up parent data structures */
+
+	if (!counter)
+		rbd_dev_unparent(rbd_dev);
+	else
+		rbd_warn(rbd_dev, "parent reference underflow\n");
+}
+
+/*
+ * If an image has a non-zero parent overlap, get a reference to its
+ * parent.
+ *
+ * We must get the reference before checking for the overlap to
+ * coordinate properly with zeroing the parent overlap in
+ * rbd_dev_v2_parent_info() when an image gets flattened.  We
+ * drop it again if there is no overlap.
+ *
+ * Returns true if the rbd device has a parent with a non-zero
+ * overlap and a reference for it was successfully taken, or
+ * false otherwise.
+ */
+static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
+{
+	int counter;
+
+	if (!rbd_dev->parent_spec)
+		return false;
+
+	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
+	if (counter > 0 && rbd_dev->parent_overlap)
+		return true;
+
+	/* Image was flattened, but parent is not yet torn down */
+
+	if (counter < 0)
+		rbd_warn(rbd_dev, "parent reference overflow\n");
+
+	return false;
+}
+
 /*
  * Caller is responsible for filling in the list of object requests
  * that comprises the image request, and the Linux request pointer
@@ -1835,8 +1963,7 @@ static void rbd_obj_request_destroy(struct kref *kref)
 static struct rbd_img_request *rbd_img_request_create(
 					struct rbd_device *rbd_dev,
 					u64 offset, u64 length,
-					bool write_request,
-					bool child_request)
+					bool write_request)
 {
 	struct rbd_img_request *img_request;
 
@@ -1861,9 +1988,7 @@ static struct rbd_img_request *rbd_img_request_create(
 	} else {
 		img_request->snap_id = rbd_dev->spec->snap_id;
 	}
-	if (child_request)
-		img_request_child_set(img_request);
-	if (rbd_dev->parent_spec)
+	if (rbd_dev_parent_get(rbd_dev))
 		img_request_layered_set(img_request);
 	spin_lock_init(&img_request->completion_lock);
 	img_request->next_completion = 0;
@@ -1873,9 +1998,6 @@ static struct rbd_img_request *rbd_img_request_create(
 	INIT_LIST_HEAD(&img_request->obj_requests);
 	kref_init(&img_request->kref);
 
-	rbd_img_request_get(img_request);	/* Avoid a warning */
-	rbd_img_request_put(img_request);	/* TEMPORARY */
-
 	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
 		write_request ? "write" : "read", offset, length,
 		img_request);
@@ -1897,15 +2019,54 @@ static void rbd_img_request_destroy(struct kref *kref)
 		rbd_img_obj_request_del(img_request, obj_request);
 	rbd_assert(img_request->obj_request_count == 0);
 
+	if (img_request_layered_test(img_request)) {
+		img_request_layered_clear(img_request);
+		rbd_dev_parent_put(img_request->rbd_dev);
+	}
+
 	if (img_request_write_test(img_request))
 		ceph_put_snap_context(img_request->snapc);
 
-	if (img_request_child_test(img_request))
-		rbd_obj_request_put(img_request->obj_request);
-
 	kmem_cache_free(rbd_img_request_cache, img_request);
 }
 
+static struct rbd_img_request *rbd_parent_request_create(
+					struct rbd_obj_request *obj_request,
+					u64 img_offset, u64 length)
+{
+	struct rbd_img_request *parent_request;
+	struct rbd_device *rbd_dev;
+
+	rbd_assert(obj_request->img_request);
+	rbd_dev = obj_request->img_request->rbd_dev;
+
+	parent_request = rbd_img_request_create(rbd_dev->parent,
+						img_offset, length, false);
+	if (!parent_request)
+		return NULL;
+
+	img_request_child_set(parent_request);
+	rbd_obj_request_get(obj_request);
+	parent_request->obj_request = obj_request;
+
+	return parent_request;
+}
+
+static void rbd_parent_request_destroy(struct kref *kref)
+{
+	struct rbd_img_request *parent_request;
+	struct rbd_obj_request *orig_request;
+
+	parent_request = container_of(kref, struct rbd_img_request, kref);
+	orig_request = parent_request->obj_request;
+
+	parent_request->obj_request = NULL;
+	rbd_obj_request_put(orig_request);
+	img_request_child_clear(parent_request);
+
+	rbd_img_request_destroy(kref);
+}
+
 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request;
@@ -2114,7 +2275,7 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request;
 	struct rbd_device *rbd_dev;
-	u64 length;
+	struct page **pages;
 	u32 page_count;
 
 	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
@@ -2124,12 +2285,14 @@ rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
 
 	rbd_dev = img_request->rbd_dev;
 	rbd_assert(rbd_dev);
-	length = (u64)1 << rbd_dev->header.obj_order;
-	page_count = (u32)calc_pages_for(0, length);
 
-	rbd_assert(obj_request->copyup_pages);
-	ceph_release_page_vector(obj_request->copyup_pages, page_count);
+	pages = obj_request->copyup_pages;
+	rbd_assert(pages != NULL);
 	obj_request->copyup_pages = NULL;
+	page_count = obj_request->copyup_page_count;
+	rbd_assert(page_count);
+	obj_request->copyup_page_count = 0;
+	ceph_release_page_vector(pages, page_count);
 
 	/*
 	 * We want the transfer count to reflect the size of the
@@ -2153,9 +2316,11 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
 	struct ceph_osd_client *osdc;
 	struct rbd_device *rbd_dev;
 	struct page **pages;
-	int result;
-	u64 obj_size;
-	u64 xferred;
+	u32 page_count;
+	int img_result;
+	u64 parent_length;
+	u64 offset;
+	u64 length;
 
 	rbd_assert(img_request_child_test(img_request));
 
@@ -2164,46 +2329,74 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
 	pages = img_request->copyup_pages;
 	rbd_assert(pages != NULL);
 	img_request->copyup_pages = NULL;
+	page_count = img_request->copyup_page_count;
+	rbd_assert(page_count);
+	img_request->copyup_page_count = 0;
 
 	orig_request = img_request->obj_request;
 	rbd_assert(orig_request != NULL);
-	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
-	result = img_request->result;
-	obj_size = img_request->length;
-	xferred = img_request->xferred;
+	rbd_assert(obj_request_type_valid(orig_request->type));
+	img_result = img_request->result;
+	parent_length = img_request->length;
+	rbd_assert(parent_length == img_request->xferred);
+	rbd_img_request_put(img_request);
 
-	rbd_dev = img_request->rbd_dev;
+	rbd_assert(orig_request->img_request);
+	rbd_dev = orig_request->img_request->rbd_dev;
 	rbd_assert(rbd_dev);
-	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
 
-	rbd_img_request_put(img_request);
+	/*
+	 * If the overlap has become 0 (most likely because the
+	 * image has been flattened) we need to free the pages
+	 * and re-submit the original write request.
+	 */
+	if (!rbd_dev->parent_overlap) {
+		struct ceph_osd_client *osdc;
 
-	if (result)
-		goto out_err;
+		ceph_release_page_vector(pages, page_count);
+		osdc = &rbd_dev->rbd_client->client->osdc;
+		img_result = rbd_obj_request_submit(osdc, orig_request);
+		if (!img_result)
+			return;
+	}
 
-	/* Allocate the new copyup osd request for the original request */
+	if (img_result)
+		goto out_err;
 
-	result = -ENOMEM;
-	rbd_assert(!orig_request->osd_req);
+	/*
+	 * The original osd request is of no use to use any more.
+	 * We need a new one that can hold the two ops in a copyup
+	 * request.  Allocate the new copyup osd request for the
+	 * original request, and release the old one.
+	 */
+	img_result = -ENOMEM;
 	osd_req = rbd_osd_req_create_copyup(orig_request);
 	if (!osd_req)
 		goto out_err;
+	rbd_osd_req_destroy(orig_request->osd_req);
 	orig_request->osd_req = osd_req;
 	orig_request->copyup_pages = pages;
+	orig_request->copyup_page_count = page_count;
 
 	/* Initialize the copyup op */
 
 	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
-	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
+	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
 						false, false);
 
 	/* Then the original write request op */
 
+	offset = orig_request->offset;
+	length = orig_request->length;
 	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
-					orig_request->offset,
-					orig_request->length, 0, 0);
-	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
-					orig_request->length);
+					offset, length, 0, 0);
+	if (orig_request->type == OBJ_REQUEST_BIO)
+		osd_req_op_extent_osd_data_bio(osd_req, 1,
+					orig_request->bio_list, length);
+	else
+		osd_req_op_extent_osd_data_pages(osd_req, 1,
+					orig_request->pages, length,
+					offset & ~PAGE_MASK, false, false);
 
 	rbd_osd_req_format_write(orig_request);
 
@@ -2211,13 +2404,13 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
 
 	orig_request->callback = rbd_img_obj_copyup_callback;
 	osdc = &rbd_dev->rbd_client->client->osdc;
-	result = rbd_obj_request_submit(osdc, orig_request);
-	if (!result)
+	img_result = rbd_obj_request_submit(osdc, orig_request);
+	if (!img_result)
 		return;
 out_err:
 	/* Record the error code and complete the request */
 
-	orig_request->result = result;
+	orig_request->result = img_result;
 	orig_request->xferred = 0;
 	obj_request_done_set(orig_request);
 	rbd_obj_request_complete(orig_request);
@@ -2249,22 +2442,13 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 	int result;
 
 	rbd_assert(obj_request_img_data_test(obj_request));
-	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+	rbd_assert(obj_request_type_valid(obj_request->type));
 
 	img_request = obj_request->img_request;
 	rbd_assert(img_request != NULL);
 	rbd_dev = img_request->rbd_dev;
 	rbd_assert(rbd_dev->parent != NULL);
 
-	/*
-	 * First things first.  The original osd request is of no
-	 * use to use any more, we'll need a new one that can hold
-	 * the two ops in a copyup request.  We'll get that later,
-	 * but for now we can release the old one.
-	 */
-	rbd_osd_req_destroy(obj_request->osd_req);
-	obj_request->osd_req = NULL;
-
 	/*
 	 * Determine the byte range covered by the object in the
 	 * child image to which the original request was to be sent.
@@ -2295,18 +2479,16 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 	}
 
 	result = -ENOMEM;
-	parent_request = rbd_img_request_create(rbd_dev->parent,
-						img_offset, length,
-						false, true);
+	parent_request = rbd_parent_request_create(obj_request,
+						img_offset, length);
 	if (!parent_request)
 		goto out_err;
-	rbd_obj_request_get(obj_request);
-	parent_request->obj_request = obj_request;
 
 	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
 	if (result)
 		goto out_err;
 	parent_request->copyup_pages = pages;
+	parent_request->copyup_page_count = page_count;
 
 	parent_request->callback = rbd_img_obj_parent_read_full_callback;
 	result = rbd_img_request_submit(parent_request);
@@ -2314,6 +2496,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 		return 0;
 
 	parent_request->copyup_pages = NULL;
+	parent_request->copyup_page_count = 0;
 	parent_request->obj_request = NULL;
 	rbd_obj_request_put(obj_request);
 out_err:
@@ -2331,6 +2514,7 @@ out_err:
 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 {
 	struct rbd_obj_request *orig_request;
+	struct rbd_device *rbd_dev;
 	int result;
 
 	rbd_assert(!obj_request_img_data_test(obj_request));
@@ -2353,8 +2537,21 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 		obj_request->xferred, obj_request->length);
 	rbd_obj_request_put(obj_request);
 
-	rbd_assert(orig_request);
-	rbd_assert(orig_request->img_request);
+	/*
+	 * If the overlap has become 0 (most likely because the
+	 * image has been flattened) we need to free the pages
+	 * and re-submit the original write request.
+	 */
+	rbd_dev = orig_request->img_request->rbd_dev;
+	if (!rbd_dev->parent_overlap) {
+		struct ceph_osd_client *osdc;
+
+		rbd_obj_request_put(orig_request);
+		osdc = &rbd_dev->rbd_client->client->osdc;
+		result = rbd_obj_request_submit(osdc, orig_request);
+		if (!result)
+			return;
+	}
 
 	/*
 	 * Our only purpose here is to determine whether the object
@@ -2512,14 +2709,36 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
 	struct rbd_obj_request *obj_request;
 	struct rbd_device *rbd_dev;
 	u64 obj_end;
+	u64 img_xferred;
+	int img_result;
 
 	rbd_assert(img_request_child_test(img_request));
 
+	/* First get what we need from the image request and release it */
+
 	obj_request = img_request->obj_request;
+	img_xferred = img_request->xferred;
+	img_result = img_request->result;
+	rbd_img_request_put(img_request);
+
+	/*
+	 * If the overlap has become 0 (most likely because the
+	 * image has been flattened) we need to re-submit the
+	 * original request.
+	 */
 	rbd_assert(obj_request);
 	rbd_assert(obj_request->img_request);
+	rbd_dev = obj_request->img_request->rbd_dev;
+	if (!rbd_dev->parent_overlap) {
+		struct ceph_osd_client *osdc;
+
+		osdc = &rbd_dev->rbd_client->client->osdc;
+		img_result = rbd_obj_request_submit(osdc, obj_request);
+		if (!img_result)
+			return;
+	}
 
-	obj_request->result = img_request->result;
+	obj_request->result = img_result;
 	if (obj_request->result)
 		goto out;
 
@@ -2532,7 +2751,6 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
 	 */
 	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
 	obj_end = obj_request->img_offset + obj_request->length;
-	rbd_dev = obj_request->img_request->rbd_dev;
 	if (obj_end > rbd_dev->parent_overlap) {
 		u64 xferred = 0;
 
@@ -2540,43 +2758,39 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
 			xferred = rbd_dev->parent_overlap -
 					obj_request->img_offset;
 
-		obj_request->xferred = min(img_request->xferred, xferred);
+		obj_request->xferred = min(img_xferred, xferred);
 	} else {
-		obj_request->xferred = img_request->xferred;
+		obj_request->xferred = img_xferred;
 	}
 out:
-	rbd_img_request_put(img_request);
 	rbd_img_obj_request_read_callback(obj_request);
 	rbd_obj_request_complete(obj_request);
 }
 
 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
 {
-	struct rbd_device *rbd_dev;
 	struct rbd_img_request *img_request;
 	int result;
 
 	rbd_assert(obj_request_img_data_test(obj_request));
 	rbd_assert(obj_request->img_request != NULL);
 	rbd_assert(obj_request->result == (s32) -ENOENT);
-	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
+	rbd_assert(obj_request_type_valid(obj_request->type));
 
-	rbd_dev = obj_request->img_request->rbd_dev;
-	rbd_assert(rbd_dev->parent != NULL);
 	/* rbd_read_finish(obj_request, obj_request->length); */
-	img_request = rbd_img_request_create(rbd_dev->parent,
+	img_request = rbd_parent_request_create(obj_request,
 						obj_request->img_offset,
-						obj_request->length,
-						false, true);
+						obj_request->length);
 	result = -ENOMEM;
 	if (!img_request)
 		goto out_err;
 
-	rbd_obj_request_get(obj_request);
-	img_request->obj_request = obj_request;
-
-	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-					obj_request->bio_list);
+	if (obj_request->type == OBJ_REQUEST_BIO)
+		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
+						obj_request->bio_list);
+	else
+		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
+						obj_request->pages);
 	if (result)
 		goto out_err;
 
@@ -2626,6 +2840,7 @@ out:
 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
 	struct rbd_device *rbd_dev = (struct rbd_device *)data;
+	int ret;
 
 	if (!rbd_dev)
 		return;
@@ -2633,7 +2848,9 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
 		rbd_dev->header_name, (unsigned long long)notify_id,
 		(unsigned int)opcode);
-	(void)rbd_dev_refresh(rbd_dev);
+	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
 
 	rbd_obj_notify_ack(rbd_dev, notify_id);
 }
@@ -2642,7 +2859,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
  * Request sync osd watch/unwatch.  The value of "start" determines
  * whether a watch request is being initiated or torn down.
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
@@ -2676,7 +2893,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
 					rbd_dev->watch_request->osd_req);
 
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-				rbd_dev->watch_event->cookie, 0, start);
+				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
 	rbd_osd_req_format_write(obj_request);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
@@ -2869,9 +3086,16 @@ static void rbd_request_fn(struct request_queue *q)
 			goto end_request;	/* Shouldn't happen */
 		}
 
+		result = -EIO;
+		if (offset + length > rbd_dev->mapping.size) {
+			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
+				offset, length, rbd_dev->mapping.size);
+			goto end_request;
+		}
+
 		result = -ENOMEM;
 		img_request = rbd_img_request_create(rbd_dev, offset, length,
-							write_request, false);
+							write_request);
 		if (!img_request)
 			goto end_request;
 
@@ -3022,17 +3246,11 @@ out:
 }
 
 /*
- * Read the complete header for the given rbd device.
- *
- * Returns a pointer to a dynamically-allocated buffer containing
- * the complete and validated header.  Caller can pass the address
- * of a variable that will be filled in with the version of the
- * header object at the time it was read.
- *
- * Returns a pointer-coded errno if a failure occurs.
+ * Read the complete header for the given rbd device.  On successful
+ * return, the rbd_dev->header field will contain up-to-date
+ * information about the image.
  */
-static struct rbd_image_header_ondisk *
-rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
+static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
 {
 	struct rbd_image_header_ondisk *ondisk = NULL;
 	u32 snap_count = 0;
@@ -3057,22 +3275,22 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
 		size += names_size;
 		ondisk = kmalloc(size, GFP_KERNEL);
 		if (!ondisk)
-			return ERR_PTR(-ENOMEM);
+			return -ENOMEM;
 
 		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 				       0, size, ondisk);
 		if (ret < 0)
-			goto out_err;
+			goto out;
 		if ((size_t)ret < size) {
 			ret = -ENXIO;
 			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
 				size, ret);
-			goto out_err;
+			goto out;
 		}
 		if (!rbd_dev_ondisk_valid(ondisk)) {
 			ret = -ENXIO;
 			rbd_warn(rbd_dev, "invalid header");
-			goto out_err;
+			goto out;
 		}
 
 		names_size = le64_to_cpu(ondisk->snap_names_len);
@@ -3080,85 +3298,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
 		snap_count = le32_to_cpu(ondisk->snap_count);
 	} while (snap_count != want_count);
 
-	return ondisk;
-
-out_err:
-	kfree(ondisk);
-
-	return ERR_PTR(ret);
-}
-
-/*
- * reload the ondisk the header
- */
-static int rbd_read_header(struct rbd_device *rbd_dev,
-			   struct rbd_image_header *header)
-{
-	struct rbd_image_header_ondisk *ondisk;
-	int ret;
-
-	ondisk = rbd_dev_v1_header_read(rbd_dev);
-	if (IS_ERR(ondisk))
-		return PTR_ERR(ondisk);
-	ret = rbd_header_from_disk(header, ondisk);
+	ret = rbd_header_from_disk(rbd_dev, ondisk);
+out:
 	kfree(ondisk);
 
 	return ret;
 }
 
-static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
-{
-	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
-		return;
-
-	if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
-		sector_t size;
-
-		rbd_dev->mapping.size = rbd_dev->header.image_size;
-		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
-		dout("setting size to %llu sectors", (unsigned long long)size);
-		set_capacity(rbd_dev->disk, size);
-	}
-}
-
-/*
- * only read the first part of the ondisk header, without the snaps info
- */
-static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
-{
-	int ret;
-	struct rbd_image_header h;
-
-	ret = rbd_read_header(rbd_dev, &h);
-	if (ret < 0)
-		return ret;
-
-	down_write(&rbd_dev->header_rwsem);
-
-	/* Update image size, and check for resize of mapped image */
-	rbd_dev->header.image_size = h.image_size;
-	rbd_update_mapping_size(rbd_dev);
-
-	/* rbd_dev->header.object_prefix shouldn't change */
-	kfree(rbd_dev->header.snap_sizes);
-	kfree(rbd_dev->header.snap_names);
-	/* osd requests may still refer to snapc */
-	ceph_put_snap_context(rbd_dev->header.snapc);
-
-	rbd_dev->header.image_size = h.image_size;
-	rbd_dev->header.snapc = h.snapc;
-	rbd_dev->header.snap_names = h.snap_names;
-	rbd_dev->header.snap_sizes = h.snap_sizes;
-	/* Free the extra copy of the object prefix */
-	if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
-		rbd_warn(rbd_dev, "object prefix changed (ignoring)");
-	kfree(h.object_prefix);
-
-	up_write(&rbd_dev->header_rwsem);
-
-	return ret;
-}
-
 /*
  * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
  * has disappeared from the (just updated) snapshot context.
@@ -3180,26 +3326,29 @@ static void rbd_exists_validate(struct rbd_device *rbd_dev)
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 {
-	u64 image_size;
+	u64 mapping_size;
 	int ret;
 
 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
-	image_size = rbd_dev->header.image_size;
+	mapping_size = rbd_dev->mapping.size;
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_refresh(rbd_dev);
+		ret = rbd_dev_v1_header_info(rbd_dev);
 	else
-		ret = rbd_dev_v2_refresh(rbd_dev);
+		ret = rbd_dev_v2_header_info(rbd_dev);
 
 	/* If it's a mapped snapshot, validate its EXISTS flag */
 
 	rbd_exists_validate(rbd_dev);
 	mutex_unlock(&ctl_mutex);
-	if (ret)
-		rbd_warn(rbd_dev, "got notification but failed to "
-			   " update snaps: %d\n", ret);
-	if (image_size != rbd_dev->header.image_size)
+	if (mapping_size != rbd_dev->mapping.size) {
+		sector_t size;
+
+		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
+		dout("setting size to %llu sectors", (unsigned long long)size);
+		set_capacity(rbd_dev->disk, size);
 		revalidate_disk(rbd_dev->disk);
+	}
 
 	return ret;
 }
@@ -3403,6 +3552,8 @@ static ssize_t rbd_image_refresh(struct device *dev,
 	int ret;
 
 	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
 
 	return ret < 0 ? ret : size;
 }
@@ -3501,6 +3652,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
 	spin_lock_init(&rbd_dev->lock);
 	rbd_dev->flags = 0;
+	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
 	init_rwsem(&rbd_dev->header_rwsem);
 
@@ -3650,6 +3802,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	__le64 snapid;
 	void *p;
 	void *end;
+	u64 pool_id;
 	char *image_id;
 	u64 overlap;
 	int ret;
@@ -3680,18 +3833,37 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	p = reply_buf;
 	end = reply_buf + ret;
 	ret = -ERANGE;
-	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
-	if (parent_spec->pool_id == CEPH_NOPOOL)
+	ceph_decode_64_safe(&p, end, pool_id, out_err);
+	if (pool_id == CEPH_NOPOOL) {
+		/*
+		 * Either the parent never existed, or we have
+		 * record of it but the image got flattened so it no
+		 * longer has a parent.  When the parent of a
+		 * layered image disappears we immediately set the
+		 * overlap to 0.  The effect of this is that all new
+		 * requests will be treated as if the image had no
+		 * parent.
+		 */
+		if (rbd_dev->parent_overlap) {
+			rbd_dev->parent_overlap = 0;
+			smp_mb();
+			rbd_dev_parent_put(rbd_dev);
+			pr_info("%s: clone image has been flattened\n",
+				rbd_dev->disk->disk_name);
+		}
+
 		goto out;	/* No parent?  No problem. */
+	}
 
 	/* The ceph file layout needs to fit pool id in 32 bits */
 
 	ret = -EIO;
-	if (parent_spec->pool_id > (u64)U32_MAX) {
+	if (pool_id > (u64)U32_MAX) {
 		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
-			(unsigned long long)parent_spec->pool_id, U32_MAX);
+			(unsigned long long)pool_id, U32_MAX);
 		goto out_err;
 	}
+	parent_spec->pool_id = pool_id;
 
 	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
 	if (IS_ERR(image_id)) {
@@ -3702,9 +3874,14 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
 	ceph_decode_64_safe(&p, end, overlap, out_err);
 
-	rbd_dev->parent_overlap = overlap;
-	rbd_dev->parent_spec = parent_spec;
-	parent_spec = NULL;	/* rbd_dev now owns this */
+	if (overlap) {
+		rbd_spec_put(rbd_dev->parent_spec);
+		rbd_dev->parent_spec = parent_spec;
+		parent_spec = NULL;	/* rbd_dev now owns this */
+		rbd_dev->parent_overlap = overlap;
+	} else {
+		rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+	}
 out:
 	ret = 0;
 out_err:
@@ -4002,6 +4179,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
 	for (i = 0; i < snap_count; i++)
 		snapc->snaps[i] = ceph_decode_64(&p);
 
+	ceph_put_snap_context(rbd_dev->header.snapc);
 	rbd_dev->header.snapc = snapc;
 
 	dout("  snap context seq = %llu, snap_count = %u\n",
@@ -4053,21 +4231,56 @@ out:
 	return snap_name;
 }
 
-static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 {
+	bool first_time = rbd_dev->header.object_prefix == NULL;
 	int ret;
 
 	down_write(&rbd_dev->header_rwsem);
 
+	if (first_time) {
+		ret = rbd_dev_v2_header_onetime(rbd_dev);
+		if (ret)
+			goto out;
+	}
+
+	/*
+	 * If the image supports layering, get the parent info.  We
+	 * need to probe the first time regardless.  Thereafter we
+	 * only need to if there's a parent, to see if it has
+	 * disappeared due to the mapped image getting flattened.
+	 */
+	if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
+			(first_time || rbd_dev->parent_spec)) {
+		bool warn;
+
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret)
+			goto out;
+
+		/*
+		 * Print a warning if this is the initial probe and
+		 * the image has a parent.  Don't print it if the
+		 * image now being probed is itself a parent.  We
+		 * can tell at this point because we won't know its
+		 * pool name yet (just its pool id).
+		 */
+		warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
+		if (first_time && warn)
+			rbd_warn(rbd_dev, "WARNING: kernel layering "
+					"is EXPERIMENTAL!");
+	}
+
 	ret = rbd_dev_v2_image_size(rbd_dev);
 	if (ret)
 		goto out;
-	rbd_update_mapping_size(rbd_dev);
+
+	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
+		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
+			rbd_dev->mapping.size = rbd_dev->header.image_size;
 
 	ret = rbd_dev_v2_snap_context(rbd_dev);
 	dout("rbd_dev_v2_snap_context returned %d\n", ret);
-	if (ret)
-		goto out;
 out:
 	up_write(&rbd_dev->header_rwsem);
 
@@ -4490,10 +4703,10 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 {
 	struct rbd_image_header	*header;
 
-	rbd_dev_remove_parent(rbd_dev);
-	rbd_spec_put(rbd_dev->parent_spec);
-	rbd_dev->parent_spec = NULL;
-	rbd_dev->parent_overlap = 0;
+	/* Drop parent reference unless it's already been done (or none) */
+
+	if (rbd_dev->parent_overlap)
+		rbd_dev_parent_put(rbd_dev);
 
 	/* Free dynamic fields from the header, then zero it out */
 
@@ -4505,72 +4718,22 @@ static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
 	memset(header, 0, sizeof (*header));
 }
 
-static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
 {
 	int ret;
 
-	/* Populate rbd image metadata */
-
-	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
-	if (ret < 0)
-		goto out_err;
-
-	/* Version 1 images have no parent (no layering) */
-
-	rbd_dev->parent_spec = NULL;
-	rbd_dev->parent_overlap = 0;
-
-	dout("discovered version 1 image, header name is %s\n",
-		rbd_dev->header_name);
-
-	return 0;
-
-out_err:
-	kfree(rbd_dev->header_name);
-	rbd_dev->header_name = NULL;
-	kfree(rbd_dev->spec->image_id);
-	rbd_dev->spec->image_id = NULL;
-
-	return ret;
-}
-
-static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
-{
-	int ret;
-
-	ret = rbd_dev_v2_image_size(rbd_dev);
-	if (ret)
-		goto out_err;
-
-	/* Get the object prefix (a.k.a. block_name) for the image */
-
 	ret = rbd_dev_v2_object_prefix(rbd_dev);
 	if (ret)
 		goto out_err;
 
-	/* Get the and check features for the image */
-
+	/*
+	 * Get the and check features for the image.  Currently the
+	 * features are assumed to never change.
+	 */
 	ret = rbd_dev_v2_features(rbd_dev);
 	if (ret)
 		goto out_err;
 
-	/* If the image supports layering, get the parent info */
-
-	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
-		ret = rbd_dev_v2_parent_info(rbd_dev);
-		if (ret)
-			goto out_err;
-
-		/*
-		 * Don't print a warning for parent images.  We can
-		 * tell this point because we won't know its pool
-		 * name yet (just its pool id).
-		 */
-		if (rbd_dev->spec->pool_name)
-			rbd_warn(rbd_dev, "WARNING: kernel layering "
-					"is EXPERIMENTAL!");
-	}
-
 	/* If the image supports fancy striping, get its parameters */
 
 	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
@@ -4578,28 +4741,11 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
 		if (ret < 0)
 			goto out_err;
 	}
-
-	/* crypto and compression type aren't (yet) supported for v2 images */
-
-	rbd_dev->header.crypt_type = 0;
-	rbd_dev->header.comp_type = 0;
-
-	/* Get the snapshot context, plus the header version */
-
-	ret = rbd_dev_v2_snap_context(rbd_dev);
-	if (ret)
-		goto out_err;
-
-	dout("discovered version 2 image, header name is %s\n",
-		rbd_dev->header_name);
+	/* No support for crypto and compression type format 2 images */
 
 	return 0;
 out_err:
-	rbd_dev->parent_overlap = 0;
-	rbd_spec_put(rbd_dev->parent_spec);
-	rbd_dev->parent_spec = NULL;
-	kfree(rbd_dev->header_name);
-	rbd_dev->header_name = NULL;
+	rbd_dev->header.features = 0;
 	kfree(rbd_dev->header.object_prefix);
 	rbd_dev->header.object_prefix = NULL;
 
@@ -4628,15 +4774,16 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
 	if (!parent)
 		goto out_err;
 
-	ret = rbd_dev_image_probe(parent);
+	ret = rbd_dev_image_probe(parent, false);
 	if (ret < 0)
 		goto out_err;
 	rbd_dev->parent = parent;
+	atomic_set(&rbd_dev->parent_ref, 1);
 
 	return 0;
 out_err:
 	if (parent) {
-		rbd_spec_put(rbd_dev->parent_spec);
+		rbd_dev_unparent(rbd_dev);
 		kfree(rbd_dev->header_name);
 		rbd_dev_destroy(parent);
 	} else {
@@ -4651,10 +4798,6 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 {
 	int ret;
 
-	ret = rbd_dev_mapping_set(rbd_dev);
-	if (ret)
-		return ret;
-
 	/* generate unique id: find highest unique id, add one */
 	rbd_dev_id_get(rbd_dev);
 
@@ -4676,13 +4819,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	if (ret)
 		goto err_out_blkdev;
 
-	ret = rbd_bus_add_dev(rbd_dev);
+	ret = rbd_dev_mapping_set(rbd_dev);
 	if (ret)
 		goto err_out_disk;
+	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+
+	ret = rbd_bus_add_dev(rbd_dev);
+	if (ret)
+		goto err_out_mapping;
 
 	/* Everything's ready.  Announce the disk to the world. */
 
-	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	add_disk(rbd_dev->disk);
 
@@ -4691,6 +4838,8 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
 	return ret;
 
+err_out_mapping:
+	rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
 	rbd_free_disk(rbd_dev);
 err_out_blkdev:
@@ -4731,12 +4880,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
-	int ret;
-
 	rbd_dev_unprobe(rbd_dev);
-	ret = rbd_dev_header_watch_sync(rbd_dev, 0);
-	if (ret)
-		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
 	kfree(rbd_dev->header_name);
 	rbd_dev->header_name = NULL;
 	rbd_dev->image_format = 0;
@@ -4748,10 +4892,11 @@ static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 
 /*
  * Probe for the existence of the header object for the given rbd
- * device.  For format 2 images this includes determining the image
- * id.
+ * device.  If this image is the one being mapped (i.e., not a
+ * parent), initiate a watch on its header object before using that
+ * object to get detailed information about the rbd image.
  */
-static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 {
 	int ret;
 	int tmp;
@@ -4771,14 +4916,16 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
 	if (ret)
 		goto err_out_format;
 
-	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
-	if (ret)
-		goto out_header_name;
+	if (mapping) {
+		ret = rbd_dev_header_watch_sync(rbd_dev, true);
+		if (ret)
+			goto out_header_name;
+	}
 
 	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_probe(rbd_dev);
+		ret = rbd_dev_v1_header_info(rbd_dev);
 	else
-		ret = rbd_dev_v2_probe(rbd_dev);
+		ret = rbd_dev_v2_header_info(rbd_dev);
 	if (ret)
 		goto err_out_watch;
 
@@ -4787,15 +4934,22 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
 		goto err_out_probe;
 
 	ret = rbd_dev_probe_parent(rbd_dev);
-	if (!ret)
-		return 0;
+	if (ret)
+		goto err_out_probe;
+
+	dout("discovered format %u image, header name is %s\n",
+		rbd_dev->image_format, rbd_dev->header_name);
 
+	return 0;
 err_out_probe:
 	rbd_dev_unprobe(rbd_dev);
 err_out_watch:
-	tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
-	if (tmp)
-		rbd_warn(rbd_dev, "unable to tear down watch request\n");
+	if (mapping) {
+		tmp = rbd_dev_header_watch_sync(rbd_dev, false);
+		if (tmp)
+			rbd_warn(rbd_dev, "unable to tear down "
+					"watch request (%d)\n", tmp);
+	}
 out_header_name:
 	kfree(rbd_dev->header_name);
 	rbd_dev->header_name = NULL;
@@ -4819,6 +4973,7 @@ static ssize_t rbd_add(struct bus_type *bus,
 	struct rbd_spec *spec = NULL;
 	struct rbd_client *rbdc;
 	struct ceph_osd_client *osdc;
+	bool read_only;
 	int rc = -ENOMEM;
 
 	if (!try_module_get(THIS_MODULE))
@@ -4828,6 +4983,9 @@ static ssize_t rbd_add(struct bus_type *bus,
 	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
 	if (rc < 0)
 		goto err_out_module;
+	read_only = rbd_opts->read_only;
+	kfree(rbd_opts);
+	rbd_opts = NULL;	/* done with this */
 
 	rbdc = rbd_get_client(ceph_opts);
 	if (IS_ERR(rbdc)) {
@@ -4858,14 +5016,16 @@ static ssize_t rbd_add(struct bus_type *bus,
 	rbdc = NULL;		/* rbd_dev now owns this */
 	spec = NULL;		/* rbd_dev now owns this */
 
-	rbd_dev->mapping.read_only = rbd_opts->read_only;
-	kfree(rbd_opts);
-	rbd_opts = NULL;	/* done with this */
-
-	rc = rbd_dev_image_probe(rbd_dev);
+	rc = rbd_dev_image_probe(rbd_dev, true);
 	if (rc < 0)
 		goto err_out_rbd_dev;
 
+	/* If we are mapping a snapshot it must be marked read-only */
+
+	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+		read_only = true;
+	rbd_dev->mapping.read_only = read_only;
+
 	rc = rbd_dev_device_setup(rbd_dev);
 	if (!rc)
 		return count;
@@ -4911,7 +5071,7 @@ static void rbd_dev_device_release(struct device *dev)
 
 	rbd_free_disk(rbd_dev);
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-	rbd_dev_clear_mapping(rbd_dev);
+	rbd_dev_mapping_clear(rbd_dev);
 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
 	rbd_dev->major = 0;
 	rbd_dev_id_put(rbd_dev);
@@ -4978,10 +5138,13 @@ static ssize_t rbd_remove(struct bus_type *bus,
 	spin_unlock_irq(&rbd_dev->lock);
 	if (ret < 0)
 		goto done;
-	ret = count;
 	rbd_bus_del_dev(rbd_dev);
+	ret = rbd_dev_header_watch_sync(rbd_dev, false);
+	if (ret)
+		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
 	rbd_dev_image_release(rbd_dev);
 	module_put(THIS_MODULE);
+	ret = count;
 done:
 	mutex_unlock(&ctl_mutex);
 

+ 4 - 1
net/ceph/osd_client.c

@@ -1204,6 +1204,7 @@ void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
 	mutex_lock(&osdc->request_mutex);
 	if (req->r_linger) {
 		__unregister_linger_request(osdc, req);
+		req->r_linger = 0;
 		ceph_osdc_put_request(req);
 	}
 	mutex_unlock(&osdc->request_mutex);
@@ -2120,7 +2121,9 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 	down_read(&osdc->map_sem);
 	mutex_lock(&osdc->request_mutex);
 	__register_request(osdc, req);
-	WARN_ON(req->r_sent);
+	req->r_sent = 0;
+	req->r_got_reply = 0;
+	req->r_completed = 0;
 	rc = __map_request(osdc, req, 0);
 	if (rc < 0) {
 		if (nofail) {