Преглед изворни кода

Merge tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The two main items are support for disabling automatic rbd exclusive
  lock transfers from myself and the long awaited -ENOSPC handling
  series from Jeff.

  The former will allow rbd users to take advantage of exclusive lock's
  built-in blacklist/break-lock functionality while staying in control
  of who owns the lock. With the latter in place, we will abort
  filesystem writes on -ENOSPC instead of having them block
  indefinitely.

  Beyond that we've got the usual pile of filesystem fixes from Zheng,
  some refcount_t conversion patches from Elena and a patch for an
  ancient open() flags handling bug from Alexander"

* tag 'ceph-for-4.12-rc1' of git://github.com/ceph/ceph-client: (31 commits)
  ceph: fix memory leak in __ceph_setxattr()
  ceph: fix file open flags on ppc64
  ceph: choose readdir frag based on previous readdir reply
  rbd: exclusive map option
  rbd: return ResponseMessage result from rbd_handle_request_lock()
  rbd: kill rbd_is_lock_supported()
  rbd: support updating the lock cookie without releasing the lock
  rbd: store lock cookie
  rbd: ignore unlock errors
  rbd: fix error handling around rbd_init_disk()
  rbd: move rbd_unregister_watch() call into rbd_dev_image_release()
  rbd: move rbd_dev_destroy() call out of rbd_dev_image_release()
  ceph: when seeing write errors on an inode, switch to sync writes
  Revert "ceph: SetPageError() for writeback pages if writepages fails"
  ceph: handle epoch barriers in cap messages
  libceph: add an epoch_barrier field to struct ceph_osd_client
  libceph: abort already submitted but abortable requests when map or pool goes full
  libceph: allow requests to return immediately on full conditions if caller wishes
  libceph: remove req->r_replay_version
  ceph: make seeky readdir more efficient
  ...
Linus Torvalds пре 8 година
родитељ
комит
26c5eaa132

+ 215 - 144
drivers/block/rbd.c

@@ -387,6 +387,7 @@ struct rbd_device {
 
 
 	struct rw_semaphore	lock_rwsem;
 	struct rw_semaphore	lock_rwsem;
 	enum rbd_lock_state	lock_state;
 	enum rbd_lock_state	lock_state;
+	char			lock_cookie[32];
 	struct rbd_client_id	owner_cid;
 	struct rbd_client_id	owner_cid;
 	struct work_struct	acquired_lock_work;
 	struct work_struct	acquired_lock_work;
 	struct work_struct	released_lock_work;
 	struct work_struct	released_lock_work;
@@ -477,13 +478,6 @@ static int minor_to_rbd_dev_id(int minor)
 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 }
 }
 
 
-static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
-{
-	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
-	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
-	       !rbd_dev->mapping.read_only;
-}
-
 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
 {
 {
 	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
 	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
@@ -731,7 +725,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 	kref_init(&rbdc->kref);
 	kref_init(&rbdc->kref);
 	INIT_LIST_HEAD(&rbdc->node);
 	INIT_LIST_HEAD(&rbdc->node);
 
 
-	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
+	rbdc->client = ceph_create_client(ceph_opts, rbdc);
 	if (IS_ERR(rbdc->client))
 	if (IS_ERR(rbdc->client))
 		goto out_rbdc;
 		goto out_rbdc;
 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
@@ -804,6 +798,7 @@ enum {
 	Opt_read_only,
 	Opt_read_only,
 	Opt_read_write,
 	Opt_read_write,
 	Opt_lock_on_read,
 	Opt_lock_on_read,
+	Opt_exclusive,
 	Opt_err
 	Opt_err
 };
 };
 
 
@@ -816,6 +811,7 @@ static match_table_t rbd_opts_tokens = {
 	{Opt_read_write, "read_write"},
 	{Opt_read_write, "read_write"},
 	{Opt_read_write, "rw"},		/* Alternate spelling */
 	{Opt_read_write, "rw"},		/* Alternate spelling */
 	{Opt_lock_on_read, "lock_on_read"},
 	{Opt_lock_on_read, "lock_on_read"},
+	{Opt_exclusive, "exclusive"},
 	{Opt_err, NULL}
 	{Opt_err, NULL}
 };
 };
 
 
@@ -823,11 +819,13 @@ struct rbd_options {
 	int	queue_depth;
 	int	queue_depth;
 	bool	read_only;
 	bool	read_only;
 	bool	lock_on_read;
 	bool	lock_on_read;
+	bool	exclusive;
 };
 };
 
 
 #define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
 #define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
 #define RBD_READ_ONLY_DEFAULT	false
 #define RBD_READ_ONLY_DEFAULT	false
 #define RBD_LOCK_ON_READ_DEFAULT false
 #define RBD_LOCK_ON_READ_DEFAULT false
+#define RBD_EXCLUSIVE_DEFAULT	false
 
 
 static int parse_rbd_opts_token(char *c, void *private)
 static int parse_rbd_opts_token(char *c, void *private)
 {
 {
@@ -866,6 +864,9 @@ static int parse_rbd_opts_token(char *c, void *private)
 	case Opt_lock_on_read:
 	case Opt_lock_on_read:
 		rbd_opts->lock_on_read = true;
 		rbd_opts->lock_on_read = true;
 		break;
 		break;
+	case Opt_exclusive:
+		rbd_opts->exclusive = true;
+		break;
 	default:
 	default:
 		/* libceph prints "bad option" msg */
 		/* libceph prints "bad option" msg */
 		return -EINVAL;
 		return -EINVAL;
@@ -3079,7 +3080,8 @@ static int rbd_lock(struct rbd_device *rbd_dev)
 	char cookie[32];
 	char cookie[32];
 	int ret;
 	int ret;
 
 
-	WARN_ON(__rbd_is_lock_owner(rbd_dev));
+	WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
+		rbd_dev->lock_cookie[0] != '\0');
 
 
 	format_lock_cookie(rbd_dev, cookie);
 	format_lock_cookie(rbd_dev, cookie);
 	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
 	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
@@ -3089,6 +3091,7 @@ static int rbd_lock(struct rbd_device *rbd_dev)
 		return ret;
 		return ret;
 
 
 	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
 	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+	strcpy(rbd_dev->lock_cookie, cookie);
 	rbd_set_owner_cid(rbd_dev, &cid);
 	rbd_set_owner_cid(rbd_dev, &cid);
 	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
 	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
 	return 0;
 	return 0;
@@ -3097,27 +3100,24 @@ static int rbd_lock(struct rbd_device *rbd_dev)
 /*
 /*
  * lock_rwsem must be held for write
  * lock_rwsem must be held for write
  */
  */
-static int rbd_unlock(struct rbd_device *rbd_dev)
+static void rbd_unlock(struct rbd_device *rbd_dev)
 {
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-	char cookie[32];
 	int ret;
 	int ret;
 
 
-	WARN_ON(!__rbd_is_lock_owner(rbd_dev));
-
-	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+	WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
+		rbd_dev->lock_cookie[0] == '\0');
 
 
-	format_lock_cookie(rbd_dev, cookie);
 	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
 	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
-			      RBD_LOCK_NAME, cookie);
-	if (ret && ret != -ENOENT) {
-		rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
-		return ret;
-	}
+			      RBD_LOCK_NAME, rbd_dev->lock_cookie);
+	if (ret && ret != -ENOENT)
+		rbd_warn(rbd_dev, "failed to unlock: %d", ret);
 
 
+	/* treat errors as the image is unlocked */
+	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+	rbd_dev->lock_cookie[0] = '\0';
 	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
 	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
 	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
 	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
-	return 0;
 }
 }
 
 
 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
 static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
@@ -3447,6 +3447,18 @@ again:
 	ret = rbd_request_lock(rbd_dev);
 	ret = rbd_request_lock(rbd_dev);
 	if (ret == -ETIMEDOUT) {
 	if (ret == -ETIMEDOUT) {
 		goto again; /* treat this as a dead client */
 		goto again; /* treat this as a dead client */
+	} else if (ret == -EROFS) {
+		rbd_warn(rbd_dev, "peer will not release lock");
+		/*
+		 * If this is rbd_add_acquire_lock(), we want to fail
+		 * immediately -- reuse BLACKLISTED flag.  Otherwise we
+		 * want to block.
+		 */
+		if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
+			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+			/* wake "rbd map --exclusive" process */
+			wake_requests(rbd_dev, false);
+		}
 	} else if (ret < 0) {
 	} else if (ret < 0) {
 		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
 		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
 		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
@@ -3490,16 +3502,15 @@ static bool rbd_release_lock(struct rbd_device *rbd_dev)
 	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
 	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
 		return false;
 		return false;
 
 
-	if (!rbd_unlock(rbd_dev))
-		/*
-		 * Give others a chance to grab the lock - we would re-acquire
-		 * almost immediately if we got new IO during ceph_osdc_sync()
-		 * otherwise.  We need to ack our own notifications, so this
-		 * lock_dwork will be requeued from rbd_wait_state_locked()
-		 * after wake_requests() in rbd_handle_released_lock().
-		 */
-		cancel_delayed_work(&rbd_dev->lock_dwork);
-
+	rbd_unlock(rbd_dev);
+	/*
+	 * Give others a chance to grab the lock - we would re-acquire
+	 * almost immediately if we got new IO during ceph_osdc_sync()
+	 * otherwise.  We need to ack our own notifications, so this
+	 * lock_dwork will be requeued from rbd_wait_state_locked()
+	 * after wake_requests() in rbd_handle_released_lock().
+	 */
+	cancel_delayed_work(&rbd_dev->lock_dwork);
 	return true;
 	return true;
 }
 }
 
 
@@ -3580,12 +3591,16 @@ static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
 	up_read(&rbd_dev->lock_rwsem);
 	up_read(&rbd_dev->lock_rwsem);
 }
 }
 
 
-static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
-				    void **p)
+/*
+ * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
+ * ResponseMessage is needed.
+ */
+static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				   void **p)
 {
 {
 	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
 	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
 	struct rbd_client_id cid = { 0 };
 	struct rbd_client_id cid = { 0 };
-	bool need_to_send;
+	int result = 1;
 
 
 	if (struct_v >= 2) {
 	if (struct_v >= 2) {
 		cid.gid = ceph_decode_64(p);
 		cid.gid = ceph_decode_64(p);
@@ -3595,19 +3610,36 @@ static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
 	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
 	     cid.handle);
 	     cid.handle);
 	if (rbd_cid_equal(&cid, &my_cid))
 	if (rbd_cid_equal(&cid, &my_cid))
-		return false;
+		return result;
 
 
 	down_read(&rbd_dev->lock_rwsem);
 	down_read(&rbd_dev->lock_rwsem);
-	need_to_send = __rbd_is_lock_owner(rbd_dev);
-	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
-		if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
-			dout("%s rbd_dev %p queueing unlock_work\n", __func__,
-			     rbd_dev);
-			queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+	if (__rbd_is_lock_owner(rbd_dev)) {
+		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
+		    rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
+			goto out_unlock;
+
+		/*
+		 * encode ResponseMessage(0) so the peer can detect
+		 * a missing owner
+		 */
+		result = 0;
+
+		if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+			if (!rbd_dev->opts->exclusive) {
+				dout("%s rbd_dev %p queueing unlock_work\n",
+				     __func__, rbd_dev);
+				queue_work(rbd_dev->task_wq,
+					   &rbd_dev->unlock_work);
+			} else {
+				/* refuse to release the lock */
+				result = -EROFS;
+			}
 		}
 		}
 	}
 	}
+
+out_unlock:
 	up_read(&rbd_dev->lock_rwsem);
 	up_read(&rbd_dev->lock_rwsem);
-	return need_to_send;
+	return result;
 }
 }
 
 
 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
 static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
@@ -3690,13 +3722,10 @@ static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
 		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
 		break;
 		break;
 	case RBD_NOTIFY_OP_REQUEST_LOCK:
 	case RBD_NOTIFY_OP_REQUEST_LOCK:
-		if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
-			/*
-			 * send ResponseMessage(0) back so the client
-			 * can detect a missing owner
-			 */
+		ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
+		if (ret <= 0)
 			rbd_acknowledge_notify_result(rbd_dev, notify_id,
 			rbd_acknowledge_notify_result(rbd_dev, notify_id,
-						      cookie, 0);
+						      cookie, ret);
 		else
 		else
 			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
 			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
 		break;
 		break;
