diff --git a/fs/io-wq.c b/fs/io-wq.c index 44e20248805a..28868eb4cd09 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -16,6 +16,7 @@ #include #include #include +#include #include "../kernel/sched/sched.h" #include "io-wq.h" @@ -52,9 +53,6 @@ struct io_worker { struct io_wq_work *cur_work; spinlock_t lock; - const struct cred *cur_creds; - const struct cred *saved_creds; - struct completion ref_done; struct rcu_head rcu; @@ -117,7 +115,10 @@ struct io_wq { struct io_wq_hash *hash; refcount_t refs; - struct completion done; + struct completion exited; + + atomic_t worker_refs; + struct completion worker_done; struct hlist_node cpuhp_node; @@ -126,6 +127,17 @@ struct io_wq { static enum cpuhp_state io_wq_online; +struct io_cb_cancel_data { + work_cancel_fn *fn; + void *data; + int nr_running; + int nr_pending; + bool cancel_all; +}; + +static void io_wqe_cancel_pending_work(struct io_wqe *wqe, + struct io_cb_cancel_data *match); + static bool io_worker_get(struct io_worker *worker) { return refcount_inc_not_zero(&worker->ref); @@ -175,11 +187,6 @@ static void io_worker_exit(struct io_worker *worker) worker->flags = 0; preempt_enable(); - if (worker->saved_creds) { - revert_creds(worker->saved_creds); - worker->cur_creds = worker->saved_creds = NULL; - } - raw_spin_lock_irq(&wqe->lock); if (flags & IO_WORKER_F_FREE) hlist_nulls_del_rcu(&worker->nulls_node); @@ -188,7 +195,9 @@ static void io_worker_exit(struct io_worker *worker) raw_spin_unlock_irq(&wqe->lock); kfree_rcu(worker, rcu); - io_wq_put(wqe->wq); + if (atomic_dec_and_test(&wqe->wq->worker_refs)) + complete(&wqe->wq->worker_done); + do_exit(0); } static inline bool io_wqe_run_queue(struct io_wqe *wqe) @@ -263,12 +272,6 @@ static void io_wqe_dec_running(struct io_worker *worker) io_wqe_wake_worker(wqe, acct); } -static void io_worker_start(struct io_worker *worker) -{ - worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); - io_wqe_inc_running(worker); -} - /* * Worker will start processing some work. Move it to the busy list, if * it's currently on the freelist @@ -319,10 +322,6 @@ static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) worker->flags |= IO_WORKER_F_FREE; hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); } - if (worker->saved_creds) { - revert_creds(worker->saved_creds); - worker->cur_creds = worker->saved_creds = NULL; - } } static inline unsigned int io_get_work_hash(struct io_wq_work *work) @@ -397,18 +396,6 @@ static void io_flush_signals(void) } } -static void io_wq_switch_creds(struct io_worker *worker, - struct io_wq_work *work) -{ - const struct cred *old_creds = override_creds(work->creds); - - worker->cur_creds = work->creds; - if (worker->saved_creds) - put_cred(old_creds); /* creds set by previous switch */ - else - worker->saved_creds = old_creds; -} - static void io_assign_current_work(struct io_worker *worker, struct io_wq_work *work) { @@ -458,8 +445,6 @@ get_next: unsigned int hash = io_get_work_hash(work); next_hashed = wq_next_work(work); - if (work->creds && worker->cur_creds != work->creds) - io_wq_switch_creds(worker, work); wq->do_work(work); io_assign_current_work(worker, NULL); @@ -495,8 +480,13 @@ static int io_wqe_worker(void *data) struct io_worker *worker = data; struct io_wqe *wqe = worker->wqe; struct io_wq *wq = wqe->wq; + char buf[TASK_COMM_LEN]; - io_worker_start(worker); + worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); + io_wqe_inc_running(worker); + + sprintf(buf, "iou-wrk-%d", wq->task_pid); + set_task_comm(current, buf); while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { set_current_state(TASK_INTERRUPTIBLE); @@ -571,67 +561,11 @@ void io_wq_worker_sleeping(struct task_struct *tsk) raw_spin_unlock_irq(&worker->wqe->lock); } -static int task_thread(void *data, int index) -{ - struct io_worker *worker = data; - struct io_wqe *wqe = worker->wqe; - struct io_wqe_acct *acct = &wqe->acct[index]; - struct io_wq *wq = wqe->wq; - char buf[TASK_COMM_LEN]; - - sprintf(buf, "iou-wrk-%d", wq->task_pid); - set_task_comm(current, buf); - - current->pf_io_worker = worker; - worker->task = current; - - set_cpus_allowed_ptr(current, cpumask_of_node(wqe->node)); - current->flags |= PF_NO_SETAFFINITY; - - raw_spin_lock_irq(&wqe->lock); - hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); - list_add_tail_rcu(&worker->all_list, &wqe->all_list); - worker->flags |= IO_WORKER_F_FREE; - if (index == IO_WQ_ACCT_BOUND) - worker->flags |= IO_WORKER_F_BOUND; - if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND)) - worker->flags |= IO_WORKER_F_FIXED; - acct->nr_workers++; - raw_spin_unlock_irq(&wqe->lock); - - io_wqe_worker(data); - do_exit(0); -} - -static int task_thread_bound(void *data) -{ - return task_thread(data, IO_WQ_ACCT_BOUND); -} - -static int task_thread_unbound(void *data) -{ - return task_thread(data, IO_WQ_ACCT_UNBOUND); -} - -pid_t io_wq_fork_thread(int (*fn)(void *), void *arg) -{ - unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD| - CLONE_IO|SIGCHLD; - struct kernel_clone_args args = { - .flags = ((lower_32_bits(flags) | CLONE_VM | - CLONE_UNTRACED) & ~CSIGNAL), - .exit_signal = (lower_32_bits(flags) & CSIGNAL), - .stack = (unsigned long)fn, - .stack_size = (unsigned long)arg, - }; - - return kernel_clone(&args); -} - static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) { + struct io_wqe_acct *acct = &wqe->acct[index]; struct io_worker *worker; - pid_t pid; + struct task_struct *tsk; __set_current_state(TASK_RUNNING); @@ -645,17 +579,32 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) spin_lock_init(&worker->lock); init_completion(&worker->ref_done); - refcount_inc(&wq->refs); + atomic_inc(&wq->worker_refs); - if (index == IO_WQ_ACCT_BOUND) - pid = io_wq_fork_thread(task_thread_bound, worker); - else - pid = io_wq_fork_thread(task_thread_unbound, worker); - if (pid < 0) { - io_wq_put(wq); + tsk = create_io_thread(io_wqe_worker, worker, wqe->node); + if (IS_ERR(tsk)) { + if (atomic_dec_and_test(&wq->worker_refs)) + complete(&wq->worker_done); kfree(worker); return false; } + + tsk->pf_io_worker = worker; + worker->task = tsk; + set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node)); + tsk->flags |= PF_NOFREEZE | PF_NO_SETAFFINITY; + + raw_spin_lock_irq(&wqe->lock); + hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); + list_add_tail_rcu(&worker->all_list, &wqe->all_list); + worker->flags |= IO_WORKER_F_FREE; + if (index == IO_WQ_ACCT_BOUND) + worker->flags |= IO_WORKER_F_BOUND; + if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND)) + worker->flags |= IO_WORKER_F_FIXED; + acct->nr_workers++; + raw_spin_unlock_irq(&wqe->lock); + wake_up_new_task(tsk); return true; } @@ -664,6 +613,8 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) { struct io_wqe_acct *acct = &wqe->acct[index]; + if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) + return false; /* if we have available workers or no work, no need */ if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe)) return false; @@ -697,6 +648,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe, static bool io_wq_worker_wake(struct io_worker *worker, void *data) { + set_notify_signal(worker->task); wake_up_process(worker->task); return false; } @@ -725,6 +677,23 @@ static void io_wq_check_workers(struct io_wq *wq) } } +static bool io_wq_work_match_all(struct io_wq_work *work, void *data) +{ + return true; +} + +static void io_wq_cancel_pending(struct io_wq *wq) +{ + struct io_cb_cancel_data match = { + .fn = io_wq_work_match_all, + .cancel_all = true, + }; + int node; + + for_each_node(node) + io_wqe_cancel_pending_work(wq->wqes[node], &match); +} + /* * Manager thread. Tasked with creating new workers, if we need them. */ @@ -732,25 +701,38 @@ static int io_wq_manager(void *data) { struct io_wq *wq = data; char buf[TASK_COMM_LEN]; + int node; sprintf(buf, "iou-mgr-%d", wq->task_pid); set_task_comm(current, buf); - current->flags |= PF_IO_WORKER; - wq->manager = current; - - complete(&wq->done); do { set_current_state(TASK_INTERRUPTIBLE); io_wq_check_workers(wq); schedule_timeout(HZ); + try_to_freeze(); if (fatal_signal_pending(current)) set_bit(IO_WQ_BIT_EXIT, &wq->state); } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)); io_wq_check_workers(wq); - wq->manager = NULL; - io_wq_put(wq); + + rcu_read_lock(); + for_each_node(node) + io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); + rcu_read_unlock(); + + /* we might not ever have created any workers */ + if (atomic_read(&wq->worker_refs)) + wait_for_completion(&wq->worker_done); + + spin_lock_irq(&wq->hash->wait.lock); + for_each_node(node) + list_del_init(&wq->wqes[node]->wait.entry); + spin_unlock_irq(&wq->hash->wait.lock); + + io_wq_cancel_pending(wq); + complete(&wq->exited); do_exit(0); } @@ -787,23 +769,20 @@ append: static int io_wq_fork_manager(struct io_wq *wq) { - int ret; + struct task_struct *tsk; if (wq->manager) return 0; - clear_bit(IO_WQ_BIT_EXIT, &wq->state); - refcount_inc(&wq->refs); - current->flags |= PF_IO_WORKER; - ret = io_wq_fork_thread(io_wq_manager, wq); - current->flags &= ~PF_IO_WORKER; - if (ret >= 0) { - wait_for_completion(&wq->done); + reinit_completion(&wq->worker_done); + tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE); + if (!IS_ERR(tsk)) { + wq->manager = get_task_struct(tsk); + wake_up_new_task(tsk); return 0; } - io_wq_put(wq); - return ret; + return PTR_ERR(tsk); } static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) @@ -813,7 +792,8 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) unsigned long flags; /* Can only happen if manager creation fails after exec */ - if (unlikely(io_wq_fork_manager(wqe->wq))) { + if (io_wq_fork_manager(wqe->wq) || + test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) { work->flags |= IO_WQ_WORK_CANCEL; wqe->wq->do_work(work); return; @@ -849,14 +829,6 @@ void io_wq_hash_work(struct io_wq_work *work, void *val) work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT)); } -struct io_cb_cancel_data { - work_cancel_fn *fn; - void *data; - int nr_running; - int nr_pending; - bool cancel_all; -}; - static bool io_wq_worker_cancel(struct io_worker *worker, void *data) { struct io_cb_cancel_data *match = data; @@ -1043,16 +1015,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) } wq->task_pid = current->pid; - init_completion(&wq->done); + init_completion(&wq->exited); refcount_set(&wq->refs, 1); + init_completion(&wq->worker_done); + atomic_set(&wq->worker_refs, 0); + ret = io_wq_fork_manager(wq); if (!ret) return wq; - io_wq_put(wq); - io_wq_put_hash(data->hash); err: + io_wq_put_hash(data->hash); cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); for_each_node(node) kfree(wq->wqes[node]); @@ -1063,6 +1037,16 @@ err_wq: return ERR_PTR(ret); } +static void io_wq_destroy_manager(struct io_wq *wq) +{ + if (wq->manager) { + wake_up_process(wq->manager); + wait_for_completion(&wq->exited); + put_task_struct(wq->manager); + wq->manager = NULL; + } +} + static void io_wq_destroy(struct io_wq *wq) { int node; @@ -1070,26 +1054,16 @@ static void io_wq_destroy(struct io_wq *wq) cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); set_bit(IO_WQ_BIT_EXIT, &wq->state); - if (wq->manager) - wake_up_process(wq->manager); + io_wq_destroy_manager(wq); - rcu_read_lock(); - for_each_node(node) - io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); - rcu_read_unlock(); - - spin_lock_irq(&wq->hash->wait.lock); for_each_node(node) { struct io_wqe *wqe = wq->wqes[node]; - - list_del_init(&wqe->wait.entry); + WARN_ON_ONCE(!wq_list_empty(&wqe->work_list)); kfree(wqe); } - spin_unlock_irq(&wq->hash->wait.lock); io_wq_put_hash(wq->hash); kfree(wq->wqes); kfree(wq); - } void io_wq_put(struct io_wq *wq) @@ -1098,6 +1072,13 @@ void io_wq_put(struct io_wq *wq) io_wq_destroy(wq); } +void io_wq_put_and_exit(struct io_wq *wq) +{ + set_bit(IO_WQ_BIT_EXIT, &wq->state); + io_wq_destroy_manager(wq); + io_wq_put(wq); +} + static bool io_wq_worker_affinity(struct io_worker *worker, void *data) { struct task_struct *task = worker->task; diff --git a/fs/io-wq.h b/fs/io-wq.h index b6ca12b60c35..5fbf7997149e 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -79,8 +79,8 @@ static inline void wq_list_del(struct io_wq_work_list *list, struct io_wq_work { struct io_wq_work_node list; - const struct cred *creds; unsigned flags; + unsigned short personality; }; static inline struct io_wq_work *wq_next_work(struct io_wq_work *work) @@ -114,12 +114,11 @@ struct io_wq_data { struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); void io_wq_put(struct io_wq *wq); +void io_wq_put_and_exit(struct io_wq *wq); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); void io_wq_hash_work(struct io_wq_work *work, void *val); -pid_t io_wq_fork_thread(int (*fn)(void *), void *arg); - static inline bool io_wq_is_hashed(struct io_wq_work *work) { return work->flags & IO_WQ_WORK_HASHED; diff --git a/fs/io_uring.c b/fs/io_uring.c index 4a088581b0f2..92c25b5f1349 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -74,13 +74,11 @@ #include #include #include -#include #include #include #include #include -#include -#include +#include #define CREATE_TRACE_POINTS #include @@ -276,7 +274,7 @@ struct io_sq_data { unsigned long state; struct completion startup; - struct completion completion; + struct completion parked; struct completion exited; }; @@ -338,7 +336,6 @@ struct io_ring_ctx { unsigned int drain_next: 1; unsigned int eventfd_async: 1; unsigned int restricted: 1; - unsigned int sqo_dead: 1; unsigned int sqo_exec: 1; /* @@ -380,11 +377,6 @@ struct io_ring_ctx { struct io_rings *rings; - /* - * For SQPOLL usage - */ - struct task_struct *sqo_task; - /* Only used for accounting purposes */ struct mm_struct *mm_account; @@ -688,7 +680,6 @@ enum { REQ_F_POLLED_BIT, REQ_F_BUFFER_SELECTED_BIT, REQ_F_NO_FILE_TABLE_BIT, - REQ_F_WORK_INITIALIZED_BIT, REQ_F_LTIMEOUT_ACTIVE_BIT, REQ_F_COMPLETE_INLINE_BIT, @@ -712,7 +703,7 @@ enum { /* fail rest of links */ REQ_F_FAIL_LINK = BIT(REQ_F_FAIL_LINK_BIT), - /* on inflight list */ + /* on inflight list, should be cancelled and waited on exit reliably */ REQ_F_INFLIGHT = BIT(REQ_F_INFLIGHT_BIT), /* read/write uses file position */ REQ_F_CUR_POS = BIT(REQ_F_CUR_POS_BIT), @@ -730,8 +721,6 @@ enum { REQ_F_BUFFER_SELECTED = BIT(REQ_F_BUFFER_SELECTED_BIT), /* doesn't need file table for this request */ REQ_F_NO_FILE_TABLE = BIT(REQ_F_NO_FILE_TABLE_BIT), - /* io_wq_work is initialized */ - REQ_F_WORK_INITIALIZED = BIT(REQ_F_WORK_INITIALIZED_BIT), /* linked timeout is active, i.e. prepared by link's head */ REQ_F_LTIMEOUT_ACTIVE = BIT(REQ_F_LTIMEOUT_ACTIVE_BIT), /* completion is deferred through io_comp_state */ @@ -1080,9 +1069,7 @@ static bool io_match_task(struct io_kiocb *head, return true; io_for_each_link(req, head) { - if (!(req->flags & REQ_F_WORK_INITIALIZED)) - continue; - if (req->file && req->file->f_op == &io_uring_fops) + if (req->flags & REQ_F_INFLIGHT) return true; if (req->task->files == files) return true; @@ -1096,24 +1083,6 @@ static inline void req_set_fail_links(struct io_kiocb *req) req->flags |= REQ_F_FAIL_LINK; } -static inline void __io_req_init_async(struct io_kiocb *req) -{ - memset(&req->work, 0, sizeof(req->work)); - req->flags |= REQ_F_WORK_INITIALIZED; -} - -/* - * Note: must call io_req_init_async() for the first time you - * touch any members of io_wq_work. - */ -static inline void io_req_init_async(struct io_kiocb *req) -{ - if (req->flags & REQ_F_WORK_INITIALIZED) - return; - - __io_req_init_async(req); -} - static void io_ring_ctx_ref_free(struct percpu_ref *ref) { struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs); @@ -1196,37 +1165,11 @@ static bool req_need_defer(struct io_kiocb *req, u32 seq) return false; } -static void io_req_clean_work(struct io_kiocb *req) -{ - if (!(req->flags & REQ_F_WORK_INITIALIZED)) - return; - - if (req->work.creds) { - put_cred(req->work.creds); - req->work.creds = NULL; - } - if (req->flags & REQ_F_INFLIGHT) { - struct io_ring_ctx *ctx = req->ctx; - struct io_uring_task *tctx = req->task->io_uring; - unsigned long flags; - - spin_lock_irqsave(&ctx->inflight_lock, flags); - list_del(&req->inflight_entry); - spin_unlock_irqrestore(&ctx->inflight_lock, flags); - req->flags &= ~REQ_F_INFLIGHT; - if (atomic_read(&tctx->in_idle)) - wake_up(&tctx->wait); - } - - req->flags &= ~REQ_F_WORK_INITIALIZED; -} - static void io_req_track_inflight(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; if (!(req->flags & REQ_F_INFLIGHT)) { - io_req_init_async(req); req->flags |= REQ_F_INFLIGHT; spin_lock_irq(&ctx->inflight_lock); @@ -1240,8 +1183,6 @@ static void io_prep_async_work(struct io_kiocb *req) const struct io_op_def *def = &io_op_defs[req->opcode]; struct io_ring_ctx *ctx = req->ctx; - io_req_init_async(req); - if (req->flags & REQ_F_FORCE_ASYNC) req->work.flags |= IO_WQ_WORK_CONCURRENT; @@ -1252,8 +1193,6 @@ static void io_prep_async_work(struct io_kiocb *req) if (def->unbound_nonreg_file) req->work.flags |= IO_WQ_WORK_UNBOUND; } - if (!req->work.creds) - req->work.creds = get_current_cred(); } static void io_prep_async_link(struct io_kiocb *req) @@ -1264,7 +1203,7 @@ static void io_prep_async_link(struct io_kiocb *req) io_prep_async_work(cur); } -static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req) +static void io_queue_async_work(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; struct io_kiocb *link = io_prep_linked_timeout(req); @@ -1275,18 +1214,9 @@ static struct io_kiocb *__io_queue_async_work(struct io_kiocb *req) trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req, &req->work, req->flags); - io_wq_enqueue(tctx->io_wq, &req->work); - return link; -} - -static void io_queue_async_work(struct io_kiocb *req) -{ - struct io_kiocb *link; - /* init ->work of the whole link before punting */ io_prep_async_link(req); - link = __io_queue_async_work(req); - + io_wq_enqueue(tctx->io_wq, &req->work); if (link) io_queue_linked_timeout(link); } @@ -1521,18 +1451,22 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, return all_flushed; } -static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, +static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force, struct task_struct *tsk, struct files_struct *files) { + bool ret = true; + if (test_bit(0, &ctx->cq_check_overflow)) { /* iopoll syncs against uring_lock, not completion_lock */ if (ctx->flags & IORING_SETUP_IOPOLL) mutex_lock(&ctx->uring_lock); - __io_cqring_overflow_flush(ctx, force, tsk, files); + ret = __io_cqring_overflow_flush(ctx, force, tsk, files); if (ctx->flags & IORING_SETUP_IOPOLL) mutex_unlock(&ctx->uring_lock); } + + return ret; } static void __io_cqring_fill_event(struct io_kiocb *req, long res, long cflags) @@ -1714,9 +1648,19 @@ static void io_dismantle_req(struct io_kiocb *req) io_put_file(req, req->file, (req->flags & REQ_F_FIXED_FILE)); if (req->fixed_rsrc_refs) percpu_ref_put(req->fixed_rsrc_refs); - io_req_clean_work(req); + + if (req->flags & REQ_F_INFLIGHT) { + struct io_ring_ctx *ctx = req->ctx; + unsigned long flags; + + spin_lock_irqsave(&ctx->inflight_lock, flags); + list_del(&req->inflight_entry); + spin_unlock_irqrestore(&ctx->inflight_lock, flags); + req->flags &= ~REQ_F_INFLIGHT; + } } +/* must to be called somewhat shortly after putting a request */ static inline void io_put_task(struct task_struct *task, int nr) { struct io_uring_task *tctx = task->io_uring; @@ -1800,15 +1744,7 @@ static void io_fail_links(struct io_kiocb *req) trace_io_uring_fail_link(req, link); io_cqring_fill_event(link, -ECANCELED); - /* - * It's ok to free under spinlock as they're not linked anymore, - * but avoid REQ_F_WORK_INITIALIZED because it may deadlock on - * work.fs->lock. - */ - if (link->flags & REQ_F_WORK_INITIALIZED) - io_put_req_deferred(link, 2); - else - io_double_put_req(link); + io_put_req_deferred(link, 2); link = nxt; } io_commit_cqring(ctx); @@ -1845,6 +1781,18 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req) return __io_req_find_next(req); } +static void ctx_flush_and_put(struct io_ring_ctx *ctx) +{ + if (!ctx) + return; + if (ctx->submit_state.comp.nr) { + mutex_lock(&ctx->uring_lock); + io_submit_flush_completions(&ctx->submit_state.comp, ctx); + mutex_unlock(&ctx->uring_lock); + } + percpu_ref_put(&ctx->refs); +} + static bool __tctx_task_work(struct io_uring_task *tctx) { struct io_ring_ctx *ctx = NULL; @@ -1862,30 +1810,20 @@ static bool __tctx_task_work(struct io_uring_task *tctx) node = list.first; while (node) { struct io_wq_work_node *next = node->next; - struct io_ring_ctx *this_ctx; struct io_kiocb *req; req = container_of(node, struct io_kiocb, io_task_work.node); - this_ctx = req->ctx; + if (req->ctx != ctx) { + ctx_flush_and_put(ctx); + ctx = req->ctx; + percpu_ref_get(&ctx->refs); + } + req->task_work.func(&req->task_work); node = next; - - if (!ctx) { - ctx = this_ctx; - } else if (ctx != this_ctx) { - mutex_lock(&ctx->uring_lock); - io_submit_flush_completions(&ctx->submit_state.comp, ctx); - mutex_unlock(&ctx->uring_lock); - ctx = this_ctx; - } - } - - if (ctx && ctx->submit_state.comp.nr) { - mutex_lock(&ctx->uring_lock); - io_submit_flush_completions(&ctx->submit_state.comp, ctx); - mutex_unlock(&ctx->uring_lock); } + ctx_flush_and_put(ctx); return list.first != NULL; } @@ -1893,10 +1831,10 @@ static void tctx_task_work(struct callback_head *cb) { struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work); + clear_bit(0, &tctx->task_state); + while (__tctx_task_work(tctx)) cond_resched(); - - clear_bit(0, &tctx->task_state); } static int io_task_work_add(struct task_struct *tsk, struct io_kiocb *req, @@ -2010,7 +1948,7 @@ static void __io_req_task_submit(struct io_kiocb *req) /* ctx stays valid until unlock, even if we drop all ours ctx->refs */ mutex_lock(&ctx->uring_lock); - if (!ctx->sqo_dead && !(current->flags & PF_EXITING) && !current->in_execve) + if (!(current->flags & PF_EXITING) && !current->in_execve) __io_queue_sqe(req); else __io_req_task_cancel(req, -EFAULT); @@ -2472,23 +2410,32 @@ static bool io_resubmit_prep(struct io_kiocb *req) return false; return !io_setup_async_rw(req, iovec, inline_vecs, &iter, false); } -#endif -static bool io_rw_reissue(struct io_kiocb *req) +static bool io_rw_should_reissue(struct io_kiocb *req) { -#ifdef CONFIG_BLOCK umode_t mode = file_inode(req->file)->i_mode; + struct io_ring_ctx *ctx = req->ctx; if (!S_ISBLK(mode) && !S_ISREG(mode)) return false; - if ((req->flags & REQ_F_NOWAIT) || io_wq_current_is_worker()) + if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() && + !(ctx->flags & IORING_SETUP_IOPOLL))) return false; /* * If ref is dying, we might be running poll reap from the exit work. * Don't attempt to reissue from that path, just let it fail with * -EAGAIN. */ - if (percpu_ref_is_dying(&req->ctx->refs)) + if (percpu_ref_is_dying(&ctx->refs)) + return false; + return true; +} +#endif + +static bool io_rw_reissue(struct io_kiocb *req) +{ +#ifdef CONFIG_BLOCK + if (!io_rw_should_reissue(req)) return false; lockdep_assert_held(&req->ctx->uring_lock); @@ -2531,6 +2478,19 @@ static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) { struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw.kiocb); +#ifdef CONFIG_BLOCK + /* Rewind iter, if we have one. iopoll path resubmits as usual */ + if (res == -EAGAIN && io_rw_should_reissue(req)) { + struct io_async_rw *rw = req->async_data; + + if (rw) + iov_iter_revert(&rw->iter, + req->result - iov_iter_count(&rw->iter)); + else if (!io_resubmit_prep(req)) + res = -EIO; + } +#endif + if (kiocb->ki_flags & IOCB_WRITE) kiocb_end_write(req); @@ -3279,6 +3239,8 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) ret = io_iter_do_read(req, iter); if (ret == -EIOCBQUEUED) { + if (req->async_data) + iov_iter_revert(iter, io_size - iov_iter_count(iter)); goto out_free; } else if (ret == -EAGAIN) { /* IOPOLL retry should happen for io-wq threads */ @@ -3324,6 +3286,7 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags) if (ret == -EIOCBQUEUED) return 0; /* we got some bytes, but not all. retry. */ + kiocb->ki_flags &= ~IOCB_WAITQ; } while (ret > 0 && ret < io_size); done: kiocb_done(kiocb, ret, issue_flags); @@ -3410,6 +3373,8 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags) /* no retry on NONBLOCK nor RWF_NOWAIT */ if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT)) goto done; + if (ret2 == -EIOCBQUEUED && req->async_data) + iov_iter_revert(iter, io_size - iov_iter_count(iter)); if (!force_nonblock || ret2 != -EAGAIN) { /* IOPOLL retry should happen for io-wq threads */ if ((req->ctx->flags & IORING_SETUP_IOPOLL) && ret2 == -EAGAIN) @@ -3588,7 +3553,6 @@ static int __io_splice_prep(struct io_kiocb *req, * Splice operation will be punted aync, and here need to * modify io_wq_work.flags, so initialize io_wq_work firstly. */ - io_req_init_async(req); req->work.flags |= IO_WQ_WORK_UNBOUND; } @@ -3864,7 +3828,7 @@ err: static int io_openat(struct io_kiocb *req, unsigned int issue_flags) { - return io_openat2(req, issue_flags & IO_URING_F_NONBLOCK); + return io_openat2(req, issue_flags); } static int io_remove_buffers_prep(struct io_kiocb *req, @@ -5003,6 +4967,9 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt, pt->error = -EINVAL; return; } + /* double add on the same waitqueue head, ignore */ + if (poll->head == head) + return; poll = kmalloc(sizeof(*poll), GFP_ATOMIC); if (!poll) { pt->error = -ENOMEM; @@ -5538,6 +5505,7 @@ static int io_timeout_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe, data->mode = io_translate_timeout_mode(flags); hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode); + io_req_track_inflight(req); return 0; } @@ -5945,8 +5913,22 @@ static void __io_clean_op(struct io_kiocb *req) static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) { struct io_ring_ctx *ctx = req->ctx; + const struct cred *creds = NULL; int ret; + if (req->work.personality) { + const struct cred *new_creds; + + if (!(issue_flags & IO_URING_F_NONBLOCK)) + mutex_lock(&ctx->uring_lock); + new_creds = idr_find(&ctx->personality_idr, req->work.personality); + if (!(issue_flags & IO_URING_F_NONBLOCK)) + mutex_unlock(&ctx->uring_lock); + if (!new_creds) + return -EINVAL; + creds = override_creds(new_creds); + } + switch (req->opcode) { case IORING_OP_NOP: ret = io_nop(req, issue_flags); @@ -6053,6 +6035,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) break; } + if (creds) + revert_creds(creds); + if (ret) return ret; @@ -6216,18 +6201,10 @@ static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req) static void __io_queue_sqe(struct io_kiocb *req) { struct io_kiocb *linked_timeout = io_prep_linked_timeout(req); - const struct cred *old_creds = NULL; int ret; - if ((req->flags & REQ_F_WORK_INITIALIZED) && req->work.creds && - req->work.creds != current_cred()) - old_creds = override_creds(req->work.creds); - ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER); - if (old_creds) - revert_creds(old_creds); - /* * We async punt it if the file wasn't marked NOWAIT, or if the file * doesn't support non-blocking read/write attempts @@ -6314,7 +6291,7 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, { struct io_submit_state *state; unsigned int sqe_flags; - int id, ret = 0; + int ret = 0; req->opcode = READ_ONCE(sqe->opcode); /* same numerical values with corresponding REQ_F_*, safe to copy */ @@ -6346,15 +6323,9 @@ static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req, !io_op_defs[req->opcode].buffer_select) return -EOPNOTSUPP; - id = READ_ONCE(sqe->personality); - if (id) { - __io_req_init_async(req); - req->work.creds = idr_find(&ctx->personality_idr, id); - if (unlikely(!req->work.creds)) - return -EINVAL; - get_cred(req->work.creds); - } - + req->work.list.next = NULL; + req->work.flags = 0; + req->work.personality = READ_ONCE(sqe->personality); state = &ctx->submit_state; /* @@ -6616,8 +6587,7 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries) if (!list_empty(&ctx->iopoll_list)) io_do_iopoll(ctx, &nr_events, 0); - if (to_submit && !ctx->sqo_dead && - likely(!percpu_ref_is_dying(&ctx->refs))) + if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs))) ret = io_submit_sqes(ctx, to_submit); mutex_unlock(&ctx->uring_lock); } @@ -6686,7 +6656,7 @@ static void io_sq_thread_parkme(struct io_sq_data *sqd) * wait_task_inactive(). */ preempt_disable(); - complete(&sqd->completion); + complete(&sqd->parked); schedule_preempt_disabled(); preempt_enable(); } @@ -6703,7 +6673,6 @@ static int io_sq_thread(void *data) sprintf(buf, "iou-sqp-%d", sqd->task_pid); set_task_comm(current, buf); - sqd->thread = current; current->pf_io_worker = NULL; if (sqd->sq_cpu != -1) @@ -6712,8 +6681,6 @@ static int io_sq_thread(void *data) set_cpus_allowed_ptr(current, cpu_online_mask); current->flags |= PF_NO_SETAFFINITY; - complete(&sqd->completion); - wait_for_completion(&sqd->startup); while (!io_sq_thread_should_stop(sqd)) { @@ -6770,6 +6737,7 @@ static int io_sq_thread(void *data) io_ring_set_wakeup_flag(ctx); schedule(); + try_to_freeze(); list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) io_ring_clear_wakeup_flag(ctx); } @@ -6784,18 +6752,25 @@ static int io_sq_thread(void *data) io_run_task_work(); /* - * Clear thread under lock so that concurrent parks work correctly + * Ensure that we park properly if racing with someone trying to park + * while we're exiting. If we fail to grab the lock, check park and + * park if necessary. The ordering with the park bit and the lock + * ensures that we catch this reliably. */ - complete_all(&sqd->completion); - mutex_lock(&sqd->lock); + if (!mutex_trylock(&sqd->lock)) { + if (io_sq_thread_should_park(sqd)) + io_sq_thread_parkme(sqd); + mutex_lock(&sqd->lock); + } + sqd->thread = NULL; list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { ctx->sqo_exec = 1; io_ring_set_wakeup_flag(ctx); } - mutex_unlock(&sqd->lock); complete(&sqd->exited); + mutex_unlock(&sqd->lock); do_exit(0); } @@ -6917,11 +6892,16 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts); trace_io_uring_cqring_wait(ctx, min_events); do { - io_cqring_overflow_flush(ctx, false, NULL, NULL); + /* if we can't even flush overflow, don't wait for more */ + if (!io_cqring_overflow_flush(ctx, false, NULL, NULL)) { + ret = -EBUSY; + break; + } prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE); ret = io_cqring_wait_schedule(ctx, &iowq, &timeout); finish_wait(&ctx->wait, &iowq.wq); + cond_resched(); } while (ret > 0); restore_saved_sigmask_unless(ret == -EINTR); @@ -7091,40 +7071,42 @@ static int io_sqe_files_unregister(struct io_ring_ctx *ctx) static void io_sq_thread_unpark(struct io_sq_data *sqd) __releases(&sqd->lock) { - if (!sqd->thread) - return; if (sqd->thread == current) return; clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); - wake_up_state(sqd->thread, TASK_PARKED); + if (sqd->thread) + wake_up_state(sqd->thread, TASK_PARKED); mutex_unlock(&sqd->lock); } -static bool io_sq_thread_park(struct io_sq_data *sqd) +static void io_sq_thread_park(struct io_sq_data *sqd) __acquires(&sqd->lock) { if (sqd->thread == current) - return true; - mutex_lock(&sqd->lock); - if (!sqd->thread) { - mutex_unlock(&sqd->lock); - return false; - } + return; set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state); - wake_up_process(sqd->thread); - wait_for_completion(&sqd->completion); - return true; + mutex_lock(&sqd->lock); + if (sqd->thread) { + wake_up_process(sqd->thread); + wait_for_completion(&sqd->parked); + } } static void io_sq_thread_stop(struct io_sq_data *sqd) { - if (!sqd->thread) + if (test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state)) return; - - set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); - WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state)); - wake_up_process(sqd->thread); - wait_for_completion(&sqd->exited); + mutex_lock(&sqd->lock); + if (sqd->thread) { + set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); + WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state)); + wake_up_process(sqd->thread); + mutex_unlock(&sqd->lock); + wait_for_completion(&sqd->exited); + WARN_ON_ONCE(sqd->thread); + } else { + mutex_unlock(&sqd->lock); + } } static void io_put_sq_data(struct io_sq_data *sqd) @@ -7203,7 +7185,7 @@ static struct io_sq_data *io_get_sq_data(struct io_uring_params *p) mutex_init(&sqd->lock); init_waitqueue_head(&sqd->wait); init_completion(&sqd->startup); - init_completion(&sqd->completion); + init_completion(&sqd->parked); init_completion(&sqd->exited); return sqd; } @@ -7834,6 +7816,8 @@ void __io_uring_free(struct task_struct *tsk) struct io_uring_task *tctx = tsk->io_uring; WARN_ON_ONCE(!xa_empty(&tctx->xa)); + WARN_ON_ONCE(tctx->io_wq); + percpu_counter_destroy(&tctx->inflight); kfree(tctx); tsk->io_uring = NULL; @@ -7841,21 +7825,22 @@ void __io_uring_free(struct task_struct *tsk) static int io_sq_thread_fork(struct io_sq_data *sqd, struct io_ring_ctx *ctx) { + struct task_struct *tsk; int ret; clear_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); - reinit_completion(&sqd->completion); - ctx->sqo_dead = ctx->sqo_exec = 0; + reinit_completion(&sqd->parked); + ctx->sqo_exec = 0; sqd->task_pid = current->pid; - current->flags |= PF_IO_WORKER; - ret = io_wq_fork_thread(io_sq_thread, sqd); - current->flags &= ~PF_IO_WORKER; - if (ret < 0) { - sqd->thread = NULL; - return ret; - } - wait_for_completion(&sqd->completion); - return io_uring_alloc_task_context(sqd->thread, ctx); + tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE); + if (IS_ERR(tsk)) + return PTR_ERR(tsk); + ret = io_uring_alloc_task_context(tsk, ctx); + if (ret) + set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); + sqd->thread = tsk; + wake_up_new_task(tsk); + return ret; } static int io_sq_offload_create(struct io_ring_ctx *ctx, @@ -7878,6 +7863,7 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx, fdput(f); } if (ctx->flags & IORING_SETUP_SQPOLL) { + struct task_struct *tsk; struct io_sq_data *sqd; ret = -EPERM; @@ -7919,15 +7905,16 @@ static int io_sq_offload_create(struct io_ring_ctx *ctx, } sqd->task_pid = current->pid; - current->flags |= PF_IO_WORKER; - ret = io_wq_fork_thread(io_sq_thread, sqd); - current->flags &= ~PF_IO_WORKER; - if (ret < 0) { - sqd->thread = NULL; + tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE); + if (IS_ERR(tsk)) { + ret = PTR_ERR(tsk); goto err; } - wait_for_completion(&sqd->completion); - ret = io_uring_alloc_task_context(sqd->thread, ctx); + ret = io_uring_alloc_task_context(tsk, ctx); + if (ret) + set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state); + sqd->thread = tsk; + wake_up_new_task(tsk); if (ret) goto err; } else if (p->flags & IORING_SETUP_SQ_AFF) { @@ -7946,6 +7933,7 @@ static void io_sq_offload_start(struct io_ring_ctx *ctx) { struct io_sq_data *sqd = ctx->sq_data; + ctx->flags &= ~IORING_SETUP_R_DISABLED; if (ctx->flags & IORING_SETUP_SQPOLL) complete(&sqd->startup); } @@ -8384,7 +8372,7 @@ static void io_req_cache_free(struct list_head *list, struct task_struct *tsk) } } -static void io_req_caches_free(struct io_ring_ctx *ctx, struct task_struct *tsk) +static void io_req_caches_free(struct io_ring_ctx *ctx) { struct io_submit_state *submit_state = &ctx->submit_state; struct io_comp_state *cs = &ctx->submit_state.comp; @@ -8444,7 +8432,7 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx) percpu_ref_exit(&ctx->refs); free_uid(ctx->user); - io_req_caches_free(ctx, NULL); + io_req_caches_free(ctx); if (ctx->hash_map) io_wq_put_hash(ctx->hash_map); kfree(ctx->cancel_hash); @@ -8512,16 +8500,13 @@ static int io_remove_personalities(int id, void *p, void *data) return 0; } -static void io_run_ctx_fallback(struct io_ring_ctx *ctx) +static bool io_run_ctx_fallback(struct io_ring_ctx *ctx) { - struct callback_head *work, *head, *next; + struct callback_head *work, *next; + bool executed = false; do { - do { - head = NULL; - work = READ_ONCE(ctx->exit_task_work); - } while (cmpxchg(&ctx->exit_task_work, work, head) != work); - + work = xchg(&ctx->exit_task_work, NULL); if (!work) break; @@ -8531,7 +8516,10 @@ static void io_run_ctx_fallback(struct io_ring_ctx *ctx) work = next; cond_resched(); } while (work); + executed = true; } while (1); + + return executed; } static void io_ring_exit_work(struct work_struct *work) @@ -8547,7 +8535,6 @@ static void io_ring_exit_work(struct work_struct *work) */ do { io_uring_try_cancel_requests(ctx, NULL, NULL); - io_run_ctx_fallback(ctx); } while (!wait_for_completion_timeout(&ctx->ref_comp, HZ/20)); io_ring_ctx_free(ctx); } @@ -8556,10 +8543,6 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) { mutex_lock(&ctx->uring_lock); percpu_ref_kill(&ctx->refs); - - if (WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) && !ctx->sqo_dead)) - ctx->sqo_dead = 1; - /* if force is set, the ring is going away. always drop after that */ ctx->cq_overflow_flushed = 1; if (ctx->rings) @@ -8648,7 +8631,8 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx, struct files_struct *files) { struct io_task_cancel cancel = { .task = task, .files = files, }; - struct io_uring_task *tctx = current->io_uring; + struct task_struct *tctx_task = task ?: current; + struct io_uring_task *tctx = tctx_task->io_uring; while (1) { enum io_wq_cancel cret; @@ -8671,6 +8655,7 @@ static void io_uring_try_cancel_requests(struct io_ring_ctx *ctx, ret |= io_poll_remove_all(ctx, task, files); ret |= io_kill_timeouts(ctx, task, files); ret |= io_run_task_work(); + ret |= io_run_ctx_fallback(ctx); io_cqring_overflow_flush(ctx, true, task, files); if (!ret) break; @@ -8718,17 +8703,6 @@ static void io_uring_cancel_files(struct io_ring_ctx *ctx, } } -static void io_disable_sqo_submit(struct io_ring_ctx *ctx) -{ - mutex_lock(&ctx->uring_lock); - ctx->sqo_dead = 1; - mutex_unlock(&ctx->uring_lock); - - /* make sure callers enter the ring to get error */ - if (ctx->rings) - io_ring_set_wakeup_flag(ctx); -} - /* * We need to iteratively cancel requests, in case a request has dependent * hard links. These persist even for failure of cancelations, hence keep @@ -8738,15 +8712,17 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx, struct files_struct *files) { struct task_struct *task = current; - bool did_park = false; if ((ctx->flags & IORING_SETUP_SQPOLL) && ctx->sq_data) { - io_disable_sqo_submit(ctx); - did_park = io_sq_thread_park(ctx->sq_data); - if (did_park) { - task = ctx->sq_data->thread; - atomic_inc(&task->io_uring->in_idle); + /* never started, nothing to cancel */ + if (ctx->flags & IORING_SETUP_R_DISABLED) { + io_sq_offload_start(ctx); + return; } + io_sq_thread_park(ctx->sq_data); + task = ctx->sq_data->thread; + if (task) + atomic_inc(&task->io_uring->in_idle); } io_cancel_defer_files(ctx, task, files); @@ -8755,10 +8731,10 @@ static void io_uring_cancel_task_requests(struct io_ring_ctx *ctx, if (!files) io_uring_try_cancel_requests(ctx, task, NULL); - if (did_park) { + if (task) atomic_dec(&task->io_uring->in_idle); + if (ctx->sq_data) io_sq_thread_unpark(ctx->sq_data); - } } /* @@ -8786,10 +8762,6 @@ static int io_uring_add_task_file(struct io_ring_ctx *ctx, struct file *file) fput(file); return ret; } - - /* one and only SQPOLL file note, held by sqo_task */ - WARN_ON_ONCE((ctx->flags & IORING_SETUP_SQPOLL) && - current != ctx->sqo_task); } tctx->last = file; } @@ -8819,13 +8791,17 @@ static void io_uring_del_task_file(struct file *file) fput(file); } -static void io_uring_remove_task_files(struct io_uring_task *tctx) +static void io_uring_clean_tctx(struct io_uring_task *tctx) { struct file *file; unsigned long index; xa_for_each(&tctx->xa, index, file) io_uring_del_task_file(file); + if (tctx->io_wq) { + io_wq_put_and_exit(tctx->io_wq); + tctx->io_wq = NULL; + } } void __io_uring_files_cancel(struct files_struct *files) @@ -8840,13 +8816,8 @@ void __io_uring_files_cancel(struct files_struct *files) io_uring_cancel_task_requests(file->private_data, files); atomic_dec(&tctx->in_idle); - if (files) { - io_uring_remove_task_files(tctx); - if (tctx->io_wq) { - io_wq_put(tctx->io_wq); - tctx->io_wq = NULL; - } - } + if (files) + io_uring_clean_tctx(tctx); } static s64 tctx_inflight(struct io_uring_task *tctx) @@ -8863,11 +8834,12 @@ static void io_uring_cancel_sqpoll(struct io_ring_ctx *ctx) if (!sqd) return; - io_disable_sqo_submit(ctx); - if (!io_sq_thread_park(sqd)) + io_sq_thread_park(sqd); + if (!sqd->thread || !sqd->thread->io_uring) { + io_sq_thread_unpark(sqd); return; + } tctx = ctx->sq_data->thread->io_uring; - atomic_inc(&tctx->in_idle); do { /* read completions before cancelations */ @@ -8903,7 +8875,6 @@ void __io_uring_task_cancel(void) /* make sure overflow events are dropped */ atomic_inc(&tctx->in_idle); - /* trigger io_disable_sqo_submit() */ if (tctx->sqpoll) { struct file *file; unsigned long index; @@ -8933,53 +8904,9 @@ void __io_uring_task_cancel(void) atomic_dec(&tctx->in_idle); - io_uring_remove_task_files(tctx); -} - -static int io_uring_flush(struct file *file, void *data) -{ - struct io_uring_task *tctx = current->io_uring; - struct io_ring_ctx *ctx = file->private_data; - - /* Ignore helper thread files exit */ - if (current->flags & PF_IO_WORKER) - return 0; - - if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) { - io_uring_cancel_task_requests(ctx, NULL); - io_req_caches_free(ctx, current); - } - - io_run_ctx_fallback(ctx); - - if (!tctx) - return 0; - - /* we should have cancelled and erased it before PF_EXITING */ - WARN_ON_ONCE((current->flags & PF_EXITING) && - xa_load(&tctx->xa, (unsigned long)file)); - - /* - * fput() is pending, will be 2 if the only other ref is our potential - * task file note. If the task is exiting, drop regardless of count. - */ - if (atomic_long_read(&file->f_count) != 2) - return 0; - - if (ctx->flags & IORING_SETUP_SQPOLL) { - /* there is only one file note, which is owned by sqo_task */ - WARN_ON_ONCE(ctx->sqo_task != current && - xa_load(&tctx->xa, (unsigned long)file)); - /* sqo_dead check is for when this happens after cancellation */ - WARN_ON_ONCE(ctx->sqo_task == current && !ctx->sqo_dead && - !xa_load(&tctx->xa, (unsigned long)file)); - - io_disable_sqo_submit(ctx); - } - - if (!(ctx->flags & IORING_SETUP_SQPOLL) || ctx->sqo_task == current) - io_uring_del_task_file(file); - return 0; + io_uring_clean_tctx(tctx); + /* all current's requests should be gone, we can kill tctx */ + __io_uring_free(current); } static void *io_uring_validate_mmap_request(struct file *file, @@ -9060,22 +8987,14 @@ static int io_sqpoll_wait_sq(struct io_ring_ctx *ctx) do { if (!io_sqring_full(ctx)) break; - prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE); - if (unlikely(ctx->sqo_dead)) { - ret = -EOWNERDEAD; - goto out; - } - if (!io_sqring_full(ctx)) break; - schedule(); } while (!signal_pending(current)); finish_wait(&ctx->sqo_sq_wait, &wait); -out: return ret; } @@ -9157,8 +9076,6 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ctx->sqo_exec = 0; } ret = -EOWNERDEAD; - if (unlikely(ctx->sqo_dead)) - goto out; if (flags & IORING_ENTER_SQ_WAKEUP) wake_up(&ctx->sq_data->wait); if (flags & IORING_ENTER_SQ_WAIT) { @@ -9313,7 +9230,6 @@ static void io_uring_show_fdinfo(struct seq_file *m, struct file *f) static const struct file_operations io_uring_fops = { .release = io_uring_release, - .flush = io_uring_flush, .mmap = io_uring_mmap, #ifndef CONFIG_MMU .get_unmapped_area = io_uring_nommu_get_unmapped_area, @@ -9468,7 +9384,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, ctx->compat = in_compat_syscall(); if (!capable(CAP_IPC_LOCK)) ctx->user = get_uid(current_user()); - ctx->sqo_task = current; /* * This is just grabbed for accounting purposes. When a process exits, @@ -9531,7 +9446,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, */ ret = io_uring_install_fd(ctx, file); if (ret < 0) { - io_disable_sqo_submit(ctx); /* fput will clean it up */ fput(file); return ret; @@ -9540,7 +9454,6 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p, trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags); return ret; err: - io_disable_sqo_submit(ctx); io_ring_ctx_wait_and_kill(ctx); return ret; } @@ -9708,10 +9621,7 @@ static int io_register_enable_rings(struct io_ring_ctx *ctx) if (ctx->restrictions.registered) ctx->restricted = 1; - ctx->flags &= ~IORING_SETUP_R_DISABLED; - io_sq_offload_start(ctx); - return 0; } diff --git a/include/linux/io_uring.h b/include/linux/io_uring.h index 51ede771cd99..7cb7bd0e334c 100644 --- a/include/linux/io_uring.h +++ b/include/linux/io_uring.h @@ -38,7 +38,7 @@ void __io_uring_free(struct task_struct *tsk); static inline void io_uring_task_cancel(void) { - if (current->io_uring && !xa_empty(¤t->io_uring->xa)) + if (current->io_uring) __io_uring_task_cancel(); } static inline void io_uring_files_cancel(struct files_struct *files) diff --git a/include/linux/sched/task.h b/include/linux/sched/task.h index c0f71f2e7160..ef02be869cf2 100644 --- a/include/linux/sched/task.h +++ b/include/linux/sched/task.h @@ -31,6 +31,7 @@ struct kernel_clone_args { /* Number of elements in *set_tid */ size_t set_tid_size; int cgroup; + int io_thread; struct cgroup *cgrp; struct css_set *cset; }; @@ -82,6 +83,7 @@ extern void exit_files(struct task_struct *); extern void exit_itimers(struct signal_struct *); extern pid_t kernel_clone(struct kernel_clone_args *kargs); +struct task_struct *create_io_thread(int (*fn)(void *), void *arg, int node); struct task_struct *fork_idle(int); struct mm_struct *copy_init_mm(void); extern pid_t kernel_thread(int (*fn)(void *), void *arg, unsigned long flags); diff --git a/kernel/fork.c b/kernel/fork.c index d66cd1014211..d3171e8e88e5 100644 --- a/kernel/fork.c +++ b/kernel/fork.c @@ -1940,6 +1940,8 @@ static __latent_entropy struct task_struct *copy_process( p = dup_task_struct(current, node); if (!p) goto fork_out; + if (args->io_thread) + p->flags |= PF_IO_WORKER; /* * This _must_ happen before we call free_task(), i.e. before we jump @@ -2410,6 +2412,34 @@ struct mm_struct *copy_init_mm(void) return dup_mm(NULL, &init_mm); } +/* + * This is like kernel_clone(), but shaved down and tailored to just + * creating io_uring workers. It returns a created task, or an error pointer. + * The returned task is inactive, and the caller must fire it up through + * wake_up_new_task(p). All signals are blocked in the created task. + */ +struct task_struct *create_io_thread(int (*fn)(void *), void *arg, int node) +{ + unsigned long flags = CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD| + CLONE_IO; + struct kernel_clone_args args = { + .flags = ((lower_32_bits(flags) | CLONE_VM | + CLONE_UNTRACED) & ~CSIGNAL), + .exit_signal = (lower_32_bits(flags) & CSIGNAL), + .stack = (unsigned long)fn, + .stack_size = (unsigned long)arg, + .io_thread = 1, + }; + struct task_struct *tsk; + + tsk = copy_process(NULL, 0, node, &args); + if (!IS_ERR(tsk)) { + sigfillset(&tsk->blocked); + sigdelsetmask(&tsk->blocked, sigmask(SIGKILL)); + } + return tsk; +} + /* * Ok, this is the main fork-routine. *