瀏覽代碼

Merge branch 'work.aio' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs

Pull vfs aio updates from Al Viro:
 "Christoph's aio poll, saner this time around.

  This time it's pretty much local to fs/aio.c. Hopefully race-free..."

* 'work.aio' of git://git.kernel.org/pub/scm/linux/kernel/git/viro/vfs:
  aio: allow direct aio poll comletions for keyed wakeups
  aio: implement IOCB_CMD_POLL
  aio: add a iocb refcount
  timerfd: add support for keyed wakeups
Linus Torvalds 7 年之前
父節點
當前提交
f2be269897
共有 3 個文件被更改,包括 209 次插入11 次删除
  1. 204 4
      fs/aio.c
  2. 3 3
      fs/timerfd.c
  3. 2 4
      include/uapi/linux/aio_abi.h

+ 204 - 4
fs/aio.c

@@ -5,6 +5,7 @@
  *	Implements an efficient asynchronous io interface.
  *
  *	Copyright 2000, 2001, 2002 Red Hat, Inc.  All Rights Reserved.
+ *	Copyright 2018 Christoph Hellwig.
  *
  *	See ../COPYING for licensing terms.
  */
@@ -18,6 +19,7 @@
 #include <linux/export.h>
 #include <linux/syscalls.h>
 #include <linux/backing-dev.h>
+#include <linux/refcount.h>
 #include <linux/uio.h>
 
 #include <linux/sched/signal.h>
@@ -164,10 +166,21 @@ struct fsync_iocb {
 	bool			datasync;
 };
 
+struct poll_iocb {
+	struct file		*file;
+	struct wait_queue_head	*head;
+	__poll_t		events;
+	bool			woken;
+	bool			cancelled;
+	struct wait_queue_entry	wait;
+	struct work_struct	work;
+};
+
 struct aio_kiocb {
 	union {
 		struct kiocb		rw;
 		struct fsync_iocb	fsync;
+		struct poll_iocb	poll;
 	};
 
 	struct kioctx		*ki_ctx;
@@ -178,6 +191,7 @@ struct aio_kiocb {
 
 	struct list_head	ki_list;	/* the aio core uses this
 						 * for cancellation */
+	refcount_t		ki_refcnt;
 
 	/*
 	 * If the aio_resfd field of the userspace iocb is not zero,
@@ -999,6 +1013,7 @@ static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx)
 
 	percpu_ref_get(&ctx->reqs);
 	INIT_LIST_HEAD(&req->ki_list);
+	refcount_set(&req->ki_refcnt, 0);
 	req->ki_ctx = ctx;
 	return req;
 out_put:
@@ -1033,6 +1048,15 @@ out:
 	return ret;
 }
 
+static inline void iocb_put(struct aio_kiocb *iocb)
+{
+	if (refcount_read(&iocb->ki_refcnt) == 0 ||
+	    refcount_dec_and_test(&iocb->ki_refcnt)) {
+		percpu_ref_put(&iocb->ki_ctx->reqs);
+		kmem_cache_free(kiocb_cachep, iocb);
+	}
+}
+
 /* aio_complete
  *	Called when the io request on the given iocb is complete.
  */
@@ -1102,8 +1126,6 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
 		eventfd_ctx_put(iocb->ki_eventfd);
 	}
 
-	kmem_cache_free(kiocb_cachep, iocb);
-
 	/*
 	 * We have to order our ring_info tail store above and test
 	 * of the wait list below outside the wait lock.  This is
@@ -1114,8 +1136,7 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
 
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
-
-	percpu_ref_put(&ctx->reqs);
+	iocb_put(iocb);
 }
 
 /* aio_read_events_ring
@@ -1576,6 +1597,182 @@ static int aio_fsync(struct fsync_iocb *req, struct iocb *iocb, bool datasync)
 	return 0;
 }
 
+static inline void aio_poll_complete(struct aio_kiocb *iocb, __poll_t mask)
+{
+	struct file *file = iocb->poll.file;
+
+	aio_complete(iocb, mangle_poll(mask), 0);
+	fput(file);
+}
+
+static void aio_poll_complete_work(struct work_struct *work)
+{
+	struct poll_iocb *req = container_of(work, struct poll_iocb, work);
+	struct aio_kiocb *iocb = container_of(req, struct aio_kiocb, poll);
+	struct poll_table_struct pt = { ._key = req->events };
+	struct kioctx *ctx = iocb->ki_ctx;
+	__poll_t mask = 0;
+
+	if (!READ_ONCE(req->cancelled))
+		mask = vfs_poll(req->file, &pt) & req->events;
+
+	/*
+	 * Note that ->ki_cancel callers also delete iocb from active_reqs after
+	 * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
+	 * synchronize with them.  In the cancellation case the list_del_init
+	 * itself is not actually needed, but harmless so we keep it in to
+	 * avoid further branches in the fast path.
+	 */
+	spin_lock_irq(&ctx->ctx_lock);
+	if (!mask && !READ_ONCE(req->cancelled)) {
+		add_wait_queue(req->head, &req->wait);
+		spin_unlock_irq(&ctx->ctx_lock);
+		return;
+	}
+	list_del_init(&iocb->ki_list);
+	spin_unlock_irq(&ctx->ctx_lock);
+
+	aio_poll_complete(iocb, mask);
+}
+
+/* assumes we are called with irqs disabled */
+static int aio_poll_cancel(struct kiocb *iocb)
+{
+	struct aio_kiocb *aiocb = container_of(iocb, struct aio_kiocb, rw);
+	struct poll_iocb *req = &aiocb->poll;
+
+	spin_lock(&req->head->lock);
+	WRITE_ONCE(req->cancelled, true);
+	if (!list_empty(&req->wait.entry)) {
+		list_del_init(&req->wait.entry);
+		schedule_work(&aiocb->poll.work);
+	}
+	spin_unlock(&req->head->lock);
+
+	return 0;
+}
+
+static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
+		void *key)
+{
+	struct poll_iocb *req = container_of(wait, struct poll_iocb, wait);
+	struct aio_kiocb *iocb = container_of(req, struct aio_kiocb, poll);
+	__poll_t mask = key_to_poll(key);
+
+	req->woken = true;
+
+	/* for instances that support it check for an event match first: */
+	if (mask) {
+		if (!(mask & req->events))
+			return 0;
+
+		/* try to complete the iocb inline if we can: */
+		if (spin_trylock(&iocb->ki_ctx->ctx_lock)) {
+			list_del(&iocb->ki_list);
+			spin_unlock(&iocb->ki_ctx->ctx_lock);
+
+			list_del_init(&req->wait.entry);
+			aio_poll_complete(iocb, mask);
+			return 1;
+		}
+	}
+
+	list_del_init(&req->wait.entry);
+	schedule_work(&req->work);
+	return 1;
+}
+
+struct aio_poll_table {
+	struct poll_table_struct	pt;
+	struct aio_kiocb		*iocb;
+	int				error;
+};
+
+static void
+aio_poll_queue_proc(struct file *file, struct wait_queue_head *head,
+		struct poll_table_struct *p)
+{
+	struct aio_poll_table *pt = container_of(p, struct aio_poll_table, pt);
+
+	/* multiple wait queues per file are not supported */
+	if (unlikely(pt->iocb->poll.head)) {
+		pt->error = -EINVAL;
+		return;
+	}
+
+	pt->error = 0;
+	pt->iocb->poll.head = head;
+	add_wait_queue(head, &pt->iocb->poll.wait);
+}
+
+static ssize_t aio_poll(struct aio_kiocb *aiocb, struct iocb *iocb)
+{
+	struct kioctx *ctx = aiocb->ki_ctx;
+	struct poll_iocb *req = &aiocb->poll;
+	struct aio_poll_table apt;
+	__poll_t mask;
+
+	/* reject any unknown events outside the normal event mask. */
+	if ((u16)iocb->aio_buf != iocb->aio_buf)
+		return -EINVAL;
+	/* reject fields that are not defined for poll */
+	if (iocb->aio_offset || iocb->aio_nbytes || iocb->aio_rw_flags)
+		return -EINVAL;
+
+	INIT_WORK(&req->work, aio_poll_complete_work);
+	req->events = demangle_poll(iocb->aio_buf) | EPOLLERR | EPOLLHUP;
+	req->file = fget(iocb->aio_fildes);
+	if (unlikely(!req->file))
+		return -EBADF;
+
+	apt.pt._qproc = aio_poll_queue_proc;
+	apt.pt._key = req->events;
+	apt.iocb = aiocb;
+	apt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
+
+	/* initialized the list so that we can do list_empty checks */
+	INIT_LIST_HEAD(&req->wait.entry);
+	init_waitqueue_func_entry(&req->wait, aio_poll_wake);
+
+	/* one for removal from waitqueue, one for this function */
+	refcount_set(&aiocb->ki_refcnt, 2);
+
+	mask = vfs_poll(req->file, &apt.pt) & req->events;
+	if (unlikely(!req->head)) {
+		/* we did not manage to set up a waitqueue, done */
+		goto out;
+	}
+
+	spin_lock_irq(&ctx->ctx_lock);
+	spin_lock(&req->head->lock);
+	if (req->woken) {
+		/* wake_up context handles the rest */
+		mask = 0;
+		apt.error = 0;
+	} else if (mask || apt.error) {
+		/* if we get an error or a mask we are done */
+		WARN_ON_ONCE(list_empty(&req->wait.entry));
+		list_del_init(&req->wait.entry);
+	} else {
+		/* actually waiting for an event */
+		list_add_tail(&aiocb->ki_list, &ctx->active_reqs);
+		aiocb->ki_cancel = aio_poll_cancel;
+	}
+	spin_unlock(&req->head->lock);
+	spin_unlock_irq(&ctx->ctx_lock);
+
+out:
+	if (unlikely(apt.error)) {
+		fput(req->file);
+		return apt.error;
+	}
+
+	if (mask)
+		aio_poll_complete(aiocb, mask);
+	iocb_put(aiocb);
+	return 0;
+}
+
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 			 bool compat)
 {
@@ -1649,6 +1846,9 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	case IOCB_CMD_FDSYNC:
 		ret = aio_fsync(&req->fsync, &iocb, true);
 		break;
+	case IOCB_CMD_POLL:
+		ret = aio_poll(req, &iocb);
+		break;
 	default:
 		pr_debug("invalid aio operation %d\n", iocb.aio_lio_opcode);
 		ret = -EINVAL;

+ 3 - 3
fs/timerfd.c

@@ -66,7 +66,7 @@ static void timerfd_triggered(struct timerfd_ctx *ctx)
 	spin_lock_irqsave(&ctx->wqh.lock, flags);
 	ctx->expired = 1;
 	ctx->ticks++;
-	wake_up_locked(&ctx->wqh);
+	wake_up_locked_poll(&ctx->wqh, EPOLLIN);
 	spin_unlock_irqrestore(&ctx->wqh.lock, flags);
 }
 
@@ -107,7 +107,7 @@ void timerfd_clock_was_set(void)
 		if (ctx->moffs != moffs) {
 			ctx->moffs = KTIME_MAX;
 			ctx->ticks++;
-			wake_up_locked(&ctx->wqh);
+			wake_up_locked_poll(&ctx->wqh, EPOLLIN);
 		}
 		spin_unlock_irqrestore(&ctx->wqh.lock, flags);
 	}
@@ -345,7 +345,7 @@ static long timerfd_ioctl(struct file *file, unsigned int cmd, unsigned long arg
 		spin_lock_irq(&ctx->wqh.lock);
 		if (!timerfd_canceled(ctx)) {
 			ctx->ticks = ticks;
-			wake_up_locked(&ctx->wqh);
+			wake_up_locked_poll(&ctx->wqh, EPOLLIN);
 		} else
 			ret = -ECANCELED;
 		spin_unlock_irq(&ctx->wqh.lock);

+ 2 - 4
include/uapi/linux/aio_abi.h

@@ -38,10 +38,8 @@ enum {
 	IOCB_CMD_PWRITE = 1,
 	IOCB_CMD_FSYNC = 2,
 	IOCB_CMD_FDSYNC = 3,
-	/* These two are experimental.
-	 * IOCB_CMD_PREADX = 4,
-	 * IOCB_CMD_POLL = 5,
-	 */
+	/* 4 was the experimental IOCB_CMD_PREADX */
+	IOCB_CMD_POLL = 5,
 	IOCB_CMD_NOOP = 6,
 	IOCB_CMD_PREADV = 7,
 	IOCB_CMD_PWRITEV = 8,