@@ -3821,24 +3850,51 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 }
 }
 
 
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	char cookie[32];
+	int ret;
+
+	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+
+	format_lock_cookie(rbd_dev, cookie);
+	ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
+				  &rbd_dev->header_oloc, RBD_LOCK_NAME,
+				  CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
+				  RBD_LOCK_TAG, cookie);
+	if (ret) {
+		if (ret != -EOPNOTSUPP)
+			rbd_warn(rbd_dev, "failed to update lock cookie: %d",
+				 ret);
+
+		/*
+		 * Lock cookie cannot be updated on older OSDs, so do
+		 * a manual release and queue an acquire.
+		 */
+		if (rbd_release_lock(rbd_dev))
+			queue_delayed_work(rbd_dev->task_wq,
+					   &rbd_dev->lock_dwork, 0);
+	} else {
+		strcpy(rbd_dev->lock_cookie, cookie);
+	}
+}
+
 static void rbd_reregister_watch(struct work_struct *work)
 static void rbd_reregister_watch(struct work_struct *work)
 {
 {
 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
 					    struct rbd_device, watch_dwork);
 					    struct rbd_device, watch_dwork);
-	bool was_lock_owner = false;
-	bool need_to_wake = false;
 	int ret;
 	int ret;
 
 
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 
-	down_write(&rbd_dev->lock_rwsem);
-	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
-		was_lock_owner = rbd_release_lock(rbd_dev);
-
 	mutex_lock(&rbd_dev->watch_mutex);
 	mutex_lock(&rbd_dev->watch_mutex);
 	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
 	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
 		mutex_unlock(&rbd_dev->watch_mutex);
 		mutex_unlock(&rbd_dev->watch_mutex);
-		goto out;
+		return;
 	}
 	}
 
 
 	ret = __rbd_register_watch(rbd_dev);
 	ret = __rbd_register_watch(rbd_dev);
@@ -3846,36 +3902,28 @@ static void rbd_reregister_watch(struct work_struct *work)
 		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
 		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
 		if (ret == -EBLACKLISTED || ret == -ENOENT) {
 		if (ret == -EBLACKLISTED || ret == -ENOENT) {
 			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
 			set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-			need_to_wake = true;
+			wake_requests(rbd_dev, true);
 		} else {
 		} else {
 			queue_delayed_work(rbd_dev->task_wq,
 			queue_delayed_work(rbd_dev->task_wq,
 					   &rbd_dev->watch_dwork,
 					   &rbd_dev->watch_dwork,
 					   RBD_RETRY_DELAY);
 					   RBD_RETRY_DELAY);
 		}
 		}
 		mutex_unlock(&rbd_dev->watch_mutex);
 		mutex_unlock(&rbd_dev->watch_mutex);
-		goto out;
+		return;
 	}
 	}
 
 
-	need_to_wake = true;
 	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
 	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
 	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
 	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
 	mutex_unlock(&rbd_dev->watch_mutex);
 	mutex_unlock(&rbd_dev->watch_mutex);
 
 
+	down_write(&rbd_dev->lock_rwsem);
+	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+		rbd_reacquire_lock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
+
 	ret = rbd_dev_refresh(rbd_dev);
 	ret = rbd_dev_refresh(rbd_dev);
 	if (ret)
 	if (ret)
 		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
 		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
-
-	if (was_lock_owner) {
-		ret = rbd_try_lock(rbd_dev);
-		if (ret)
-			rbd_warn(rbd_dev, "reregisteration lock failed: %d",
-				 ret);
-	}
-
-out:
-	up_write(&rbd_dev->lock_rwsem);
-	if (need_to_wake)
-		wake_requests(rbd_dev, true);
 }
 }
 
 
 /*
 /*
@@ -4034,10 +4082,6 @@ static void rbd_queue_workfn(struct work_struct *work)
 	if (op_type != OBJ_OP_READ) {
 	if (op_type != OBJ_OP_READ) {
 		snapc = rbd_dev->header.snapc;
 		snapc = rbd_dev->header.snapc;
 		ceph_get_snap_context(snapc);
 		ceph_get_snap_context(snapc);
-		must_be_locked = rbd_is_lock_supported(rbd_dev);
-	} else {
-		must_be_locked = rbd_dev->opts->lock_on_read &&
-					rbd_is_lock_supported(rbd_dev);
 	}
 	}
 	up_read(&rbd_dev->header_rwsem);
 	up_read(&rbd_dev->header_rwsem);
 
 
@@ -4048,14 +4092,20 @@ static void rbd_queue_workfn(struct work_struct *work)
 		goto err_rq;
 		goto err_rq;
 	}
 	}
 
 
+	must_be_locked =
+	    (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+	    (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
 	if (must_be_locked) {
 	if (must_be_locked) {
 		down_read(&rbd_dev->lock_rwsem);
 		down_read(&rbd_dev->lock_rwsem);
 		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
 		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
-		    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+		    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+			if (rbd_dev->opts->exclusive) {
+				rbd_warn(rbd_dev, "exclusive lock required");
+				result = -EROFS;
+				goto err_unlock;
+			}
 			rbd_wait_state_locked(rbd_dev);
 			rbd_wait_state_locked(rbd_dev);
-
-		WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
-			!test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
+		}
 		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
 		if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
 			result = -EBLACKLISTED;
 			result = -EBLACKLISTED;
 			goto err_unlock;
 			goto err_unlock;
@@ -4114,19 +4164,10 @@ static int rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
 
 
 static void rbd_free_disk(struct rbd_device *rbd_dev)
 static void rbd_free_disk(struct rbd_device *rbd_dev)
 {
 {
-	struct gendisk *disk = rbd_dev->disk;
-
-	if (!disk)
-		return;
-
+	blk_cleanup_queue(rbd_dev->disk->queue);
+	blk_mq_free_tag_set(&rbd_dev->tag_set);
+	put_disk(rbd_dev->disk);
 	rbd_dev->disk = NULL;
 	rbd_dev->disk = NULL;
-	if (disk->flags & GENHD_FL_UP) {
-		del_gendisk(disk);
-		if (disk->queue)
-			blk_cleanup_queue(disk->queue);
-		blk_mq_free_tag_set(&rbd_dev->tag_set);
-	}
-	put_disk(disk);
 }
 }
 
 
 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
@@ -4383,8 +4424,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
 	if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
 		q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
 		q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
 
 
+	/*
+	 * disk_release() expects a queue ref from add_disk() and will
+	 * put it.  Hold an extra ref until add_disk() is called.
+	 */
+	WARN_ON(!blk_get_queue(q));
 	disk->queue = q;
 	disk->queue = q;
-
 	q->queuedata = rbd_dev;
 	q->queuedata = rbd_dev;
 
 
 	rbd_dev->disk = disk;
 	rbd_dev->disk = disk;
@@ -5624,6 +5669,7 @@ static int rbd_add_parse_args(const char *buf,
 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
 	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
 	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
 	rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
 	rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
+	rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
 
 
 	copts = ceph_parse_options(options, mon_addrs,
 	copts = ceph_parse_options(options, mon_addrs,
 					mon_addrs + mon_addrs_size - 1,
 					mon_addrs + mon_addrs_size - 1,
@@ -5682,6 +5728,33 @@ again:
 	return ret;
 	return ret;
 }
 }
 
 
+static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
+{
+	down_write(&rbd_dev->lock_rwsem);
+	if (__rbd_is_lock_owner(rbd_dev))
+		rbd_unlock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
+}
+
+static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
+{
+	if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+		rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
+		return -EINVAL;
+	}
+
+	/* FIXME: "rbd map --exclusive" should be in interruptible */
+	down_read(&rbd_dev->lock_rwsem);
+	rbd_wait_state_locked(rbd_dev);
+	up_read(&rbd_dev->lock_rwsem);
+	if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+		rbd_warn(rbd_dev, "failed to acquire exclusive lock");
+		return -EROFS;
+	}
+
+	return 0;
+}
+
 /*
 /*
  * An rbd format 2 image has a unique identifier, distinct from the
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
  * name given to it by the user.  Internally, that identifier is
@@ -5873,6 +5946,15 @@ out_err:
 	return ret;
 	return ret;
 }
 }
 
 
+static void rbd_dev_device_release(struct rbd_device *rbd_dev)
+{
+	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+	rbd_dev_mapping_clear(rbd_dev);
+	rbd_free_disk(rbd_dev);
+	if (!single_major)
+		unregister_blkdev(rbd_dev->major, rbd_dev->name);
+}
+
 /*
 /*
  * rbd_dev->header_rwsem must be locked for write and will be unlocked
  * rbd_dev->header_rwsem must be locked for write and will be unlocked
  * upon return.
  * upon return.
@@ -5908,26 +5990,13 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
 
-	dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
-	ret = device_add(&rbd_dev->dev);
+	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
 	if (ret)
 	if (ret)
 		goto err_out_mapping;
 		goto err_out_mapping;
 
 
-	/* Everything's ready.  Announce the disk to the world. */
-
 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	up_write(&rbd_dev->header_rwsem);
 	up_write(&rbd_dev->header_rwsem);
-
-	spin_lock(&rbd_dev_list_lock);
-	list_add_tail(&rbd_dev->node, &rbd_dev_list);
-	spin_unlock(&rbd_dev_list_lock);
-
-	add_disk(rbd_dev->disk);
-	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
-		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
-		rbd_dev->header.features);
-
-	return ret;
+	return 0;
 
 
 err_out_mapping:
 err_out_mapping:
 	rbd_dev_mapping_clear(rbd_dev);
 	rbd_dev_mapping_clear(rbd_dev);
@@ -5962,11 +6031,11 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev)
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
 {
 {
 	rbd_dev_unprobe(rbd_dev);
 	rbd_dev_unprobe(rbd_dev);
+	if (rbd_dev->opts)
+		rbd_unregister_watch(rbd_dev);
 	rbd_dev->image_format = 0;
 	rbd_dev->image_format = 0;
 	kfree(rbd_dev->spec->image_id);
 	kfree(rbd_dev->spec->image_id);
 	rbd_dev->spec->image_id = NULL;
 	rbd_dev->spec->image_id = NULL;
-
-	rbd_dev_destroy(rbd_dev);
 }
 }
 
 
 /*
 /*
@@ -6126,22 +6195,43 @@ static ssize_t do_rbd_add(struct bus_type *bus,
 	rbd_dev->mapping.read_only = read_only;
 	rbd_dev->mapping.read_only = read_only;
 
 
 	rc = rbd_dev_device_setup(rbd_dev);
 	rc = rbd_dev_device_setup(rbd_dev);
-	if (rc) {
-		/*
-		 * rbd_unregister_watch() can't be moved into
-		 * rbd_dev_image_release() without refactoring, see
-		 * commit 1f3ef78861ac.
-		 */
-		rbd_unregister_watch(rbd_dev);
-		rbd_dev_image_release(rbd_dev);
-		goto out;
+	if (rc)
+		goto err_out_image_probe;
+
+	if (rbd_dev->opts->exclusive) {
+		rc = rbd_add_acquire_lock(rbd_dev);
+		if (rc)
+			goto err_out_device_setup;
 	}
 	}
 
 
+	/* Everything's ready.  Announce the disk to the world. */
+
+	rc = device_add(&rbd_dev->dev);
+	if (rc)
+		goto err_out_image_lock;
+
+	add_disk(rbd_dev->disk);
+	/* see rbd_init_disk() */
+	blk_put_queue(rbd_dev->disk->queue);
+
+	spin_lock(&rbd_dev_list_lock);
+	list_add_tail(&rbd_dev->node, &rbd_dev_list);
+	spin_unlock(&rbd_dev_list_lock);
+
+	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+		rbd_dev->header.features);
 	rc = count;
 	rc = count;
 out:
 out:
 	module_put(THIS_MODULE);
 	module_put(THIS_MODULE);
 	return rc;
 	return rc;
 
 
+err_out_image_lock:
+	rbd_dev_image_unlock(rbd_dev);
+err_out_device_setup:
+	rbd_dev_device_release(rbd_dev);
+err_out_image_probe:
+	rbd_dev_image_release(rbd_dev);
 err_out_rbd_dev:
 err_out_rbd_dev:
 	rbd_dev_destroy(rbd_dev);
 	rbd_dev_destroy(rbd_dev);
 err_out_client:
 err_out_client:
@@ -6169,21 +6259,6 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
 	return do_rbd_add(bus, buf, count);
 	return do_rbd_add(bus, buf, count);
 }
 }
 
 
-static void rbd_dev_device_release(struct rbd_device *rbd_dev)
-{
-	rbd_free_disk(rbd_dev);
-
-	spin_lock(&rbd_dev_list_lock);
-	list_del_init(&rbd_dev->node);
-	spin_unlock(&rbd_dev_list_lock);
-
-	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
-	device_del(&rbd_dev->dev);
-	rbd_dev_mapping_clear(rbd_dev);
-	if (!single_major)
-		unregister_blkdev(rbd_dev->major, rbd_dev->name);
-}
-
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
 {
 {
 	while (rbd_dev->parent) {
 	while (rbd_dev->parent) {
@@ -6201,6 +6276,7 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
 		}
 		}
 		rbd_assert(second);
 		rbd_assert(second);
 		rbd_dev_image_release(second);
 		rbd_dev_image_release(second);
+		rbd_dev_destroy(second);
 		first->parent = NULL;
 		first->parent = NULL;
 		first->parent_overlap = 0;
 		first->parent_overlap = 0;
 
 
@@ -6269,21 +6345,16 @@ static ssize_t do_rbd_remove(struct bus_type *bus,
 		blk_set_queue_dying(rbd_dev->disk->queue);
 		blk_set_queue_dying(rbd_dev->disk->queue);
 	}
 	}
 
 
-	down_write(&rbd_dev->lock_rwsem);
-	if (__rbd_is_lock_owner(rbd_dev))
-		rbd_unlock(rbd_dev);
-	up_write(&rbd_dev->lock_rwsem);
-	rbd_unregister_watch(rbd_dev);
+	del_gendisk(rbd_dev->disk);
+	spin_lock(&rbd_dev_list_lock);
+	list_del_init(&rbd_dev->node);
+	spin_unlock(&rbd_dev_list_lock);
+	device_del(&rbd_dev->dev);
 
 
-	/*
-	 * Don't free anything from rbd_dev->disk until after all
-	 * notifies are completely processed. Otherwise
-	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
-	 * in a potential use after free of rbd_dev->disk or rbd_dev.
-	 */
+	rbd_dev_image_unlock(rbd_dev);
 	rbd_dev_device_release(rbd_dev);
 	rbd_dev_device_release(rbd_dev);
 	rbd_dev_image_release(rbd_dev);
 	rbd_dev_image_release(rbd_dev);
-
+	rbd_dev_destroy(rbd_dev);
 	return count;
 	return count;
 }
 }
 
 

+ 6 - 4
fs/ceph/addr.c

@@ -670,8 +670,12 @@ static void writepages_finish(struct ceph_osd_request *req)
 	bool remove_page;
 	bool remove_page;
 
 
 	dout("writepages_finish %p rc %d\n", inode, rc);
 	dout("writepages_finish %p rc %d\n", inode, rc);
-	if (rc < 0)
+	if (rc < 0) {
 		mapping_set_error(mapping, rc);
 		mapping_set_error(mapping, rc);
+		ceph_set_error_write(ci);
+	} else {
+		ceph_clear_error_write(ci);
+	}
 
 
 	/*
 	/*
 	 * We lost the cache cap, need to truncate the page before
 	 * We lost the cache cap, need to truncate the page before
@@ -703,9 +707,6 @@ static void writepages_finish(struct ceph_osd_request *req)
 				clear_bdi_congested(inode_to_bdi(inode),
 				clear_bdi_congested(inode_to_bdi(inode),
 						    BLK_RW_ASYNC);
 						    BLK_RW_ASYNC);
 
 
-			if (rc < 0)
-				SetPageError(page);
-
 			ceph_put_snap_context(page_snap_context(page));
 			ceph_put_snap_context(page_snap_context(page));
 			page->private = 0;
 			page->private = 0;
 			ClearPagePrivate(page);
 			ClearPagePrivate(page);
@@ -1892,6 +1893,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
 	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
 	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
 
 
 	wr_req->r_mtime = ci->vfs_inode.i_mtime;
 	wr_req->r_mtime = ci->vfs_inode.i_mtime;
+	wr_req->r_abort_on_full = true;
 	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
 	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
 
 
 	if (!err)
 	if (!err)

+ 18 - 7
fs/ceph/caps.c

@@ -1015,6 +1015,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
 	void *p;
 	void *p;
 	size_t extra_len;
 	size_t extra_len;
 	struct timespec zerotime = {0};
 	struct timespec zerotime = {0};
+	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;
 
 
 	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
 	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
 	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
 	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
@@ -1076,8 +1077,12 @@ static int send_cap_msg(struct cap_msg_args *arg)
 	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
 	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
 	/* inline data size */
 	/* inline data size */
 	ceph_encode_32(&p, 0);
 	ceph_encode_32(&p, 0);
-	/* osd_epoch_barrier (version 5) */
-	ceph_encode_32(&p, 0);
+	/*
+	 * osd_epoch_barrier (version 5)
+	 * The epoch_barrier is protected osdc->lock, so READ_ONCE here in
+	 * case it was recently changed
+	 */
+	ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
 	/* oldest_flush_tid (version 6) */
 	/* oldest_flush_tid (version 6) */
 	ceph_encode_64(&p, arg->oldest_flush_tid);
 	ceph_encode_64(&p, arg->oldest_flush_tid);
 
 
@@ -1389,7 +1394,7 @@ static void __ceph_flush_snaps(struct ceph_inode_info *ci,
 		first_tid = cf->tid + 1;
 		first_tid = cf->tid + 1;
 
 
 		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
 		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
-		atomic_inc(&capsnap->nref);
+		refcount_inc(&capsnap->nref);
 		spin_unlock(&ci->i_ceph_lock);
 		spin_unlock(&ci->i_ceph_lock);
 
 
 		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
 		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
@@ -2202,7 +2207,7 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
 			     inode, capsnap, cf->tid,
 			     inode, capsnap, cf->tid,
 			     ceph_cap_string(capsnap->dirty));
 			     ceph_cap_string(capsnap->dirty));
 
 
-			atomic_inc(&capsnap->nref);
+			refcount_inc(&capsnap->nref);
 			spin_unlock(&ci->i_ceph_lock);
 			spin_unlock(&ci->i_ceph_lock);
 
 
 			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
 			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
@@ -3633,13 +3638,19 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		p += inline_len;
 		p += inline_len;
 	}
 	}
 
 
+	if (le16_to_cpu(msg->hdr.version) >= 5) {
+		struct ceph_osd_client	*osdc = &mdsc->fsc->client->osdc;
+		u32			epoch_barrier;
+
+		ceph_decode_32_safe(&p, end, epoch_barrier, bad);
+		ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
+	}
+
 	if (le16_to_cpu(msg->hdr.version) >= 8) {
 	if (le16_to_cpu(msg->hdr.version) >= 8) {
 		u64 flush_tid;
 		u64 flush_tid;
 		u32 caller_uid, caller_gid;
 		u32 caller_uid, caller_gid;
-		u32 osd_epoch_barrier;
 		u32 pool_ns_len;
 		u32 pool_ns_len;
-		/* version >= 5 */
-		ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
+
 		/* version >= 6 */
 		/* version >= 6 */
 		ceph_decode_64_safe(&p, end, flush_tid, bad);
 		ceph_decode_64_safe(&p, end, flush_tid, bad);
 		/* version >= 7 */
 		/* version >= 7 */

+ 10 - 11
fs/ceph/debugfs.c

@@ -22,20 +22,19 @@ static int mdsmap_show(struct seq_file *s, void *p)
 {
 {
 	int i;
 	int i;
 	struct ceph_fs_client *fsc = s->private;
 	struct ceph_fs_client *fsc = s->private;
+	struct ceph_mdsmap *mdsmap;
 
 
 	if (fsc->mdsc == NULL || fsc->mdsc->mdsmap == NULL)
 	if (fsc->mdsc == NULL || fsc->mdsc->mdsmap == NULL)
 		return 0;
 		return 0;
-	seq_printf(s, "epoch %d\n", fsc->mdsc->mdsmap->m_epoch);
-	seq_printf(s, "root %d\n", fsc->mdsc->mdsmap->m_root);
-	seq_printf(s, "session_timeout %d\n",
-		       fsc->mdsc->mdsmap->m_session_timeout);
-	seq_printf(s, "session_autoclose %d\n",
-		       fsc->mdsc->mdsmap->m_session_autoclose);
-	for (i = 0; i < fsc->mdsc->mdsmap->m_max_mds; i++) {
-		struct ceph_entity_addr *addr =
-			&fsc->mdsc->mdsmap->m_info[i].addr;
-		int state = fsc->mdsc->mdsmap->m_info[i].state;
-
+	mdsmap = fsc->mdsc->mdsmap;
+	seq_printf(s, "epoch %d\n", mdsmap->m_epoch);
+	seq_printf(s, "root %d\n", mdsmap->m_root);
+	seq_printf(s, "max_mds %d\n", mdsmap->m_max_mds);
+	seq_printf(s, "session_timeout %d\n", mdsmap->m_session_timeout);
+	seq_printf(s, "session_autoclose %d\n", mdsmap->m_session_autoclose);
+	for (i = 0; i < mdsmap->m_num_mds; i++) {
+		struct ceph_entity_addr *addr = &mdsmap->m_info[i].addr;
+		int state = mdsmap->m_info[i].state;
 		seq_printf(s, "\tmds%d\t%s\t(%s)\n", i,
 		seq_printf(s, "\tmds%d\t%s\t(%s)\n", i,
 			       ceph_pr_addr(&addr->in_addr),
 			       ceph_pr_addr(&addr->in_addr),
 			       ceph_mds_state_name(state));
 			       ceph_mds_state_name(state));

+ 16 - 7
fs/ceph/dir.c

@@ -294,7 +294,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	int i;
 	int i;
 	int err;
 	int err;
-	u32 ftype;
+	unsigned frag = -1;
 	struct ceph_mds_reply_info_parsed *rinfo;
 	struct ceph_mds_reply_info_parsed *rinfo;
 
 
 	dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
 	dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
@@ -341,7 +341,6 @@ more:
 	/* do we have the correct frag content buffered? */
 	/* do we have the correct frag content buffered? */
 	if (need_send_readdir(fi, ctx->pos)) {
 	if (need_send_readdir(fi, ctx->pos)) {
 		struct ceph_mds_request *req;
 		struct ceph_mds_request *req;
-		unsigned frag;
 		int op = ceph_snap(inode) == CEPH_SNAPDIR ?
 		int op = ceph_snap(inode) == CEPH_SNAPDIR ?
 			CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 			CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
 
@@ -352,8 +351,11 @@ more:
 		}
 		}
 
 
 		if (is_hash_order(ctx->pos)) {
 		if (is_hash_order(ctx->pos)) {
-			frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
-						NULL, NULL);
+			/* fragtree isn't always accurate. choose frag
+			 * based on previous reply when possible. */
+			if (frag == (unsigned)-1)
+				frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+							NULL, NULL);
 		} else {
 		} else {
 			frag = fpos_frag(ctx->pos);
 			frag = fpos_frag(ctx->pos);
 		}
 		}
@@ -378,7 +380,11 @@ more:
 				ceph_mdsc_put_request(req);
 				ceph_mdsc_put_request(req);
 				return -ENOMEM;
 				return -ENOMEM;
 			}
 			}
+		} else if (is_hash_order(ctx->pos)) {
+			req->r_args.readdir.offset_hash =
+				cpu_to_le32(fpos_hash(ctx->pos));
 		}
 		}
+
 		req->r_dir_release_cnt = fi->dir_release_count;
 		req->r_dir_release_cnt = fi->dir_release_count;
 		req->r_dir_ordered_cnt = fi->dir_ordered_count;
 		req->r_dir_ordered_cnt = fi->dir_ordered_count;
 		req->r_readdir_cache_idx = fi->readdir_cache_idx;
 		req->r_readdir_cache_idx = fi->readdir_cache_idx;
@@ -476,6 +482,7 @@ more:
 		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
 		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
 		struct ceph_vino vino;
 		struct ceph_vino vino;
 		ino_t ino;
 		ino_t ino;
+		u32 ftype;
 
 
 		BUG_ON(rde->offset < ctx->pos);
 		BUG_ON(rde->offset < ctx->pos);
 
 
@@ -498,15 +505,17 @@ more:
 		ctx->pos++;
 		ctx->pos++;
 	}
 	}
 
 
+	ceph_mdsc_put_request(fi->last_readdir);
+	fi->last_readdir = NULL;
+
 	if (fi->next_offset > 2) {
 	if (fi->next_offset > 2) {
-		ceph_mdsc_put_request(fi->last_readdir);
-		fi->last_readdir = NULL;
+		frag = fi->frag;
 		goto more;
 		goto more;
 	}
 	}
 
 
 	/* more frags? */
 	/* more frags? */
 	if (!ceph_frag_is_rightmost(fi->frag)) {
 	if (!ceph_frag_is_rightmost(fi->frag)) {
-		unsigned frag = ceph_frag_next(fi->frag);
+		frag = ceph_frag_next(fi->frag);
 		if (is_hash_order(ctx->pos)) {
 		if (is_hash_order(ctx->pos)) {
 			loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
 			loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
 							fi->next_offset, true);
 							fi->next_offset, true);

+ 53 - 15
fs/ceph/file.c

@@ -13,6 +13,38 @@
 #include "mds_client.h"
 #include "mds_client.h"
 #include "cache.h"
 #include "cache.h"
 
 
+static __le32 ceph_flags_sys2wire(u32 flags)
+{
+	u32 wire_flags = 0;
+
+	switch (flags & O_ACCMODE) {
+	case O_RDONLY:
+		wire_flags |= CEPH_O_RDONLY;
+		break;
+	case O_WRONLY:
+		wire_flags |= CEPH_O_WRONLY;
+		break;
+	case O_RDWR:
+		wire_flags |= CEPH_O_RDWR;
+		break;
+	}
+
+#define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; }
+
+	ceph_sys2wire(O_CREAT);
+	ceph_sys2wire(O_EXCL);
+	ceph_sys2wire(O_TRUNC);
+	ceph_sys2wire(O_DIRECTORY);
+	ceph_sys2wire(O_NOFOLLOW);
+
+#undef ceph_sys2wire
+
+	if (flags)
+		dout("unused open flags: %x", flags);
+
+	return cpu_to_le32(wire_flags);
+}
+
 /*
 /*
  * Ceph file operations
  * Ceph file operations
  *
  *
@@ -120,7 +152,7 @@ prepare_open_request(struct super_block *sb, int flags, int create_mode)
 	if (IS_ERR(req))
 	if (IS_ERR(req))
 		goto out;
 		goto out;
 	req->r_fmode = ceph_flags_to_mode(flags);
 	req->r_fmode = ceph_flags_to_mode(flags);
-	req->r_args.open.flags = cpu_to_le32(flags);
+	req->r_args.open.flags = ceph_flags_sys2wire(flags);
 	req->r_args.open.mode = cpu_to_le32(create_mode);
 	req->r_args.open.mode = cpu_to_le32(create_mode);
 out:
 out:
 	return req;
 	return req;
@@ -189,7 +221,7 @@ int ceph_renew_caps(struct inode *inode)
 	spin_lock(&ci->i_ceph_lock);
 	spin_lock(&ci->i_ceph_lock);
 	wanted = __ceph_caps_file_wanted(ci);
 	wanted = __ceph_caps_file_wanted(ci);
 	if (__ceph_is_any_real_caps(ci) &&
 	if (__ceph_is_any_real_caps(ci) &&
-	    (!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) {
+	    (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
 		int issued = __ceph_caps_issued(ci, NULL);
 		int issued = __ceph_caps_issued(ci, NULL);
 		spin_unlock(&ci->i_ceph_lock);
 		spin_unlock(&ci->i_ceph_lock);
 		dout("renew caps %p want %s issued %s updating mds_wanted\n",
 		dout("renew caps %p want %s issued %s updating mds_wanted\n",
@@ -778,6 +810,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	req->r_callback = ceph_aio_complete_req;
 	req->r_callback = ceph_aio_complete_req;
 	req->r_inode = inode;
 	req->r_inode = inode;
 	req->r_priv = aio_req;
 	req->r_priv = aio_req;
+	req->r_abort_on_full = true;
 
 
 	ret = ceph_osdc_start_request(req->r_osdc, req, false);
 	ret = ceph_osdc_start_request(req->r_osdc, req, false);
 out:
 out:
@@ -1085,19 +1118,22 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 
 
 out:
 out:
 		ceph_osdc_put_request(req);
 		ceph_osdc_put_request(req);
-		if (ret == 0) {
-			pos += len;
-			written += len;
-
-			if (pos > i_size_read(inode)) {
-				check_caps = ceph_inode_set_size(inode, pos);
-				if (check_caps)
-					ceph_check_caps(ceph_inode(inode),
-							CHECK_CAPS_AUTHONLY,
-							NULL);
-			}
-		} else
+		if (ret != 0) {
+			ceph_set_error_write(ci);
 			break;
 			break;
+		}
+
+		ceph_clear_error_write(ci);
+		pos += len;
+		written += len;
+		if (pos > i_size_read(inode)) {
+			check_caps = ceph_inode_set_size(inode, pos);
+			if (check_caps)
+				ceph_check_caps(ceph_inode(inode),
+						CHECK_CAPS_AUTHONLY,
+						NULL);
+		}
+
 	}
 	}
 
 
 	if (ret != -EOLDSNAPC && written > 0) {
 	if (ret != -EOLDSNAPC && written > 0) {
@@ -1303,6 +1339,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	}
 	}
 
 
 retry_snap:
 retry_snap:
+	/* FIXME: not complete since it doesn't account for being at quota */
 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
 		err = -ENOSPC;
 		err = -ENOSPC;
 		goto out;
 		goto out;
@@ -1324,7 +1361,8 @@ retry_snap:
 	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
 	     inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
 
 
 	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-	    (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
+	    (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) ||
+	    (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
 		struct ceph_snap_context *snapc;
 		struct ceph_snap_context *snapc;
 		struct iov_iter data;
 		struct iov_iter data;
 		inode_unlock(inode);
 		inode_unlock(inode);

+ 12 - 5
fs/ceph/inode.c

@@ -1482,10 +1482,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
 	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
 		return readdir_prepopulate_inodes_only(req, session);
 		return readdir_prepopulate_inodes_only(req, session);
 
 
-	if (rinfo->hash_order && req->r_path2) {
-		last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
-					  req->r_path2, strlen(req->r_path2));
-		last_hash = ceph_frag_value(last_hash);
+	if (rinfo->hash_order) {
+		if (req->r_path2) {
+			last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+						  req->r_path2,
+						  strlen(req->r_path2));
+			last_hash = ceph_frag_value(last_hash);
+		} else if (rinfo->offset_hash) {
+			/* mds understands offset_hash */
+			WARN_ON_ONCE(req->r_readdir_offset != 2);
+			last_hash = le32_to_cpu(rhead->args.readdir.offset_hash);
+		}
 	}
 	}
 
 
 	if (rinfo->dir_dir &&
 	if (rinfo->dir_dir &&
@@ -1510,7 +1517,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 	}
 	}
 
 
 	if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2 &&
 	if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2 &&
-	    !(rinfo->hash_order && req->r_path2)) {
+	    !(rinfo->hash_order && last_hash)) {
 		/* note dir version at start of readdir so we can tell
 		/* note dir version at start of readdir so we can tell
 		 * if any dentries get dropped */
 		 * if any dentries get dropped */
 		req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
 		req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);

+ 59 - 16
fs/ceph/mds_client.c

@@ -189,6 +189,7 @@ static int parse_reply_info_dir(void **p, void *end,
 		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
 		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
 		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
 		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
 		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
 		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
+		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
 	}
 	}
 	if (num == 0)
 	if (num == 0)
 		goto done;
 		goto done;
@@ -378,9 +379,9 @@ const char *ceph_session_state_name(int s)
 
 
 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
 {
 {
-	if (atomic_inc_not_zero(&s->s_ref)) {
+	if (refcount_inc_not_zero(&s->s_ref)) {
 		dout("mdsc get_session %p %d -> %d\n", s,
 		dout("mdsc get_session %p %d -> %d\n", s,
-		     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
+		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
 		return s;
 		return s;
 	} else {
 	} else {
 		dout("mdsc get_session %p 0 -- FAIL", s);
 		dout("mdsc get_session %p 0 -- FAIL", s);
@@ -391,8 +392,8 @@ static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
 void ceph_put_mds_session(struct ceph_mds_session *s)
 void ceph_put_mds_session(struct ceph_mds_session *s)
 {
 {
 	dout("mdsc put_session %p %d -> %d\n", s,
 	dout("mdsc put_session %p %d -> %d\n", s,
-	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
-	if (atomic_dec_and_test(&s->s_ref)) {
+	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
+	if (refcount_dec_and_test(&s->s_ref)) {
 		if (s->s_auth.authorizer)
 		if (s->s_auth.authorizer)
 			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
 			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
 		kfree(s);
 		kfree(s);
@@ -411,7 +412,7 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 		return NULL;
 		return NULL;
 	session = mdsc->sessions[mds];
 	session = mdsc->sessions[mds];
 	dout("lookup_mds_session %p %d\n", session,
 	dout("lookup_mds_session %p %d\n", session,
-	     atomic_read(&session->s_ref));
+	     refcount_read(&session->s_ref));
 	get_session(session);
 	get_session(session);
 	return session;
 	return session;
 }
 }
@@ -441,7 +442,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 {
 {
 	struct ceph_mds_session *s;
 	struct ceph_mds_session *s;
 
 
-	if (mds >= mdsc->mdsmap->m_max_mds)
+	if (mds >= mdsc->mdsmap->m_num_mds)
 		return ERR_PTR(-EINVAL);
 		return ERR_PTR(-EINVAL);
 
 
 	s = kzalloc(sizeof(*s), GFP_NOFS);
 	s = kzalloc(sizeof(*s), GFP_NOFS);
@@ -466,7 +467,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	INIT_LIST_HEAD(&s->s_caps);
 	INIT_LIST_HEAD(&s->s_caps);
 	s->s_nr_caps = 0;
 	s->s_nr_caps = 0;
 	s->s_trim_caps = 0;
 	s->s_trim_caps = 0;
-	atomic_set(&s->s_ref, 1);
+	refcount_set(&s->s_ref, 1);
 	INIT_LIST_HEAD(&s->s_waiting);
 	INIT_LIST_HEAD(&s->s_waiting);
 	INIT_LIST_HEAD(&s->s_unsafe);
 	INIT_LIST_HEAD(&s->s_unsafe);
 	s->s_num_cap_releases = 0;
 	s->s_num_cap_releases = 0;
@@ -494,7 +495,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	}
 	}
 	mdsc->sessions[mds] = s;
 	mdsc->sessions[mds] = s;
 	atomic_inc(&mdsc->num_sessions);
 	atomic_inc(&mdsc->num_sessions);
-	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
+	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 
 
 	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
 	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
@@ -1004,7 +1005,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
 	struct ceph_mds_session *ts;
 	struct ceph_mds_session *ts;
 	int i, mds = session->s_mds;
 	int i, mds = session->s_mds;
 
 
-	if (mds >= mdsc->mdsmap->m_max_mds)
+	if (mds >= mdsc->mdsmap->m_num_mds)
 		return;
 		return;
 
 
 	mi = &mdsc->mdsmap->m_info[mds];
 	mi = &mdsc->mdsmap->m_info[mds];
@@ -1551,9 +1552,15 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
 	struct ceph_msg *msg = NULL;
 	struct ceph_msg *msg = NULL;
 	struct ceph_mds_cap_release *head;
 	struct ceph_mds_cap_release *head;
 	struct ceph_mds_cap_item *item;
 	struct ceph_mds_cap_item *item;
+	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
 	struct ceph_cap *cap;
 	struct ceph_cap *cap;
 	LIST_HEAD(tmp_list);
 	LIST_HEAD(tmp_list);
 	int num_cap_releases;
 	int num_cap_releases;
+	__le32	barrier, *cap_barrier;
+
+	down_read(&osdc->lock);
+	barrier = cpu_to_le32(osdc->epoch_barrier);
+	up_read(&osdc->lock);
 
 
 	spin_lock(&session->s_cap_lock);
 	spin_lock(&session->s_cap_lock);
 again:
 again:
@@ -1571,7 +1578,11 @@ again:
 			head = msg->front.iov_base;
 			head = msg->front.iov_base;
 			head->num = cpu_to_le32(0);
 			head->num = cpu_to_le32(0);
 			msg->front.iov_len = sizeof(*head);
 			msg->front.iov_len = sizeof(*head);
+
+			msg->hdr.version = cpu_to_le16(2);
+			msg->hdr.compat_version = cpu_to_le16(1);
 		}
 		}
+
 		cap = list_first_entry(&tmp_list, struct ceph_cap,
 		cap = list_first_entry(&tmp_list, struct ceph_cap,
 					session_caps);
 					session_caps);
 		list_del(&cap->session_caps);
 		list_del(&cap->session_caps);
@@ -1589,6 +1600,11 @@ again:
 		ceph_put_cap(mdsc, cap);
 		ceph_put_cap(mdsc, cap);
 
 
 		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
 		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+			// Append cap_barrier field
+			cap_barrier = msg->front.iov_base + msg->front.iov_len;
+			*cap_barrier = barrier;
+			msg->front.iov_len += sizeof(*cap_barrier);
+
 			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
 			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
 			ceph_con_send(&session->s_con, msg);
 			ceph_con_send(&session->s_con, msg);
@@ -1604,6 +1620,11 @@ again:
 	spin_unlock(&session->s_cap_lock);
 	spin_unlock(&session->s_cap_lock);
 
 
 	if (msg) {
 	if (msg) {
+		// Append cap_barrier field
+		cap_barrier = msg->front.iov_base + msg->front.iov_len;
+		*cap_barrier = barrier;
+		msg->front.iov_len += sizeof(*cap_barrier);
+
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
 		ceph_con_send(&session->s_con, msg);
 		ceph_con_send(&session->s_con, msg);
@@ -1993,7 +2014,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 
 
 	if (req->r_pagelist) {
 	if (req->r_pagelist) {
 		struct ceph_pagelist *pagelist = req->r_pagelist;
 		struct ceph_pagelist *pagelist = req->r_pagelist;
-		atomic_inc(&pagelist->refcnt);
+		refcount_inc(&pagelist->refcnt);
 		ceph_msg_data_add_pagelist(msg, pagelist);
 		ceph_msg_data_add_pagelist(msg, pagelist);
 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
 	} else {
 	} else {
@@ -2640,8 +2661,10 @@ static void handle_session(struct ceph_mds_session *session,
 	seq = le64_to_cpu(h->seq);
 	seq = le64_to_cpu(h->seq);
 
 
 	mutex_lock(&mdsc->mutex);
 	mutex_lock(&mdsc->mutex);
-	if (op == CEPH_SESSION_CLOSE)
+	if (op == CEPH_SESSION_CLOSE) {
+		get_session(session);
 		__unregister_session(mdsc, session);
 		__unregister_session(mdsc, session);
+	}
 	/* FIXME: this ttl calculation is generous */
 	/* FIXME: this ttl calculation is generous */
 	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
 	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
 	mutex_unlock(&mdsc->mutex);
 	mutex_unlock(&mdsc->mutex);
@@ -2730,6 +2753,8 @@ static void handle_session(struct ceph_mds_session *session,
 			kick_requests(mdsc, mds);
 			kick_requests(mdsc, mds);
 		mutex_unlock(&mdsc->mutex);
 		mutex_unlock(&mdsc->mutex);
 	}
 	}
+	if (op == CEPH_SESSION_CLOSE)
+		ceph_put_mds_session(session);
 	return;
 	return;
 
 
 bad:
 bad:
@@ -3109,7 +3134,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 	dout("check_new_map new %u old %u\n",
 	dout("check_new_map new %u old %u\n",
 	     newmap->m_epoch, oldmap->m_epoch);
 	     newmap->m_epoch, oldmap->m_epoch);
 
 
-	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
+	for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
 		if (mdsc->sessions[i] == NULL)
 		if (mdsc->sessions[i] == NULL)
 			continue;
 			continue;
 		s = mdsc->sessions[i];
 		s = mdsc->sessions[i];
@@ -3123,15 +3148,33 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
 		     ceph_session_state_name(s->s_state));
 		     ceph_session_state_name(s->s_state));
 
 
-		if (i >= newmap->m_max_mds ||
+		if (i >= newmap->m_num_mds ||
 		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
 		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
 			   ceph_mdsmap_get_addr(newmap, i),
 			   ceph_mdsmap_get_addr(newmap, i),
 			   sizeof(struct ceph_entity_addr))) {
 			   sizeof(struct ceph_entity_addr))) {
 			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
 			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
 				/* the session never opened, just close it
 				/* the session never opened, just close it
 				 * out now */
 				 * out now */
+				get_session(s);
+				__unregister_session(mdsc, s);
 				__wake_requests(mdsc, &s->s_waiting);
 				__wake_requests(mdsc, &s->s_waiting);
+				ceph_put_mds_session(s);
+			} else if (i >= newmap->m_num_mds) {
+				/* force close session for stopped mds */
+				get_session(s);
 				__unregister_session(mdsc, s);
 				__unregister_session(mdsc, s);
+				__wake_requests(mdsc, &s->s_waiting);
+				kick_requests(mdsc, i);
+				mutex_unlock(&mdsc->mutex);
+
+				mutex_lock(&s->s_mutex);
+				cleanup_session_requests(mdsc, s);
+				remove_session_caps(s);
+				mutex_unlock(&s->s_mutex);
+
+				ceph_put_mds_session(s);
+
+				mutex_lock(&mdsc->mutex);
 			} else {
 			} else {
 				/* just close it */
 				/* just close it */
 				mutex_unlock(&mdsc->mutex);
 				mutex_unlock(&mdsc->mutex);
@@ -3169,7 +3212,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 		}
 		}
 	}
 	}
 
 
-	for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+	for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) {
 		s = mdsc->sessions[i];
 		s = mdsc->sessions[i];
 		if (!s)
 		if (!s)
 			continue;
 			continue;
@@ -3883,7 +3926,7 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
 	struct ceph_mds_session *s = con->private;
 	struct ceph_mds_session *s = con->private;
 
 
 	if (get_session(s)) {
 	if (get_session(s)) {
-		dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
+		dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
 		return con;
 		return con;
 	}
 	}
 	dout("mdsc con_get %p FAIL\n", s);
 	dout("mdsc con_get %p FAIL\n", s);
@@ -3894,7 +3937,7 @@ static void con_put(struct ceph_connection *con)
 {
 {
 	struct ceph_mds_session *s = con->private;
 	struct ceph_mds_session *s = con->private;
 
 
-	dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
+	dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
 	ceph_put_mds_session(s);
 	ceph_put_mds_session(s);
 }
 }
 
 

+ 10 - 5
fs/ceph/mds_client.h

@@ -7,6 +7,7 @@
 #include <linux/mutex.h>
 #include <linux/mutex.h>
 #include <linux/rbtree.h>
 #include <linux/rbtree.h>
 #include <linux/spinlock.h>
 #include <linux/spinlock.h>
+#include <linux/refcount.h>
 
 
 #include <linux/ceph/types.h>
 #include <linux/ceph/types.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/messenger.h>
@@ -82,9 +83,10 @@ struct ceph_mds_reply_info_parsed {
 			struct ceph_mds_reply_dirfrag *dir_dir;
 			struct ceph_mds_reply_dirfrag *dir_dir;
 			size_t			      dir_buf_size;
 			size_t			      dir_buf_size;
 			int                           dir_nr;
 			int                           dir_nr;
-			bool			      dir_complete;
 			bool			      dir_end;
 			bool			      dir_end;
+			bool			      dir_complete;
 			bool			      hash_order;
 			bool			      hash_order;
+			bool			      offset_hash;
 			struct ceph_mds_reply_dir_entry  *dir_entries;
 			struct ceph_mds_reply_dir_entry  *dir_entries;
 		};
 		};
 
 
@@ -104,10 +106,13 @@ struct ceph_mds_reply_info_parsed {
 
 
 /*
 /*
  * cap releases are batched and sent to the MDS en masse.
  * cap releases are batched and sent to the MDS en masse.
+ *
+ * Account for per-message overhead of mds_cap_release header
+ * and __le32 for osd epoch barrier trailing field.
  */
  */
-#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE -			\
+#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE - sizeof(u32) -		\
 				sizeof(struct ceph_mds_cap_release)) /	\
 				sizeof(struct ceph_mds_cap_release)) /	\
-			       sizeof(struct ceph_mds_cap_item))
+			        sizeof(struct ceph_mds_cap_item))
 
 
 
 
 /*
 /*
@@ -156,7 +161,7 @@ struct ceph_mds_session {
 	unsigned long     s_renew_requested; /* last time we sent a renew req */
 	unsigned long     s_renew_requested; /* last time we sent a renew req */
 	u64               s_renew_seq;
 	u64               s_renew_seq;
 
 
-	atomic_t          s_ref;
+	refcount_t          s_ref;
 	struct list_head  s_waiting;  /* waiting requests */
 	struct list_head  s_waiting;  /* waiting requests */
 	struct list_head  s_unsafe;   /* unsafe requests */
 	struct list_head  s_unsafe;   /* unsafe requests */
 };
 };
@@ -373,7 +378,7 @@ __ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
 static inline struct ceph_mds_session *
 static inline struct ceph_mds_session *
 ceph_get_mds_session(struct ceph_mds_session *s)
 ceph_get_mds_session(struct ceph_mds_session *s)
 {
 {
-	atomic_inc(&s->s_ref);
+	refcount_inc(&s->s_ref);
 	return s;
 	return s;
 }
 }
 
 

+ 37 - 7
fs/ceph/mdsmap.c

@@ -22,11 +22,11 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
 	int i;
 	int i;
 
 
 	/* special case for one mds */
 	/* special case for one mds */
-	if (1 == m->m_max_mds && m->m_info[0].state > 0)
+	if (1 == m->m_num_mds && m->m_info[0].state > 0)
 		return 0;
 		return 0;
 
 
 	/* count */
 	/* count */
-	for (i = 0; i < m->m_max_mds; i++)
+	for (i = 0; i < m->m_num_mds; i++)
 		if (m->m_info[i].state > 0)
 		if (m->m_info[i].state > 0)
 			n++;
 			n++;
 	if (n == 0)
 	if (n == 0)
@@ -135,8 +135,9 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 	m->m_session_autoclose = ceph_decode_32(p);
 	m->m_session_autoclose = ceph_decode_32(p);
 	m->m_max_file_size = ceph_decode_64(p);
 	m->m_max_file_size = ceph_decode_64(p);
 	m->m_max_mds = ceph_decode_32(p);
 	m->m_max_mds = ceph_decode_32(p);
+	m->m_num_mds = m->m_max_mds;
 
 
-	m->m_info = kcalloc(m->m_max_mds, sizeof(*m->m_info), GFP_NOFS);
+	m->m_info = kcalloc(m->m_num_mds, sizeof(*m->m_info), GFP_NOFS);
 	if (m->m_info == NULL)
 	if (m->m_info == NULL)
 		goto nomem;
 		goto nomem;
 
 
@@ -207,9 +208,20 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 		     ceph_pr_addr(&addr.in_addr),
 		     ceph_pr_addr(&addr.in_addr),
 		     ceph_mds_state_name(state));
 		     ceph_mds_state_name(state));
 
 
-		if (mds < 0 || mds >= m->m_max_mds || state <= 0)
+		if (mds < 0 || state <= 0)
 			continue;
 			continue;
 
 
+		if (mds >= m->m_num_mds) {
+			int new_num = max(mds + 1, m->m_num_mds * 2);
+			void *new_m_info = krealloc(m->m_info,
+						new_num * sizeof(*m->m_info),
+						GFP_NOFS | __GFP_ZERO);
+			if (!new_m_info)
+				goto nomem;
+			m->m_info = new_m_info;
+			m->m_num_mds = new_num;
+		}
+
 		info = &m->m_info[mds];
 		info = &m->m_info[mds];
 		info->global_id = global_id;
 		info->global_id = global_id;
 		info->state = state;
 		info->state = state;
@@ -229,6 +241,14 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 			info->export_targets = NULL;
 			info->export_targets = NULL;
 		}
 		}
 	}
 	}
+	if (m->m_num_mds > m->m_max_mds) {
+		/* find max up mds */
+		for (i = m->m_num_mds; i >= m->m_max_mds; i--) {
+			if (i == 0 || m->m_info[i-1].state > 0)
+				break;
+		}
+		m->m_num_mds = i;
+	}
 
 
 	/* pg_pools */
 	/* pg_pools */
 	ceph_decode_32_safe(p, end, n, bad);
 	ceph_decode_32_safe(p, end, n, bad);
@@ -270,12 +290,22 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 
 
 		for (i = 0; i < n; i++) {
 		for (i = 0; i < n; i++) {
 			s32 mds = ceph_decode_32(p);
 			s32 mds = ceph_decode_32(p);
-			if (mds >= 0 && mds < m->m_max_mds) {
+			if (mds >= 0 && mds < m->m_num_mds) {
 				if (m->m_info[mds].laggy)
 				if (m->m_info[mds].laggy)
 					num_laggy++;
 					num_laggy++;
 			}
 			}
 		}
 		}
 		m->m_num_laggy = num_laggy;
 		m->m_num_laggy = num_laggy;
+
+		if (n > m->m_num_mds) {
+			void *new_m_info = krealloc(m->m_info,
+						    n * sizeof(*m->m_info),
+						    GFP_NOFS | __GFP_ZERO);
+			if (!new_m_info)
+				goto nomem;
+			m->m_info = new_m_info;
+		}
+		m->m_num_mds = n;
 	}
 	}
 
 
 	/* inc */
 	/* inc */
@@ -341,7 +371,7 @@ void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
 {
 {
 	int i;
 	int i;
 
 
-	for (i = 0; i < m->m_max_mds; i++)
+	for (i = 0; i < m->m_num_mds; i++)
 		kfree(m->m_info[i].export_targets);
 		kfree(m->m_info[i].export_targets);
 	kfree(m->m_info);
 	kfree(m->m_info);
 	kfree(m->m_data_pg_pools);
 	kfree(m->m_data_pg_pools);
@@ -357,7 +387,7 @@ bool ceph_mdsmap_is_cluster_available(struct ceph_mdsmap *m)
 		return false;
 		return false;
 	if (m->m_num_laggy > 0)
 	if (m->m_num_laggy > 0)
 		return false;
 		return false;
-	for (i = 0; i < m->m_max_mds; i++) {
+	for (i = 0; i < m->m_num_mds; i++) {
 		if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE)
 		if (m->m_info[i].state == CEPH_MDS_STATE_ACTIVE)
 			nr_active++;
 			nr_active++;
 	}
 	}

+ 1 - 1
fs/ceph/snap.c

@@ -519,7 +519,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 	     capsnap->need_flush ? "" : "no_flush");
 	     capsnap->need_flush ? "" : "no_flush");
 	ihold(inode);
 	ihold(inode);
 
 
-	atomic_set(&capsnap->nref, 1);
+	refcount_set(&capsnap->nref, 1);
 	INIT_LIST_HEAD(&capsnap->ci_item);
 	INIT_LIST_HEAD(&capsnap->ci_item);
 
 
 	capsnap->follows = old_snapc->seq;
 	capsnap->follows = old_snapc->seq;

+ 1 - 6
fs/ceph/super.c

