瀏覽代碼

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph fixes from Sage Weil:
 "There are a couple of fixes from Yan for bad pointer dereferences in
  the messenger code and when fiddling with page->private after page
  migration, a fix from Alex for a use-after-free in the osd client
  code, and a couple fixes for the message refcounting and shutdown
  ordering."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  libceph: flush msgr queue during mon_client shutdown
  rbd: Clear ceph_msg->bio_iter for retransmitted message
  libceph: use con get/put ops from osd_client
  libceph: osd_client: don't drop reply reference too early
  ceph: check PG_Private flag before accessing page->private
Linus Torvalds 13 年之前
父節點
當前提交
002b758b6d
共有 5 個文件被更改,包括 30 次插入22 次删除
  1. 12 9
      fs/ceph/addr.c
  2. 0 7
      net/ceph/ceph_common.c
  3. 4 0
      net/ceph/messenger.c
  4. 8 0
      net/ceph/mon_client.c
  5. 6 6
      net/ceph/osd_client.c

+ 12 - 9
fs/ceph/addr.c

@@ -54,7 +54,12 @@
 	(CONGESTION_ON_THRESH(congestion_kb) -				\
 	(CONGESTION_ON_THRESH(congestion_kb) -				\
 	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
 	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
 
 
-
+static inline struct ceph_snap_context *page_snap_context(struct page *page)
+{
+	if (PagePrivate(page))
+		return (void *)page->private;
+	return NULL;
+}
 
 
 /*
 /*
  * Dirty a page.  Optimistically adjust accounting, on the assumption
  * Dirty a page.  Optimistically adjust accounting, on the assumption
@@ -142,10 +147,9 @@ static void ceph_invalidatepage(struct page *page, unsigned long offset)
 {
 {
 	struct inode *inode;
 	struct inode *inode;
 	struct ceph_inode_info *ci;
 	struct ceph_inode_info *ci;
-	struct ceph_snap_context *snapc = (void *)page->private;
+	struct ceph_snap_context *snapc = page_snap_context(page);
 
 
 	BUG_ON(!PageLocked(page));
 	BUG_ON(!PageLocked(page));
-	BUG_ON(!page->private);
 	BUG_ON(!PagePrivate(page));
 	BUG_ON(!PagePrivate(page));
 	BUG_ON(!page->mapping);
 	BUG_ON(!page->mapping);
 
 
@@ -182,7 +186,6 @@ static int ceph_releasepage(struct page *page, gfp_t g)
 	struct inode *inode = page->mapping ? page->mapping->host : NULL;
 	struct inode *inode = page->mapping ? page->mapping->host : NULL;
 	dout("%p releasepage %p idx %lu\n", inode, page, page->index);
 	dout("%p releasepage %p idx %lu\n", inode, page, page->index);
 	WARN_ON(PageDirty(page));
 	WARN_ON(PageDirty(page));
-	WARN_ON(page->private);
 	WARN_ON(PagePrivate(page));
 	WARN_ON(PagePrivate(page));
 	return 0;
 	return 0;
 }
 }
@@ -443,7 +446,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	osdc = &fsc->client->osdc;
 	osdc = &fsc->client->osdc;
 
 
 	/* verify this is a writeable snap context */
 	/* verify this is a writeable snap context */
-	snapc = (void *)page->private;
+	snapc = page_snap_context(page);
 	if (snapc == NULL) {
 	if (snapc == NULL) {
 		dout("writepage %p page %p not dirty?\n", inode, page);
 		dout("writepage %p page %p not dirty?\n", inode, page);
 		goto out;
 		goto out;
@@ -451,7 +454,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	oldest = get_oldest_context(inode, &snap_size);
 	oldest = get_oldest_context(inode, &snap_size);
 	if (snapc->seq > oldest->seq) {
 	if (snapc->seq > oldest->seq) {
 		dout("writepage %p page %p snapc %p not writeable - noop\n",
 		dout("writepage %p page %p snapc %p not writeable - noop\n",
-		     inode, page, (void *)page->private);
+		     inode, page, snapc);
 		/* we should only noop if called by kswapd */
 		/* we should only noop if called by kswapd */
 		WARN_ON((current->flags & PF_MEMALLOC) == 0);
 		WARN_ON((current->flags & PF_MEMALLOC) == 0);
 		ceph_put_snap_context(oldest);
 		ceph_put_snap_context(oldest);
@@ -591,7 +594,7 @@ static void writepages_finish(struct ceph_osd_request *req,
 			clear_bdi_congested(&fsc->backing_dev_info,
 			clear_bdi_congested(&fsc->backing_dev_info,
 					    BLK_RW_ASYNC);
 					    BLK_RW_ASYNC);
 
 
-		ceph_put_snap_context((void *)page->private);
+		ceph_put_snap_context(page_snap_context(page));
 		page->private = 0;
 		page->private = 0;
 		ClearPagePrivate(page);
 		ClearPagePrivate(page);
 		dout("unlocking %d %p\n", i, page);
 		dout("unlocking %d %p\n", i, page);
@@ -795,7 +798,7 @@ get_more_pages:
 			}
 			}
 
 
 			/* only if matching snap context */
 			/* only if matching snap context */
-			pgsnapc = (void *)page->private;
+			pgsnapc = page_snap_context(page);
 			if (pgsnapc->seq > snapc->seq) {
 			if (pgsnapc->seq > snapc->seq) {
 				dout("page snapc %p %lld > oldest %p %lld\n",
 				dout("page snapc %p %lld > oldest %p %lld\n",
 				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
 				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
@@ -984,7 +987,7 @@ retry_locked:
 	BUG_ON(!ci->i_snap_realm);
 	BUG_ON(!ci->i_snap_realm);
 	down_read(&mdsc->snap_rwsem);
 	down_read(&mdsc->snap_rwsem);
 	BUG_ON(!ci->i_snap_realm->cached_context);
 	BUG_ON(!ci->i_snap_realm->cached_context);
-	snapc = (void *)page->private;
+	snapc = page_snap_context(page);
 	if (snapc && snapc != ci->i_head_snapc) {
 	if (snapc && snapc != ci->i_head_snapc) {
 		/*
 		/*
 		 * this page is already dirty in another (older) snap
 		 * this page is already dirty in another (older) snap

+ 0 - 7
net/ceph/ceph_common.c

@@ -504,13 +504,6 @@ void ceph_destroy_client(struct ceph_client *client)
 	/* unmount */
 	/* unmount */
 	ceph_osdc_stop(&client->osdc);
 	ceph_osdc_stop(&client->osdc);
 
 
-	/*
-	 * make sure osd connections close out before destroying the
-	 * auth module, which is needed to free those connections'
-	 * ceph_authorizers.
-	 */
-	ceph_msgr_flush();
-
 	ceph_monc_stop(&client->monc);
 	ceph_monc_stop(&client->monc);
 
 
 	ceph_debugfs_client_cleanup(client);
 	ceph_debugfs_client_cleanup(client);

+ 4 - 0
net/ceph/messenger.c

@@ -563,6 +563,10 @@ static void prepare_write_message(struct ceph_connection *con)
 		m->hdr.seq = cpu_to_le64(++con->out_seq);
 		m->hdr.seq = cpu_to_le64(++con->out_seq);
 		m->needs_out_seq = false;
 		m->needs_out_seq = false;
 	}
 	}
+#ifdef CONFIG_BLOCK
+	else
+		m->bio_iter = NULL;
+#endif
 
 
 	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
 	dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
 	     m, con->out_seq, le16_to_cpu(m->hdr.type),
 	     m, con->out_seq, le16_to_cpu(m->hdr.type),

+ 8 - 0
net/ceph/mon_client.c

@@ -847,6 +847,14 @@ void ceph_monc_stop(struct ceph_mon_client *monc)
 
 
 	mutex_unlock(&monc->mutex);
 	mutex_unlock(&monc->mutex);
 
 
+	/*
+	 * flush msgr queue before we destroy ourselves to ensure that:
+	 *  - any work that references our embedded con is finished.
+	 *  - any osd_client or other work that may reference an authorizer
+	 *    finishes before we shut down the auth subsystem.
+	 */
+	ceph_msgr_flush();
+
 	ceph_auth_destroy(monc->auth);
 	ceph_auth_destroy(monc->auth);
 
 
 	ceph_msg_put(monc->m_auth);
 	ceph_msg_put(monc->m_auth);

+ 6 - 6
net/ceph/osd_client.c

@@ -139,15 +139,15 @@ void ceph_osdc_release_request(struct kref *kref)
 
 
 	if (req->r_request)
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
 		ceph_msg_put(req->r_request);
-	if (req->r_reply)
-		ceph_msg_put(req->r_reply);
 	if (req->r_con_filling_msg) {
 	if (req->r_con_filling_msg) {
 		dout("release_request revoking pages %p from con %p\n",
 		dout("release_request revoking pages %p from con %p\n",
 		     req->r_pages, req->r_con_filling_msg);
 		     req->r_pages, req->r_con_filling_msg);
 		ceph_con_revoke_message(req->r_con_filling_msg,
 		ceph_con_revoke_message(req->r_con_filling_msg,
 				      req->r_reply);
 				      req->r_reply);
-		ceph_con_put(req->r_con_filling_msg);
+		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
 	}
 	}
+	if (req->r_reply)
+		ceph_msg_put(req->r_reply);
 	if (req->r_own_pages)
 	if (req->r_own_pages)
 		ceph_release_page_vector(req->r_pages,
 		ceph_release_page_vector(req->r_pages,
 					 req->r_num_pages);
 					 req->r_num_pages);
@@ -1216,7 +1216,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 	if (req->r_con_filling_msg == con && req->r_reply == msg) {
 	if (req->r_con_filling_msg == con && req->r_reply == msg) {
 		dout(" dropping con_filling_msg ref %p\n", con);
 		dout(" dropping con_filling_msg ref %p\n", con);
 		req->r_con_filling_msg = NULL;
 		req->r_con_filling_msg = NULL;
-		ceph_con_put(con);
+		con->ops->put(con);
 	}
 	}
 
 
 	if (!req->r_got_reply) {
 	if (!req->r_got_reply) {
@@ -2028,7 +2028,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 		dout("get_reply revoking msg %p from old con %p\n",
 		dout("get_reply revoking msg %p from old con %p\n",
 		     req->r_reply, req->r_con_filling_msg);
 		     req->r_reply, req->r_con_filling_msg);
 		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
 		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
-		ceph_con_put(req->r_con_filling_msg);
+		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
 		req->r_con_filling_msg = NULL;
 		req->r_con_filling_msg = NULL;
 	}
 	}
 
 
@@ -2063,7 +2063,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
 #endif
 #endif
 	}
 	}
 	*skip = 0;
 	*skip = 0;
-	req->r_con_filling_msg = ceph_con_get(con);
+	req->r_con_filling_msg = con->ops->get(con);
 	dout("get_reply tid %lld %p\n", tid, m);
 	dout("get_reply tid %lld %p\n", tid, m);
 
 
 out:
 out: