From 7dda712818373d4d8ecc5dca2293664fcd3b0158 Mon Sep 17 00:00:00 2001 From: Christoph Hellwig Date: Mon, 16 Jul 2018 12:25:50 +0200 Subject: [PATCH 1/4] timerfd: add support for keyed wakeups This prepares timerfd for use with aio poll. Signed-off-by: Christoph Hellwig Tested-by: Avi Kivity --- fs/timerfd.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs/timerfd.c b/fs/timerfd.c index cdad49da3ff7..f6c54fd56645 100644 --- a/fs/timerfd.c +++ b/fs/timerfd.c @@ -66,7 +66,7 @@ static void timerfd_triggered(struct timerfd_ctx *ctx) spin_lock_irqsave(&ctx->wqh.lock, flags); ctx->expired = 1; ctx->ticks++; - wake_up_locked(&ctx->wqh); + wake_up_locked_poll(&ctx->wqh, EPOLLIN); spin_unlock_irqrestore(&ctx->wqh.lock, flags); } @@ -107,7 +107,7 @@ void timerfd_clock_was_set(void) if (ctx->moffs != moffs) { ctx->moffs = KTIME_MAX; ctx->ticks++; - wake_up_locked(&ctx->wqh); + wake_up_locked_poll(&ctx->wqh, EPOLLIN); } spin_unlock_irqrestore(&ctx->wqh.lock, flags); } @@ -345,7 +345,7 @@ static long timerfd_ioctl(struct file *file, unsigned int cmd, unsigned long arg spin_lock_irq(&ctx->wqh.lock); if (!timerfd_canceled(ctx)) { ctx->ticks = ticks; - wake_up_locked(&ctx->wqh); + wake_up_locked_poll(&ctx->wqh, EPOLLIN); } else ret = -ECANCELED; spin_unlock_irq(&ctx->wqh.lock); From 9018ccc453af063d16b3b6b5dfa2ad0635390371 Mon Sep 17 00:00:00 2001 From: Christoph Hellwig Date: Tue, 24 Jul 2018 11:36:37 +0200 Subject: [PATCH 2/4] aio: add a iocb refcount This is needed to prevent races caused by the way the ->poll API works. To avoid introducing overhead for other users of the iocbs we initialize it to zero and only do refcount operations if it is non-zero in the completion path. Signed-off-by: Christoph Hellwig Tested-by: Avi Kivity --- fs/aio.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/fs/aio.c b/fs/aio.c index 27454594e37a..fe2018ada32c 100644 --- a/fs/aio.c +++ b/fs/aio.c @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -178,6 +179,7 @@ struct aio_kiocb { struct list_head ki_list; /* the aio core uses this * for cancellation */ + refcount_t ki_refcnt; /* * If the aio_resfd field of the userspace iocb is not zero, @@ -1015,6 +1017,7 @@ static inline struct aio_kiocb *aio_get_req(struct kioctx *ctx) percpu_ref_get(&ctx->reqs); INIT_LIST_HEAD(&req->ki_list); + refcount_set(&req->ki_refcnt, 0); req->ki_ctx = ctx; return req; out_put: @@ -1049,6 +1052,15 @@ out: return ret; } +static inline void iocb_put(struct aio_kiocb *iocb) +{ + if (refcount_read(&iocb->ki_refcnt) == 0 || + refcount_dec_and_test(&iocb->ki_refcnt)) { + percpu_ref_put(&iocb->ki_ctx->reqs); + kmem_cache_free(kiocb_cachep, iocb); + } +} + /* aio_complete * Called when the io request on the given iocb is complete. */ @@ -1118,8 +1130,6 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2) eventfd_ctx_put(iocb->ki_eventfd); } - kmem_cache_free(kiocb_cachep, iocb); - /* * We have to order our ring_info tail store above and test * of the wait list below outside the wait lock. This is @@ -1130,8 +1140,7 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2) if (waitqueue_active(&ctx->wait)) wake_up(&ctx->wait); - - percpu_ref_put(&ctx->reqs); + iocb_put(iocb); } /* aio_read_events_ring From bfe4037e722ec672c9dafd5730d9132afeeb76e9 Mon Sep 17 00:00:00 2001 From: Christoph Hellwig Date: Mon, 16 Jul 2018 09:08:20 +0200 Subject: [PATCH 3/4] aio: implement IOCB_CMD_POLL Simple one-shot poll through the io_submit() interface. To poll for a file descriptor the application should submit an iocb of type IOCB_CMD_POLL. It will poll the fd for the events specified in the the first 32 bits of the aio_buf field of the iocb. Unlike poll or epoll without EPOLLONESHOT this interface always works in one shot mode, that is once the iocb is completed, it will have to be resubmitted. Signed-off-by: Christoph Hellwig Tested-by: Avi Kivity --- fs/aio.c | 178 +++++++++++++++++++++++++++++++++++ include/uapi/linux/aio_abi.h | 6 +- 2 files changed, 180 insertions(+), 4 deletions(-) diff --git a/fs/aio.c b/fs/aio.c index fe2018ada32c..2fd19521d8a8 100644 --- a/fs/aio.c +++ b/fs/aio.c @@ -5,6 +5,7 @@ * Implements an efficient asynchronous io interface. * * Copyright 2000, 2001, 2002 Red Hat, Inc. All Rights Reserved. + * Copyright 2018 Christoph Hellwig. * * See ../COPYING for licensing terms. */ @@ -165,10 +166,21 @@ struct fsync_iocb { bool datasync; }; +struct poll_iocb { + struct file *file; + struct wait_queue_head *head; + __poll_t events; + bool woken; + bool cancelled; + struct wait_queue_entry wait; + struct work_struct work; +}; + struct aio_kiocb { union { struct kiocb rw; struct fsync_iocb fsync; + struct poll_iocb poll; }; struct kioctx *ki_ctx; @@ -1601,6 +1613,169 @@ static int aio_fsync(struct fsync_iocb *req, struct iocb *iocb, bool datasync) return 0; } +static inline void aio_poll_complete(struct aio_kiocb *iocb, __poll_t mask) +{ + struct file *file = iocb->poll.file; + + aio_complete(iocb, mangle_poll(mask), 0); + fput(file); +} + +static void aio_poll_complete_work(struct work_struct *work) +{ + struct poll_iocb *req = container_of(work, struct poll_iocb, work); + struct aio_kiocb *iocb = container_of(req, struct aio_kiocb, poll); + struct poll_table_struct pt = { ._key = req->events }; + struct kioctx *ctx = iocb->ki_ctx; + __poll_t mask = 0; + + if (!READ_ONCE(req->cancelled)) + mask = vfs_poll(req->file, &pt) & req->events; + + /* + * Note that ->ki_cancel callers also delete iocb from active_reqs after + * calling ->ki_cancel. We need the ctx_lock roundtrip here to + * synchronize with them. In the cancellation case the list_del_init + * itself is not actually needed, but harmless so we keep it in to + * avoid further branches in the fast path. + */ + spin_lock_irq(&ctx->ctx_lock); + if (!mask && !READ_ONCE(req->cancelled)) { + add_wait_queue(req->head, &req->wait); + spin_unlock_irq(&ctx->ctx_lock); + return; + } + list_del_init(&iocb->ki_list); + spin_unlock_irq(&ctx->ctx_lock); + + aio_poll_complete(iocb, mask); +} + +/* assumes we are called with irqs disabled */ +static int aio_poll_cancel(struct kiocb *iocb) +{ + struct aio_kiocb *aiocb = container_of(iocb, struct aio_kiocb, rw); + struct poll_iocb *req = &aiocb->poll; + + spin_lock(&req->head->lock); + WRITE_ONCE(req->cancelled, true); + if (!list_empty(&req->wait.entry)) { + list_del_init(&req->wait.entry); + schedule_work(&aiocb->poll.work); + } + spin_unlock(&req->head->lock); + + return 0; +} + +static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, + void *key) +{ + struct poll_iocb *req = container_of(wait, struct poll_iocb, wait); + __poll_t mask = key_to_poll(key); + + req->woken = true; + + /* for instances that support it check for an event match first: */ + if (mask && !(mask & req->events)) + return 0; + + list_del_init(&req->wait.entry); + schedule_work(&req->work); + return 1; +} + +struct aio_poll_table { + struct poll_table_struct pt; + struct aio_kiocb *iocb; + int error; +}; + +static void +aio_poll_queue_proc(struct file *file, struct wait_queue_head *head, + struct poll_table_struct *p) +{ + struct aio_poll_table *pt = container_of(p, struct aio_poll_table, pt); + + /* multiple wait queues per file are not supported */ + if (unlikely(pt->iocb->poll.head)) { + pt->error = -EINVAL; + return; + } + + pt->error = 0; + pt->iocb->poll.head = head; + add_wait_queue(head, &pt->iocb->poll.wait); +} + +static ssize_t aio_poll(struct aio_kiocb *aiocb, struct iocb *iocb) +{ + struct kioctx *ctx = aiocb->ki_ctx; + struct poll_iocb *req = &aiocb->poll; + struct aio_poll_table apt; + __poll_t mask; + + /* reject any unknown events outside the normal event mask. */ + if ((u16)iocb->aio_buf != iocb->aio_buf) + return -EINVAL; + /* reject fields that are not defined for poll */ + if (iocb->aio_offset || iocb->aio_nbytes || iocb->aio_rw_flags) + return -EINVAL; + + INIT_WORK(&req->work, aio_poll_complete_work); + req->events = demangle_poll(iocb->aio_buf) | EPOLLERR | EPOLLHUP; + req->file = fget(iocb->aio_fildes); + if (unlikely(!req->file)) + return -EBADF; + + apt.pt._qproc = aio_poll_queue_proc; + apt.pt._key = req->events; + apt.iocb = aiocb; + apt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */ + + /* initialized the list so that we can do list_empty checks */ + INIT_LIST_HEAD(&req->wait.entry); + init_waitqueue_func_entry(&req->wait, aio_poll_wake); + + /* one for removal from waitqueue, one for this function */ + refcount_set(&aiocb->ki_refcnt, 2); + + mask = vfs_poll(req->file, &apt.pt) & req->events; + if (unlikely(!req->head)) { + /* we did not manage to set up a waitqueue, done */ + goto out; + } + + spin_lock_irq(&ctx->ctx_lock); + spin_lock(&req->head->lock); + if (req->woken) { + /* wake_up context handles the rest */ + mask = 0; + apt.error = 0; + } else if (mask || apt.error) { + /* if we get an error or a mask we are done */ + WARN_ON_ONCE(list_empty(&req->wait.entry)); + list_del_init(&req->wait.entry); + } else { + /* actually waiting for an event */ + list_add_tail(&aiocb->ki_list, &ctx->active_reqs); + aiocb->ki_cancel = aio_poll_cancel; + } + spin_unlock(&req->head->lock); + spin_unlock_irq(&ctx->ctx_lock); + +out: + if (unlikely(apt.error)) { + fput(req->file); + return apt.error; + } + + if (mask) + aio_poll_complete(aiocb, mask); + iocb_put(aiocb); + return 0; +} + static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, bool compat) { @@ -1674,6 +1849,9 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, case IOCB_CMD_FDSYNC: ret = aio_fsync(&req->fsync, &iocb, true); break; + case IOCB_CMD_POLL: + ret = aio_poll(req, &iocb); + break; default: pr_debug("invalid aio operation %d\n", iocb.aio_lio_opcode); ret = -EINVAL; diff --git a/include/uapi/linux/aio_abi.h b/include/uapi/linux/aio_abi.h index d4593a6062ef..ce43d340f010 100644 --- a/include/uapi/linux/aio_abi.h +++ b/include/uapi/linux/aio_abi.h @@ -38,10 +38,8 @@ enum { IOCB_CMD_PWRITE = 1, IOCB_CMD_FSYNC = 2, IOCB_CMD_FDSYNC = 3, - /* These two are experimental. - * IOCB_CMD_PREADX = 4, - * IOCB_CMD_POLL = 5, - */ + /* 4 was the experimental IOCB_CMD_PREADX */ + IOCB_CMD_POLL = 5, IOCB_CMD_NOOP = 6, IOCB_CMD_PREADV = 7, IOCB_CMD_PWRITEV = 8, From e8693bcfa0b4a56268946f0756153d942cb66cf7 Mon Sep 17 00:00:00 2001 From: Christoph Hellwig Date: Mon, 16 Jul 2018 12:25:17 +0200 Subject: [PATCH 4/4] aio: allow direct aio poll comletions for keyed wakeups If we get a keyed wakeup for a aio poll waitqueue and wake can acquire the ctx_lock without spinning we can just complete the iocb straight from the wakeup callback to avoid a context switch. Signed-off-by: Christoph Hellwig Tested-by: Avi Kivity --- fs/aio.c | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/fs/aio.c b/fs/aio.c index 2fd19521d8a8..29f2b5b57d32 100644 --- a/fs/aio.c +++ b/fs/aio.c @@ -1672,13 +1672,26 @@ static int aio_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, void *key) { struct poll_iocb *req = container_of(wait, struct poll_iocb, wait); + struct aio_kiocb *iocb = container_of(req, struct aio_kiocb, poll); __poll_t mask = key_to_poll(key); req->woken = true; /* for instances that support it check for an event match first: */ - if (mask && !(mask & req->events)) - return 0; + if (mask) { + if (!(mask & req->events)) + return 0; + + /* try to complete the iocb inline if we can: */ + if (spin_trylock(&iocb->ki_ctx->ctx_lock)) { + list_del(&iocb->ki_list); + spin_unlock(&iocb->ki_ctx->ctx_lock); + + list_del_init(&req->wait.entry); + aio_poll_complete(iocb, mask); + return 1; + } + } list_del_init(&req->wait.entry); schedule_work(&req->work);