@@ -544,10 +544,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 					struct ceph_options *opt)
 					struct ceph_options *opt)
 {
 {
 	struct ceph_fs_client *fsc;
 	struct ceph_fs_client *fsc;
-	const u64 supported_features =
-		CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH |
-		CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA;
-	const u64 required_features = 0;
 	int page_count;
 	int page_count;
 	size_t size;
 	size_t size;
 	int err = -ENOMEM;
 	int err = -ENOMEM;
@@ -556,8 +552,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	if (!fsc)
 	if (!fsc)
 		return ERR_PTR(-ENOMEM);
 		return ERR_PTR(-ENOMEM);
 
 
-	fsc->client = ceph_create_client(opt, fsc, supported_features,
-					 required_features);
+	fsc->client = ceph_create_client(opt, fsc);
 	if (IS_ERR(fsc->client)) {
 	if (IS_ERR(fsc->client)) {
 		err = PTR_ERR(fsc->client);
 		err = PTR_ERR(fsc->client);
 		goto fail;
 		goto fail;

+ 29 - 2
fs/ceph/super.h

@@ -14,6 +14,7 @@
 #include <linux/writeback.h>
 #include <linux/writeback.h>
 #include <linux/slab.h>
 #include <linux/slab.h>
 #include <linux/posix_acl.h>
 #include <linux/posix_acl.h>
+#include <linux/refcount.h>
 
 
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/libceph.h>
 
 
@@ -160,7 +161,7 @@ struct ceph_cap_flush {
  * data before flushing the snapped state (tracked here) back to the MDS.
  * data before flushing the snapped state (tracked here) back to the MDS.
  */
  */
 struct ceph_cap_snap {
 struct ceph_cap_snap {
-	atomic_t nref;
+	refcount_t nref;
 	struct list_head ci_item;
 	struct list_head ci_item;
 
 
 	struct ceph_cap_flush cap_flush;
 	struct ceph_cap_flush cap_flush;
@@ -189,7 +190,7 @@ struct ceph_cap_snap {
 
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
 {
 {
-	if (atomic_dec_and_test(&capsnap->nref)) {
+	if (refcount_dec_and_test(&capsnap->nref)) {
 		if (capsnap->xattr_blob)
 		if (capsnap->xattr_blob)
 			ceph_buffer_put(capsnap->xattr_blob);
 			ceph_buffer_put(capsnap->xattr_blob);
 		kfree(capsnap);
 		kfree(capsnap);
@@ -471,6 +472,32 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_CAP_DROPPED	(1 << 8)  /* caps were forcibly dropped */
 #define CEPH_I_CAP_DROPPED	(1 << 8)  /* caps were forcibly dropped */
 #define CEPH_I_KICK_FLUSH	(1 << 9)  /* kick flushing caps */
 #define CEPH_I_KICK_FLUSH	(1 << 9)  /* kick flushing caps */
 #define CEPH_I_FLUSH_SNAPS	(1 << 10) /* need flush snapss */
 #define CEPH_I_FLUSH_SNAPS	(1 << 10) /* need flush snapss */
+#define CEPH_I_ERROR_WRITE	(1 << 11) /* have seen write errors */
+
+/*
+ * We set the ERROR_WRITE bit when we start seeing write errors on an inode
+ * and then clear it when they start succeeding. Note that we do a lockless
+ * check first, and only take the lock if it looks like it needs to be changed.
+ * The write submission code just takes this as a hint, so we're not too
+ * worried if a few slip through in either direction.
+ */
+static inline void ceph_set_error_write(struct ceph_inode_info *ci)
+{
+	if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE)) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags |= CEPH_I_ERROR_WRITE;
+		spin_unlock(&ci->i_ceph_lock);
+	}
+}
+
+static inline void ceph_clear_error_write(struct ceph_inode_info *ci)
+{
+	if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ERROR_WRITE) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags &= ~CEPH_I_ERROR_WRITE;
+		spin_unlock(&ci->i_ceph_lock);
+	}
+}
 
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
 					   long long release_count,
 					   long long release_count,

+ 3 - 0
fs/ceph/xattr.c

@@ -392,6 +392,7 @@ static int __set_xattr(struct ceph_inode_info *ci,
 
 
 	if (update_xattr) {
 	if (update_xattr) {
 		int err = 0;
 		int err = 0;
+
 		if (xattr && (flags & XATTR_CREATE))
 		if (xattr && (flags & XATTR_CREATE))
 			err = -EEXIST;
 			err = -EEXIST;
 		else if (!xattr && (flags & XATTR_REPLACE))
 		else if (!xattr && (flags & XATTR_REPLACE))
@@ -399,12 +400,14 @@ static int __set_xattr(struct ceph_inode_info *ci,
 		if (err) {
 		if (err) {
 			kfree(name);
 			kfree(name);
 			kfree(val);
 			kfree(val);
+			kfree(*newxattr);
 			return err;
 			return err;
 		}
 		}
 		if (update_xattr < 0) {
 		if (update_xattr < 0) {
 			if (xattr)
 			if (xattr)
 				__remove_xattr(ci, xattr);
 				__remove_xattr(ci, xattr);
 			kfree(name);
 			kfree(name);
+			kfree(*newxattr);
 			return 0;
 			return 0;
 		}
 		}
 	}
 	}

+ 4 - 0
include/linux/ceph/ceph_features.h

@@ -105,8 +105,10 @@ static inline u64 ceph_sanitize_features(u64 features)
  */
  */
 #define CEPH_FEATURES_SUPPORTED_DEFAULT		\
 #define CEPH_FEATURES_SUPPORTED_DEFAULT		\
 	(CEPH_FEATURE_NOSRCADDR |		\
 	(CEPH_FEATURE_NOSRCADDR |		\
+	 CEPH_FEATURE_FLOCK |			\
 	 CEPH_FEATURE_SUBSCRIBE2 |		\
 	 CEPH_FEATURE_SUBSCRIBE2 |		\
 	 CEPH_FEATURE_RECONNECT_SEQ |		\
 	 CEPH_FEATURE_RECONNECT_SEQ |		\
+	 CEPH_FEATURE_DIRLAYOUTHASH |		\
 	 CEPH_FEATURE_PGID64 |			\
 	 CEPH_FEATURE_PGID64 |			\
 	 CEPH_FEATURE_PGPOOL3 |			\
 	 CEPH_FEATURE_PGPOOL3 |			\
 	 CEPH_FEATURE_OSDENC |			\
 	 CEPH_FEATURE_OSDENC |			\
@@ -114,11 +116,13 @@ static inline u64 ceph_sanitize_features(u64 features)
 	 CEPH_FEATURE_MSG_AUTH |		\
 	 CEPH_FEATURE_MSG_AUTH |		\
 	 CEPH_FEATURE_CRUSH_TUNABLES2 |		\
 	 CEPH_FEATURE_CRUSH_TUNABLES2 |		\
 	 CEPH_FEATURE_REPLY_CREATE_INODE |	\
 	 CEPH_FEATURE_REPLY_CREATE_INODE |	\
+	 CEPH_FEATURE_MDSENC |			\
 	 CEPH_FEATURE_OSDHASHPSPOOL |		\
 	 CEPH_FEATURE_OSDHASHPSPOOL |		\
 	 CEPH_FEATURE_OSD_CACHEPOOL |		\
 	 CEPH_FEATURE_OSD_CACHEPOOL |		\
 	 CEPH_FEATURE_CRUSH_V2 |		\
 	 CEPH_FEATURE_CRUSH_V2 |		\
 	 CEPH_FEATURE_EXPORT_PEER |		\
 	 CEPH_FEATURE_EXPORT_PEER |		\
 	 CEPH_FEATURE_OSDMAP_ENC |		\
 	 CEPH_FEATURE_OSDMAP_ENC |		\
+	 CEPH_FEATURE_MDS_INLINE_DATA |		\
 	 CEPH_FEATURE_CRUSH_TUNABLES3 |		\
 	 CEPH_FEATURE_CRUSH_TUNABLES3 |		\
 	 CEPH_FEATURE_OSD_PRIMARY_AFFINITY |	\
 	 CEPH_FEATURE_OSD_PRIMARY_AFFINITY |	\
 	 CEPH_FEATURE_MSGR_KEEPALIVE2 |		\
 	 CEPH_FEATURE_MSGR_KEEPALIVE2 |		\

+ 14 - 0
include/linux/ceph/ceph_fs.h

@@ -365,6 +365,19 @@ extern const char *ceph_mds_op_name(int op);
 #define CEPH_READDIR_FRAG_END		(1<<0)
 #define CEPH_READDIR_FRAG_END		(1<<0)
 #define CEPH_READDIR_FRAG_COMPLETE	(1<<8)
 #define CEPH_READDIR_FRAG_COMPLETE	(1<<8)
 #define CEPH_READDIR_HASH_ORDER		(1<<9)
 #define CEPH_READDIR_HASH_ORDER		(1<<9)
+#define CEPH_READDIR_OFFSET_HASH	(1<<10)
+
+/*
+ * open request flags
+ */
+#define CEPH_O_RDONLY		00000000
+#define CEPH_O_WRONLY		00000001
+#define CEPH_O_RDWR		00000002
+#define CEPH_O_CREAT		00000100
+#define CEPH_O_EXCL		00000200
+#define CEPH_O_TRUNC		00001000
+#define CEPH_O_DIRECTORY	00200000
+#define CEPH_O_NOFOLLOW		00400000
 
 
 union ceph_mds_request_args {
 union ceph_mds_request_args {
 	struct {
 	struct {
@@ -384,6 +397,7 @@ union ceph_mds_request_args {
 		__le32 max_entries;          /* how many dentries to grab */
 		__le32 max_entries;          /* how many dentries to grab */
 		__le32 max_bytes;
 		__le32 max_bytes;
 		__le16 flags;
 		__le16 flags;
+		__le32 offset_hash;
 	} __attribute__ ((packed)) readdir;
 	} __attribute__ ((packed)) readdir;
 	struct {
 	struct {
 		__le32 mode;
 		__le32 mode;

+ 5 - 0
include/linux/ceph/cls_lock_client.h

@@ -37,6 +37,11 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
 			struct ceph_object_locator *oloc,
 			struct ceph_object_locator *oloc,
 			char *lock_name, char *cookie,
 			char *lock_name, char *cookie,
 			struct ceph_entity_name *locker);
 			struct ceph_entity_name *locker);
+int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, u8 type, char *old_cookie,
+			char *tag, char *new_cookie);
 
 
 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
 
 

+ 3 - 5
include/linux/ceph/libceph.h

@@ -14,6 +14,7 @@
 #include <linux/wait.h>
 #include <linux/wait.h>
 #include <linux/writeback.h>
 #include <linux/writeback.h>
 #include <linux/slab.h>
 #include <linux/slab.h>
+#include <linux/refcount.h>
 
 
 #include <linux/ceph/types.h>
 #include <linux/ceph/types.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/messenger.h>
@@ -161,7 +162,7 @@ struct ceph_client {
  * dirtied.
  * dirtied.
  */
  */
 struct ceph_snap_context {
 struct ceph_snap_context {
-	atomic_t nref;
+	refcount_t nref;
 	u64 seq;
 	u64 seq;
 	u32 num_snaps;
 	u32 num_snaps;
 	u64 snaps[];
 	u64 snaps[];
@@ -262,10 +263,7 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client);
 extern void ceph_destroy_options(struct ceph_options *opt);
 extern void ceph_destroy_options(struct ceph_options *opt);
 extern int ceph_compare_options(struct ceph_options *new_opt,
 extern int ceph_compare_options(struct ceph_options *new_opt,
 				struct ceph_client *client);
 				struct ceph_client *client);
-extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
-					      void *private,
-					      u64 supported_features,
-					      u64 required_features);
+struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private);
 struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client);
 struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client);
 u64 ceph_client_gid(struct ceph_client *client);
 u64 ceph_client_gid(struct ceph_client *client);
 extern void ceph_destroy_client(struct ceph_client *client);
 extern void ceph_destroy_client(struct ceph_client *client);

+ 4 - 3
include/linux/ceph/mdsmap.h

@@ -25,6 +25,7 @@ struct ceph_mdsmap {
 	u32 m_session_autoclose;        /* seconds */
 	u32 m_session_autoclose;        /* seconds */
 	u64 m_max_file_size;
 	u64 m_max_file_size;
 	u32 m_max_mds;                  /* size of m_addr, m_state arrays */
 	u32 m_max_mds;                  /* size of m_addr, m_state arrays */
+	int m_num_mds;
 	struct ceph_mds_info *m_info;
 	struct ceph_mds_info *m_info;
 
 
 	/* which object pools file data can be stored in */
 	/* which object pools file data can be stored in */
@@ -40,7 +41,7 @@ struct ceph_mdsmap {
 static inline struct ceph_entity_addr *
 static inline struct ceph_entity_addr *
 ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
 ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
 {
 {
-	if (w >= m->m_max_mds)
+	if (w >= m->m_num_mds)
 		return NULL;
 		return NULL;
 	return &m->m_info[w].addr;
 	return &m->m_info[w].addr;
 }
 }
@@ -48,14 +49,14 @@ ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
 static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
 static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
 {
 {
 	BUG_ON(w < 0);
 	BUG_ON(w < 0);
-	if (w >= m->m_max_mds)
+	if (w >= m->m_num_mds)
 		return CEPH_MDS_STATE_DNE;
 		return CEPH_MDS_STATE_DNE;
 	return m->m_info[w].state;
 	return m->m_info[w].state;
 }
 }
 
 
 static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w)
 static inline bool ceph_mdsmap_is_laggy(struct ceph_mdsmap *m, int w)
 {
 {
-	if (w >= 0 && w < m->m_max_mds)
+	if (w >= 0 && w < m->m_num_mds)
 		return m->m_info[w].laggy;
 		return m->m_info[w].laggy;
 	return false;
 	return false;
 }
 }

+ 5 - 2
include/linux/ceph/osd_client.h

@@ -5,6 +5,7 @@
 #include <linux/kref.h>
 #include <linux/kref.h>
 #include <linux/mempool.h>
 #include <linux/mempool.h>
 #include <linux/rbtree.h>
 #include <linux/rbtree.h>
+#include <linux/refcount.h>
 
 
 #include <linux/ceph/types.h>
 #include <linux/ceph/types.h>
 #include <linux/ceph/osdmap.h>
 #include <linux/ceph/osdmap.h>
@@ -27,7 +28,7 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 
 
 /* a given osd we're communicating with */
 /* a given osd we're communicating with */
 struct ceph_osd {
 struct ceph_osd {
-	atomic_t o_ref;
+	refcount_t o_ref;
 	struct ceph_osd_client *o_osdc;
 	struct ceph_osd_client *o_osdc;
 	int o_osd;
 	int o_osd;
 	int o_incarnation;
 	int o_incarnation;
@@ -186,12 +187,12 @@ struct ceph_osd_request {
 	struct timespec r_mtime;              /* ditto */
 	struct timespec r_mtime;              /* ditto */
 	u64 r_data_offset;                    /* ditto */
 	u64 r_data_offset;                    /* ditto */
 	bool r_linger;                        /* don't resend on failure */
 	bool r_linger;                        /* don't resend on failure */
+	bool r_abort_on_full;		      /* return ENOSPC when full */
 
 
 	/* internal */
 	/* internal */
 	unsigned long r_stamp;                /* jiffies, send or check time */
 	unsigned long r_stamp;                /* jiffies, send or check time */
 	unsigned long r_start_stamp;          /* jiffies */
 	unsigned long r_start_stamp;          /* jiffies */
 	int r_attempts;
 	int r_attempts;
-	struct ceph_eversion r_replay_version; /* aka reassert_version */
 	u32 r_last_force_resend;
 	u32 r_last_force_resend;
 	u32 r_map_dne_bound;
 	u32 r_map_dne_bound;
 
 
@@ -266,6 +267,7 @@ struct ceph_osd_client {
 	struct rb_root         osds;          /* osds */
 	struct rb_root         osds;          /* osds */
 	struct list_head       osd_lru;       /* idle osds */
 	struct list_head       osd_lru;       /* idle osds */
 	spinlock_t             osd_lru_lock;
 	spinlock_t             osd_lru_lock;
+	u32		       epoch_barrier;
 	struct ceph_osd        homeless_osd;
 	struct ceph_osd        homeless_osd;
 	atomic64_t             last_tid;      /* tid of last request */
 	atomic64_t             last_tid;      /* tid of last request */
 	u64                    last_linger_id;
 	u64                    last_linger_id;
@@ -304,6 +306,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
 				   struct ceph_msg *msg);
 				   struct ceph_msg *msg);
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
 				 struct ceph_msg *msg);
 				 struct ceph_msg *msg);
+void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb);
 
 
 extern void osd_req_op_init(struct ceph_osd_request *osd_req,
 extern void osd_req_op_init(struct ceph_osd_request *osd_req,
 			    unsigned int which, u16 opcode, u32 flags);
 			    unsigned int which, u16 opcode, u32 flags);

+ 3 - 3
include/linux/ceph/pagelist.h

@@ -2,7 +2,7 @@
 #define __FS_CEPH_PAGELIST_H
 #define __FS_CEPH_PAGELIST_H
 
 
 #include <asm/byteorder.h>
 #include <asm/byteorder.h>
-#include <linux/atomic.h>
+#include <linux/refcount.h>
 #include <linux/list.h>
 #include <linux/list.h>
 #include <linux/types.h>
 #include <linux/types.h>
 
 
@@ -13,7 +13,7 @@ struct ceph_pagelist {
 	size_t room;
 	size_t room;
 	struct list_head free_list;
 	struct list_head free_list;
 	size_t num_pages_free;
 	size_t num_pages_free;
-	atomic_t refcnt;
+	refcount_t refcnt;
 };
 };
 
 
 struct ceph_pagelist_cursor {
 struct ceph_pagelist_cursor {
@@ -30,7 +30,7 @@ static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
 	pl->room = 0;
 	pl->room = 0;
 	INIT_LIST_HEAD(&pl->free_list);
 	INIT_LIST_HEAD(&pl->free_list);
 	pl->num_pages_free = 0;
 	pl->num_pages_free = 0;
-	atomic_set(&pl->refcnt, 1);
+	refcount_set(&pl->refcnt, 1);
 }
 }
 
 
 extern void ceph_pagelist_release(struct ceph_pagelist *pl);
 extern void ceph_pagelist_release(struct ceph_pagelist *pl);

+ 17 - 10
net/ceph/ceph_common.c

@@ -45,6 +45,17 @@ bool libceph_compatible(void *data)
 }
 }
 EXPORT_SYMBOL(libceph_compatible);
 EXPORT_SYMBOL(libceph_compatible);
 
 
+static int param_get_supported_features(char *buffer,
+					const struct kernel_param *kp)
+{
+	return sprintf(buffer, "0x%llx", CEPH_FEATURES_SUPPORTED_DEFAULT);
+}
+static const struct kernel_param_ops param_ops_supported_features = {
+	.get = param_get_supported_features,
+};
+module_param_cb(supported_features, &param_ops_supported_features, NULL,
+		S_IRUGO);
+
 /*
 /*
  * find filename portion of a path (/foo/bar/baz -> baz)
  * find filename portion of a path (/foo/bar/baz -> baz)
  */
  */
@@ -596,9 +607,7 @@ EXPORT_SYMBOL(ceph_client_gid);
 /*
 /*
  * create a fresh client instance
  * create a fresh client instance
  */
  */
-struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
-				       u64 supported_features,
-				       u64 required_features)
+struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
 {
 {
 	struct ceph_client *client;
 	struct ceph_client *client;
 	struct ceph_entity_addr *myaddr = NULL;
 	struct ceph_entity_addr *myaddr = NULL;
@@ -615,14 +624,12 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
 	init_waitqueue_head(&client->auth_wq);
 	init_waitqueue_head(&client->auth_wq);
 	client->auth_err = 0;
 	client->auth_err = 0;
 
 
-	if (!ceph_test_opt(client, NOMSGAUTH))
-		required_features |= CEPH_FEATURE_MSG_AUTH;
-
 	client->extra_mon_dispatch = NULL;
 	client->extra_mon_dispatch = NULL;
-	client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
-		supported_features;
-	client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
-		required_features;
+	client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT;
+	client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT;
+
+	if (!ceph_test_opt(client, NOMSGAUTH))
+		client->required_features |= CEPH_FEATURE_MSG_AUTH;
 
 
 	/* msgr */
 	/* msgr */
 	if (ceph_test_opt(client, MYIP))
 	if (ceph_test_opt(client, MYIP))

+ 51 - 0
net/ceph/cls_lock_client.c

@@ -179,6 +179,57 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
 }
 }
 EXPORT_SYMBOL(ceph_cls_break_lock);
 EXPORT_SYMBOL(ceph_cls_break_lock);
 
 
+int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, u8 type, char *old_cookie,
+			char *tag, char *new_cookie)
+{
+	int cookie_op_buf_size;
+	int name_len = strlen(lock_name);
+	int old_cookie_len = strlen(old_cookie);
+	int tag_len = strlen(tag);
+	int new_cookie_len = strlen(new_cookie);
+	void *p, *end;
+	struct page *cookie_op_page;
+	int ret;
+
+	cookie_op_buf_size = name_len + sizeof(__le32) +
+			     old_cookie_len + sizeof(__le32) +
+			     tag_len + sizeof(__le32) +
+			     new_cookie_len + sizeof(__le32) +
+			     sizeof(u8) + CEPH_ENCODING_START_BLK_LEN;
+	if (cookie_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	cookie_op_page = alloc_page(GFP_NOIO);
+	if (!cookie_op_page)
+		return -ENOMEM;
+
+	p = page_address(cookie_op_page);
+	end = p + cookie_op_buf_size;
+
+	/* encode cls_lock_set_cookie_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    cookie_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_8(&p, type);
+	ceph_encode_string(&p, end, old_cookie, old_cookie_len);
+	ceph_encode_string(&p, end, tag, tag_len);
+	ceph_encode_string(&p, end, new_cookie, new_cookie_len);
+
+	dout("%s lock_name %s type %d old_cookie %s tag %s new_cookie %s\n",
+	     __func__, lock_name, type, old_cookie, tag, new_cookie);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie",
+			     CEPH_OSD_FLAG_WRITE, cookie_op_page,
+			     cookie_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(cookie_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_set_cookie);
+
 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
 {
 {
 	int i;
 	int i;

+ 3 - 4
net/ceph/debugfs.c

@@ -62,7 +62,8 @@ static int osdmap_show(struct seq_file *s, void *p)
 		return 0;
 		return 0;
 
 
 	down_read(&osdc->lock);
 	down_read(&osdc->lock);
-	seq_printf(s, "epoch %d flags 0x%x\n", map->epoch, map->flags);
+	seq_printf(s, "epoch %u barrier %u flags 0x%x\n", map->epoch,
+			osdc->epoch_barrier, map->flags);
 
 
 	for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) {
 	for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) {
 		struct ceph_pg_pool_info *pi =
 		struct ceph_pg_pool_info *pi =
@@ -177,9 +178,7 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req)
 	seq_printf(s, "%llu\t", req->r_tid);
 	seq_printf(s, "%llu\t", req->r_tid);
 	dump_target(s, &req->r_t);
 	dump_target(s, &req->r_t);
 
 
-	seq_printf(s, "\t%d\t%u'%llu", req->r_attempts,
-		   le32_to_cpu(req->r_replay_version.epoch),
-		   le64_to_cpu(req->r_replay_version.version));
+	seq_printf(s, "\t%d", req->r_attempts);
 
 
 	for (i = 0; i < req->r_num_ops; i++) {
 	for (i = 0; i < req->r_num_ops; i++) {
 		struct ceph_osd_req_op *op = &req->r_ops[i];
 		struct ceph_osd_req_op *op = &req->r_ops[i];

+ 123 - 16
net/ceph/osd_client.c

@@ -961,6 +961,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 				       truncate_size, truncate_seq);
 				       truncate_size, truncate_seq);
 	}
 	}
 
 
+	req->r_abort_on_full = true;
 	req->r_flags = flags;
 	req->r_flags = flags;
 	req->r_base_oloc.pool = layout->pool_id;
 	req->r_base_oloc.pool = layout->pool_id;
 	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
 	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
@@ -1005,7 +1006,7 @@ static bool osd_registered(struct ceph_osd *osd)
  */
  */
 static void osd_init(struct ceph_osd *osd)
 static void osd_init(struct ceph_osd *osd)
 {
 {
-	atomic_set(&osd->o_ref, 1);
+	refcount_set(&osd->o_ref, 1);
 	RB_CLEAR_NODE(&osd->o_node);
 	RB_CLEAR_NODE(&osd->o_node);
 	osd->o_requests = RB_ROOT;
 	osd->o_requests = RB_ROOT;
 	osd->o_linger_requests = RB_ROOT;
 	osd->o_linger_requests = RB_ROOT;
@@ -1050,9 +1051,9 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 
 
 static struct ceph_osd *get_osd(struct ceph_osd *osd)
 static struct ceph_osd *get_osd(struct ceph_osd *osd)
 {
 {
-	if (atomic_inc_not_zero(&osd->o_ref)) {
-		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
-		     atomic_read(&osd->o_ref));
+	if (refcount_inc_not_zero(&osd->o_ref)) {
+		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
+		     refcount_read(&osd->o_ref));
 		return osd;
 		return osd;
 	} else {
 	} else {
 		dout("get_osd %p FAIL\n", osd);
 		dout("get_osd %p FAIL\n", osd);
@@ -1062,9 +1063,9 @@ static struct ceph_osd *get_osd(struct ceph_osd *osd)
 
 
 static void put_osd(struct ceph_osd *osd)
 static void put_osd(struct ceph_osd *osd)
 {
 {
-	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
-	     atomic_read(&osd->o_ref) - 1);
-	if (atomic_dec_and_test(&osd->o_ref)) {
+	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
+	     refcount_read(&osd->o_ref) - 1);
+	if (refcount_dec_and_test(&osd->o_ref)) {
 		osd_cleanup(osd);
 		osd_cleanup(osd);
 		kfree(osd);
 		kfree(osd);
 	}
 	}
@@ -1297,8 +1298,9 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
 		       __pool_full(pi);
 		       __pool_full(pi);
 
 
 	WARN_ON(pi->id != t->base_oloc.pool);
 	WARN_ON(pi->id != t->base_oloc.pool);
-	return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
-	       (t->flags & CEPH_OSD_FLAG_WRITE && pausewr);
+	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
+	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
+	       (osdc->osdmap->epoch < osdc->epoch_barrier);
 }
 }
 
 
 enum calc_target_result {
 enum calc_target_result {
@@ -1503,9 +1505,10 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
 	ceph_encode_32(&p, req->r_flags);
 	ceph_encode_32(&p, req->r_flags);
 	ceph_encode_timespec(p, &req->r_mtime);
 	ceph_encode_timespec(p, &req->r_mtime);
 	p += sizeof(struct ceph_timespec);
 	p += sizeof(struct ceph_timespec);
-	/* aka reassert_version */
-	memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version));
-	p += sizeof(req->r_replay_version);
+
+	/* reassert_version */
+	memset(p, 0, sizeof(struct ceph_eversion));
+	p += sizeof(struct ceph_eversion);
 
 
 	/* oloc */
 	/* oloc */
 	ceph_start_encoding(&p, 5, 4,
 	ceph_start_encoding(&p, 5, 4,
@@ -1626,6 +1629,7 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
 		ceph_monc_renew_subs(&osdc->client->monc);
 		ceph_monc_renew_subs(&osdc->client->monc);
 }
 }
 
 
+static void complete_request(struct ceph_osd_request *req, int err);
 static void send_map_check(struct ceph_osd_request *req);
 static void send_map_check(struct ceph_osd_request *req);
 
 
 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
@@ -1635,6 +1639,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 	enum calc_target_result ct_res;
 	enum calc_target_result ct_res;
 	bool need_send = false;
 	bool need_send = false;
 	bool promoted = false;
 	bool promoted = false;
+	bool need_abort = false;
 
 
 	WARN_ON(req->r_tid);
 	WARN_ON(req->r_tid);
 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
@@ -1650,8 +1655,13 @@ again:
 		goto promote;
 		goto promote;
 	}
 	}
 
 
-	if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
-	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
+	if (osdc->osdmap->epoch < osdc->epoch_barrier) {
+		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
+		     osdc->epoch_barrier);
+		req->r_t.paused = true;
+		maybe_request_map(osdc);
+	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
 		dout("req %p pausewr\n", req);
 		dout("req %p pausewr\n", req);
 		req->r_t.paused = true;
 		req->r_t.paused = true;
 		maybe_request_map(osdc);
 		maybe_request_map(osdc);
@@ -1669,6 +1679,8 @@ again:
 		pr_warn_ratelimited("FULL or reached pool quota\n");
 		pr_warn_ratelimited("FULL or reached pool quota\n");
 		req->r_t.paused = true;
 		req->r_t.paused = true;
 		maybe_request_map(osdc);
 		maybe_request_map(osdc);
+		if (req->r_abort_on_full)
+			need_abort = true;
 	} else if (!osd_homeless(osd)) {
 	} else if (!osd_homeless(osd)) {
 		need_send = true;
 		need_send = true;
 	} else {
 	} else {
@@ -1685,6 +1697,8 @@ again:
 	link_request(osd, req);
 	link_request(osd, req);
 	if (need_send)
 	if (need_send)
 		send_request(req);
 		send_request(req);
+	else if (need_abort)
+		complete_request(req, -ENOSPC);
 	mutex_unlock(&osd->lock);
 	mutex_unlock(&osd->lock);
 
 
 	if (ct_res == CALC_TARGET_POOL_DNE)
 	if (ct_res == CALC_TARGET_POOL_DNE)
@@ -1799,6 +1813,97 @@ static void abort_request(struct ceph_osd_request *req, int err)
 	complete_request(req, err);
 	complete_request(req, err);
 }
 }
 
 
+static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+{
+	if (likely(eb > osdc->epoch_barrier)) {
+		dout("updating epoch_barrier from %u to %u\n",
+				osdc->epoch_barrier, eb);
+		osdc->epoch_barrier = eb;
+		/* Request map if we're not to the barrier yet */
+		if (eb > osdc->osdmap->epoch)
+			maybe_request_map(osdc);
+	}
+}
+
+void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
+{
+	down_read(&osdc->lock);
+	if (unlikely(eb > osdc->epoch_barrier)) {
+		up_read(&osdc->lock);
+		down_write(&osdc->lock);
+		update_epoch_barrier(osdc, eb);
+		up_write(&osdc->lock);
+	} else {
+		up_read(&osdc->lock);
+	}
+}
+EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
+
+/*
+ * Drop all pending requests that are stalled waiting on a full condition to
+ * clear, and complete them with ENOSPC as the return code. Set the
+ * osdc->epoch_barrier to the latest map epoch that we've seen if any were
+ * cancelled.
+ */
+static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
+{
+	struct rb_node *n;
+	bool victims = false;
+
+	dout("enter abort_on_full\n");
+
+	if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
+		goto out;
+
+	/* Scan list and see if there is anything to abort */
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+		struct rb_node *m;
+
+		m = rb_first(&osd->o_requests);
+		while (m) {
+			struct ceph_osd_request *req = rb_entry(m,
+					struct ceph_osd_request, r_node);
+			m = rb_next(m);
+
+			if (req->r_abort_on_full) {
+				victims = true;
+				break;
+			}
+		}
+		if (victims)
+			break;
+	}
+
+	if (!victims)
+		goto out;
+
+	/*
+	 * Update the barrier to current epoch if it's behind that point,
+	 * since we know we have some calls to be aborted in the tree.
+	 */
+	update_epoch_barrier(osdc, osdc->osdmap->epoch);
+
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+		struct rb_node *m;
+
+		m = rb_first(&osd->o_requests);
+		while (m) {
+			struct ceph_osd_request *req = rb_entry(m,
+					struct ceph_osd_request, r_node);
+			m = rb_next(m);
+
+			if (req->r_abort_on_full &&
+			    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
+			     pool_full(osdc, req->r_t.target_oloc.pool)))
+				abort_request(req, -ENOSPC);
+		}
+	}
+out:
+	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
+}
+
 static void check_pool_dne(struct ceph_osd_request *req)
 static void check_pool_dne(struct ceph_osd_request *req)
 {
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
 	struct ceph_osd_client *osdc = req->r_osdc;
@@ -3252,11 +3357,13 @@ done:
 	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
 	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
 		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
 		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
 		  have_pool_full(osdc);
 		  have_pool_full(osdc);
-	if (was_pauserd || was_pausewr || pauserd || pausewr)
+	if (was_pauserd || was_pausewr || pauserd || pausewr ||
+	    osdc->osdmap->epoch < osdc->epoch_barrier)
 		maybe_request_map(osdc);
 		maybe_request_map(osdc);
 
 
 	kick_requests(osdc, &need_resend, &need_resend_linger);
 	kick_requests(osdc, &need_resend, &need_resend_linger);
 
 
+	ceph_osdc_abort_on_full(osdc);
 	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
 	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
 			  osdc->osdmap->epoch);
 			  osdc->osdmap->epoch);
 	up_write(&osdc->lock);
 	up_write(&osdc->lock);
@@ -4126,7 +4233,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 		close_osd(osd);
 		close_osd(osd);
 	}
 	}
 	up_write(&osdc->lock);
 	up_write(&osdc->lock);
-	WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
+	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
 	osd_cleanup(&osdc->homeless_osd);
 	osd_cleanup(&osdc->homeless_osd);
 
 
 	WARN_ON(!list_empty(&osdc->osd_lru));
 	WARN_ON(!list_empty(&osdc->osd_lru));

+ 1 - 1
net/ceph/pagelist.c

@@ -16,7 +16,7 @@ static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl)
 
 
 void ceph_pagelist_release(struct ceph_pagelist *pl)
 void ceph_pagelist_release(struct ceph_pagelist *pl)
 {
 {
-	if (!atomic_dec_and_test(&pl->refcnt))
+	if (!refcount_dec_and_test(&pl->refcnt))
 		return;
 		return;
 	ceph_pagelist_unmap_tail(pl);
 	ceph_pagelist_unmap_tail(pl);
 	while (!list_empty(&pl->head)) {
 	while (!list_empty(&pl->head)) {

+ 3 - 3
net/ceph/snapshot.c

@@ -49,7 +49,7 @@ struct ceph_snap_context *ceph_create_snap_context(u32 snap_count,
 	if (!snapc)
 	if (!snapc)
 		return NULL;
 		return NULL;
 
 
-	atomic_set(&snapc->nref, 1);
+	refcount_set(&snapc->nref, 1);
 	snapc->num_snaps = snap_count;
 	snapc->num_snaps = snap_count;
 
 
 	return snapc;
 	return snapc;
@@ -59,7 +59,7 @@ EXPORT_SYMBOL(ceph_create_snap_context);
 struct ceph_snap_context *ceph_get_snap_context(struct ceph_snap_context *sc)
 struct ceph_snap_context *ceph_get_snap_context(struct ceph_snap_context *sc)
 {
 {
 	if (sc)
 	if (sc)
-		atomic_inc(&sc->nref);
+		refcount_inc(&sc->nref);
 	return sc;
 	return sc;
 }
 }
 EXPORT_SYMBOL(ceph_get_snap_context);
 EXPORT_SYMBOL(ceph_get_snap_context);
@@ -68,7 +68,7 @@ void ceph_put_snap_context(struct ceph_snap_context *sc)
 {
 {
 	if (!sc)
 	if (!sc)
 		return;
 		return;
-	if (atomic_dec_and_test(&sc->nref)) {
+	if (refcount_dec_and_test(&sc->nref)) {
 		/*printk(" deleting snap_context %p\n", sc);*/
 		/*printk(" deleting snap_context %p\n", sc);*/
 		kfree(sc);
 		kfree(sc);
 	}
 	}