Browse Source

ceph: make fsync() wait for unsafe requests that created/modified inode

If we get an unsafe reply for a request that created/modified an inode,
add the unsafe request to a list in the newly created/modified
inode, so that fsync() can wait for these unsafe requests.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
Yan, Zheng 9 years ago
parent
commit
68cd5b4b76
5 changed files with 53 additions and 37 deletions
  1. 34 37
      fs/ceph/caps.c
  2. 1 0
      fs/ceph/inode.c
  3. 14 0
      fs/ceph/mds_client.c
  4. 3 0
      fs/ceph/mds_client.h
  5. 1 0
      fs/ceph/super.h

+ 34 - 37
fs/ceph/caps.c

@@ -1970,49 +1970,46 @@ out:
 }
 }
 
 
 /*
 /*
- * wait for any uncommitted directory operations to commit.
+ * wait for any unsafe requests to complete.
  */
  */
-static int unsafe_dirop_wait(struct inode *inode)
+static int unsafe_request_wait(struct inode *inode)
 {
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct list_head *head = &ci->i_unsafe_dirops;
-	struct ceph_mds_request *req;
-	u64 last_tid;
-	int ret = 0;
-
-	if (!S_ISDIR(inode->i_mode))
-		return 0;
+	struct ceph_mds_request *req1 = NULL, *req2 = NULL;
+	int ret, err = 0;
 
 
 	spin_lock(&ci->i_unsafe_lock);
 	spin_lock(&ci->i_unsafe_lock);
-	if (list_empty(head))
-		goto out;
-
-	req = list_last_entry(head, struct ceph_mds_request,
-			      r_unsafe_dir_item);
-	last_tid = req->r_tid;
-
-	do {
-		ceph_mdsc_get_request(req);
-		spin_unlock(&ci->i_unsafe_lock);
+	if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) {
+		req1 = list_last_entry(&ci->i_unsafe_dirops,
+					struct ceph_mds_request,
+					r_unsafe_dir_item);
+		ceph_mdsc_get_request(req1);
+	}
+	if (!list_empty(&ci->i_unsafe_iops)) {
+		req2 = list_last_entry(&ci->i_unsafe_iops,
+					struct ceph_mds_request,
+					r_unsafe_target_item);
+		ceph_mdsc_get_request(req2);
+	}
+	spin_unlock(&ci->i_unsafe_lock);
 
 
-		dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n",
-		     inode, req->r_tid, last_tid);
-		ret = !wait_for_completion_timeout(&req->r_safe_completion,
-					ceph_timeout_jiffies(req->r_timeout));
+	dout("unsafe_requeset_wait %p wait on tid %llu %llu\n",
+	     inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
+	if (req1) {
+		ret = !wait_for_completion_timeout(&req1->r_safe_completion,
+					ceph_timeout_jiffies(req1->r_timeout));
 		if (ret)
 		if (ret)
-			ret = -EIO;  /* timed out */
-
-		ceph_mdsc_put_request(req);
-
-		spin_lock(&ci->i_unsafe_lock);
-		if (ret || list_empty(head))
-			break;
-		req = list_first_entry(head, struct ceph_mds_request,
-				       r_unsafe_dir_item);
-	} while (req->r_tid < last_tid);
-out:
-	spin_unlock(&ci->i_unsafe_lock);
-	return ret;
+			err = -EIO;
+		ceph_mdsc_put_request(req1);
+	}
+	if (req2) {
+		ret = !wait_for_completion_timeout(&req2->r_safe_completion,
+					ceph_timeout_jiffies(req2->r_timeout));
+		if (ret)
+			err = -EIO;
+		ceph_mdsc_put_request(req2);
+	}
+	return err;
 }
 }
 
 
 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
@@ -2038,7 +2035,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 	dirty = try_flush_caps(inode, &flush_tid);
 	dirty = try_flush_caps(inode, &flush_tid);
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
 
-	ret = unsafe_dirop_wait(inode);
+	ret = unsafe_request_wait(inode);
 
 
 	/*
 	/*
 	 * only wait on non-file metadata writeback (the mds
 	 * only wait on non-file metadata writeback (the mds

+ 1 - 0
fs/ceph/inode.c

@@ -452,6 +452,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 
 
 	INIT_LIST_HEAD(&ci->i_unsafe_writes);
 	INIT_LIST_HEAD(&ci->i_unsafe_writes);
 	INIT_LIST_HEAD(&ci->i_unsafe_dirops);
 	INIT_LIST_HEAD(&ci->i_unsafe_dirops);
+	INIT_LIST_HEAD(&ci->i_unsafe_iops);
 	spin_lock_init(&ci->i_unsafe_lock);
 	spin_lock_init(&ci->i_unsafe_lock);
 
 
 	ci->i_snap_realm = NULL;
 	ci->i_snap_realm = NULL;

+ 14 - 0
fs/ceph/mds_client.c

@@ -666,6 +666,12 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
 		list_del_init(&req->r_unsafe_dir_item);
 		list_del_init(&req->r_unsafe_dir_item);
 		spin_unlock(&ci->i_unsafe_lock);
 		spin_unlock(&ci->i_unsafe_lock);
 	}
 	}
+	if (req->r_target_inode && req->r_got_unsafe) {
+		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
+		spin_lock(&ci->i_unsafe_lock);
+		list_del_init(&req->r_unsafe_target_item);
+		spin_unlock(&ci->i_unsafe_lock);
+	}
 
 
 	if (req->r_unsafe_dir) {
 	if (req->r_unsafe_dir) {
 		iput(req->r_unsafe_dir);
 		iput(req->r_unsafe_dir);
@@ -1707,6 +1713,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 	req->r_started = jiffies;
 	req->r_started = jiffies;
 	req->r_resend_mds = -1;
 	req->r_resend_mds = -1;
 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
+	INIT_LIST_HEAD(&req->r_unsafe_target_item);
 	req->r_fmode = -1;
 	req->r_fmode = -1;
 	kref_init(&req->r_kref);
 	kref_init(&req->r_kref);
 	INIT_LIST_HEAD(&req->r_wait);
 	INIT_LIST_HEAD(&req->r_wait);
@@ -2529,6 +2536,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 	up_read(&mdsc->snap_rwsem);
 	up_read(&mdsc->snap_rwsem);
 	if (realm)
 	if (realm)
 		ceph_put_snap_realm(mdsc, realm);
 		ceph_put_snap_realm(mdsc, realm);
+
+	if (err == 0 && req->r_got_unsafe && req->r_target_inode) {
+		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
+		spin_lock(&ci->i_unsafe_lock);
+		list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops);
+		spin_unlock(&ci->i_unsafe_lock);
+	}
 out_err:
 out_err:
 	mutex_lock(&mdsc->mutex);
 	mutex_lock(&mdsc->mutex);
 	if (!req->r_aborted) {
 	if (!req->r_aborted) {

+ 3 - 0
fs/ceph/mds_client.h

@@ -236,6 +236,9 @@ struct ceph_mds_request {
 	struct inode	*r_unsafe_dir;
 	struct inode	*r_unsafe_dir;
 	struct list_head r_unsafe_dir_item;
 	struct list_head r_unsafe_dir_item;
 
 
+	/* unsafe requests that modify the target inode */
+	struct list_head r_unsafe_target_item;
+
 	struct ceph_mds_session *r_session;
 	struct ceph_mds_session *r_session;
 
 
 	int               r_attempts;   /* resend attempts */
 	int               r_attempts;   /* resend attempts */

+ 1 - 0
fs/ceph/super.h

@@ -342,6 +342,7 @@ struct ceph_inode_info {
 
 
 	struct list_head i_unsafe_writes; /* uncommitted sync writes */
 	struct list_head i_unsafe_writes; /* uncommitted sync writes */
 	struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */
 	struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */
+	struct list_head i_unsafe_iops;   /* uncommitted mds inode ops */
 	spinlock_t i_unsafe_lock;
 	spinlock_t i_unsafe_lock;
 
 
 	struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */
 	struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */