Browse source code

libceph: defer __complete_request() to a workqueue

In the common case, req->r_callback is called by handle_reply() on the
ceph-msgr worker thread without any locks.  If handle_reply() fails, it
is called with both osd->lock and osdc->lock.  In the map check case,
it is called with just osdc->lock but held for write.  Finally, if the
request is aborted because of -ENOSPC or by ceph_osdc_abort_requests(),
it is called directly on the submitter's thread, again with both locks.

req->r_callback on the submitter's thread is relatively new (introduced
in 4.12) and ripe for deadlocks -- e.g. writeback worker thread waiting
on itself:

  inode_wait_for_writeback+0x26/0x40
  evict+0xb5/0x1a0
  iput+0x1d2/0x220
  ceph_put_wrbuffer_cap_refs+0xe0/0x2c0 [ceph]
  writepages_finish+0x2d3/0x410 [ceph]
  __complete_request+0x26/0x60 [libceph]
  complete_request+0x2e/0x70 [libceph]
  __submit_request+0x256/0x330 [libceph]
  submit_request+0x2b/0x30 [libceph]
  ceph_osdc_start_request+0x25/0x40 [libceph]
  ceph_writepages_start+0xdfe/0x1320 [ceph]
  do_writepages+0x1f/0x70
  __writeback_single_inode+0x45/0x330
  writeback_sb_inodes+0x26a/0x600
  __writeback_inodes_wb+0x92/0xc0
  wb_writeback+0x274/0x330
  wb_workfn+0x2d5/0x3b0

Defer __complete_request() to a workqueue in all failure cases so it's
never on the same thread as ceph_osdc_start_request() and always called
with no locks held.

Link: http://tracker.ceph.com/issues/23978
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Acked-by: Jeff Layton <jlayton@redhat.com>
Reviewed-by: "Yan, Zheng" <zyan@redhat.com>
Ilya Dryomov, 7 years ago
parent
commit
88bc1922c2
2 changed files with 20 additions and 1 deletion
  1. 2 0
      include/linux/ceph/osd_client.h
  2. 18 1
      net/ceph/osd_client.c

+ 2 - 0
include/linux/ceph/osd_client.h

@@ -170,6 +170,7 @@ struct ceph_osd_request {
 	u64             r_tid;              /* unique for this client */
 	struct rb_node  r_node;
 	struct rb_node  r_mc_node;          /* map check */
+	struct work_struct r_complete_work;
 	struct ceph_osd *r_osd;
 
 	struct ceph_osd_request_target r_t;
@@ -360,6 +361,7 @@ struct ceph_osd_client {
 	struct ceph_msgpool	msgpool_op_reply;
 
 	struct workqueue_struct	*notify_wq;
+	struct workqueue_struct	*completion_wq;
 };
 
 static inline bool ceph_osdmap_flag(struct ceph_osd_client *osdc, int flag)

+ 18 - 1
net/ceph/osd_client.c

@@ -2329,6 +2329,14 @@ static void __complete_request(struct ceph_osd_request *req)
 	ceph_osdc_put_request(req);
 }
 
+static void complete_request_workfn(struct work_struct *work)
+{
+	struct ceph_osd_request *req =
+	    container_of(work, struct ceph_osd_request, r_complete_work);
+
+	__complete_request(req);
+}
+
 /*
  * This is open-coded in handle_reply().
  */
@@ -2338,7 +2346,9 @@ static void complete_request(struct ceph_osd_request *req, int err)
 
 	req->r_result = err;
 	finish_request(req);
-	__complete_request(req);
+
+	INIT_WORK(&req->r_complete_work, complete_request_workfn);
+	queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
 }
 
 static void cancel_map_check(struct ceph_osd_request *req)
@@ -5058,6 +5068,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 	if (!osdc->notify_wq)
 		goto out_msgpool_reply;
 
+	osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
+	if (!osdc->completion_wq)
+		goto out_notify_wq;
+
 	schedule_delayed_work(&osdc->timeout_work,
 			      osdc->client->options->osd_keepalive_timeout);
 	schedule_delayed_work(&osdc->osds_timeout_work,
@@ -5065,6 +5079,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 
 	return 0;
 
+out_notify_wq:
+	destroy_workqueue(osdc->notify_wq);
 out_msgpool_reply:
 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 out_msgpool:
@@ -5079,6 +5095,7 @@ out:
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
+	destroy_workqueue(osdc->completion_wq);
 	destroy_workqueue(osdc->notify_wq);
 	cancel_delayed_work_sync(&osdc->timeout_work);
 	cancel_delayed_work_sync(&osdc->osds_timeout_work);