workqueue: disable irq while manipulating PENDING

Queueing operations use WORK_STRUCT_PENDING_BIT to synchronize access
to the target work item.  They first try to claim the bit and proceed
with queueing only after that succeeds and there's a window between
PENDING being set and the actual queueing where the task can be
interrupted or preempted.

There's also a similar window in process_one_work() when clearing
PENDING.  A work item is dequeued, gcwq->lock is released and then
PENDING is cleared and the worker might get interrupted or preempted
between releasing gcwq->lock and clearing PENDING.

cancel[_delayed]_work_sync() tries to claim or steal PENDING.  The
function assumes that a work item with PENDING is either queued or in
the process of being [de]queued.  In the latter case, it busy-loops
until either the work item loses PENDING or is queued.  If canceling
coincides with the above described interrupts or preemptions, the
canceling task will busy-loop while the queueing or executing task is
preempted.

This patch keeps irq disabled across claiming PENDING and actual
queueing and moves PENDING clearing in process_one_work() inside
gcwq->lock so that busy looping from PENDING && !queued doesn't wait
for interrupted/preempted tasks.  Note that, in process_one_work(),
setting last CPU and clearing PENDING got merged into single
operation.

This removes possible long busy-loops and will allow using
try_to_grab_pending() from bh and irq contexts.

v2: __queue_work() was testing preempt_count() to ensure that the
    caller has disabled preemption.  This triggers spuriously if
    !CONFIG_PREEMPT_COUNT.  Use preemptible() instead.  Reported by
    Fengguang Wu.

v3: Disable irq instead of preemption.  IRQ will be disabled while
    grabbing gcwq->lock later anyway and this allows using
    try_to_grab_pending() from bh and irq contexts.

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Oleg Nesterov <oleg@redhat.com>
Cc: Fengguang Wu <fengguang.wu@intel.com>
This commit is contained in:
Tejun Heo 2012-08-03 10:30:45 -07:00
parent 959d1af8cf
commit 8930caba3d

View file

@ -537,9 +537,10 @@ static int work_next_color(int color)
* work is on queue. Once execution starts, WORK_STRUCT_CWQ is * work is on queue. Once execution starts, WORK_STRUCT_CWQ is
* cleared and the work data contains the cpu number it was last on. * cleared and the work data contains the cpu number it was last on.
* *
* set_work_{cwq|cpu}() and clear_work_data() can be used to set the * set_work_cwq(), set_work_cpu_and_clear_pending() and clear_work_data()
* cwq, cpu or clear work->data. These functions should only be * can be used to set the cwq, cpu or clear work->data. These functions
* called while the work is owned - ie. while the PENDING bit is set. * should only be called while the work is owned - ie. while the PENDING
* bit is set.
* *
* get_work_[g]cwq() can be used to obtain the gcwq or cwq * get_work_[g]cwq() can be used to obtain the gcwq or cwq
* corresponding to a work. gcwq is available once the work has been * corresponding to a work. gcwq is available once the work has been
@ -561,9 +562,10 @@ static void set_work_cwq(struct work_struct *work,
WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags); WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
} }
static void set_work_cpu(struct work_struct *work, unsigned int cpu) static void set_work_cpu_and_clear_pending(struct work_struct *work,
unsigned int cpu)
{ {
set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING); set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, 0);
} }
static void clear_work_data(struct work_struct *work) static void clear_work_data(struct work_struct *work)
@ -981,7 +983,14 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct cpu_workqueue_struct *cwq; struct cpu_workqueue_struct *cwq;
struct list_head *worklist; struct list_head *worklist;
unsigned int work_flags; unsigned int work_flags;
unsigned long flags;
/*
* While a work item is PENDING && off queue, a task trying to
* steal the PENDING will busy-loop waiting for it to either get
* queued or lose PENDING. Grabbing PENDING and queueing should
* happen with IRQ disabled.
*/
WARN_ON_ONCE(!irqs_disabled());
debug_work_activate(work); debug_work_activate(work);
@ -1008,7 +1017,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
(last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) { (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
struct worker *worker; struct worker *worker;
spin_lock_irqsave(&last_gcwq->lock, flags); spin_lock(&last_gcwq->lock);
worker = find_worker_executing_work(last_gcwq, work); worker = find_worker_executing_work(last_gcwq, work);
@ -1016,14 +1025,15 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
gcwq = last_gcwq; gcwq = last_gcwq;
else { else {
/* meh... not running there, queue here */ /* meh... not running there, queue here */
spin_unlock_irqrestore(&last_gcwq->lock, flags); spin_unlock(&last_gcwq->lock);
spin_lock_irqsave(&gcwq->lock, flags); spin_lock(&gcwq->lock);
} }
} else } else {
spin_lock_irqsave(&gcwq->lock, flags); spin_lock(&gcwq->lock);
}
} else { } else {
gcwq = get_gcwq(WORK_CPU_UNBOUND); gcwq = get_gcwq(WORK_CPU_UNBOUND);
spin_lock_irqsave(&gcwq->lock, flags); spin_lock(&gcwq->lock);
} }
/* gcwq determined, get cwq and queue */ /* gcwq determined, get cwq and queue */
@ -1031,7 +1041,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
trace_workqueue_queue_work(cpu, cwq, work); trace_workqueue_queue_work(cpu, cwq, work);
if (WARN_ON(!list_empty(&work->entry))) { if (WARN_ON(!list_empty(&work->entry))) {
spin_unlock_irqrestore(&gcwq->lock, flags); spin_unlock(&gcwq->lock);
return; return;
} }
@ -1049,7 +1059,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
insert_work(cwq, work, worklist, work_flags); insert_work(cwq, work, worklist, work_flags);
spin_unlock_irqrestore(&gcwq->lock, flags); spin_unlock(&gcwq->lock);
} }
/** /**
@ -1067,11 +1077,16 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
struct work_struct *work) struct work_struct *work)
{ {
bool ret = false; bool ret = false;
unsigned long flags;
local_irq_save(flags);
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
__queue_work(cpu, wq, work); __queue_work(cpu, wq, work);
ret = true; ret = true;
} }
local_irq_restore(flags);
return ret; return ret;
} }
EXPORT_SYMBOL_GPL(queue_work_on); EXPORT_SYMBOL_GPL(queue_work_on);
@ -1102,7 +1117,9 @@ static void delayed_work_timer_fn(unsigned long __data)
struct delayed_work *dwork = (struct delayed_work *)__data; struct delayed_work *dwork = (struct delayed_work *)__data;
struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work); struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
local_irq_disable();
__queue_work(smp_processor_id(), cwq->wq, &dwork->work); __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
local_irq_enable();
} }
/** /**
@ -1120,6 +1137,10 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct timer_list *timer = &dwork->timer; struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work; struct work_struct *work = &dwork->work;
bool ret = false; bool ret = false;
unsigned long flags;
/* read the comment in __queue_work() */
local_irq_save(flags);
if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) { if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
unsigned int lcpu; unsigned int lcpu;
@ -1156,6 +1177,8 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
add_timer(timer); add_timer(timer);
ret = true; ret = true;
} }
local_irq_restore(flags);
return ret; return ret;
} }
EXPORT_SYMBOL_GPL(queue_delayed_work_on); EXPORT_SYMBOL_GPL(queue_delayed_work_on);
@ -1970,15 +1993,13 @@ __acquires(&gcwq->lock)
return; return;
} }
/* claim and process */ /* claim and dequeue */
debug_work_deactivate(work); debug_work_deactivate(work);
hlist_add_head(&worker->hentry, bwh); hlist_add_head(&worker->hentry, bwh);
worker->current_work = work; worker->current_work = work;
worker->current_cwq = cwq; worker->current_cwq = cwq;
work_color = get_work_color(work); work_color = get_work_color(work);
/* record the current cpu number in the work data and dequeue */
set_work_cpu(work, gcwq->cpu);
list_del_init(&work->entry); list_del_init(&work->entry);
/* /*
@ -1995,10 +2016,18 @@ __acquires(&gcwq->lock)
if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool)) if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool))
wake_up_worker(pool); wake_up_worker(pool);
spin_unlock_irq(&gcwq->lock); /*
* Record the last CPU and clear PENDING. The following wmb is
* paired with the implied mb in test_and_set_bit(PENDING) and
* ensures all updates to @work made here are visible to and
* precede any updates by the next PENDING owner. Also, clear
* PENDING inside @gcwq->lock so that PENDING and queued state
* changes happen together while IRQ is disabled.
*/
smp_wmb();
set_work_cpu_and_clear_pending(work, gcwq->cpu);
smp_wmb(); /* paired with test_and_set_bit(PENDING) */ spin_unlock_irq(&gcwq->lock);
work_clear_pending(work);
lock_map_acquire_read(&cwq->wq->lockdep_map); lock_map_acquire_read(&cwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map); lock_map_acquire(&lockdep_map);
@ -2836,9 +2865,11 @@ EXPORT_SYMBOL_GPL(cancel_work_sync);
*/ */
bool flush_delayed_work(struct delayed_work *dwork) bool flush_delayed_work(struct delayed_work *dwork)
{ {
local_irq_disable();
if (del_timer_sync(&dwork->timer)) if (del_timer_sync(&dwork->timer))
__queue_work(raw_smp_processor_id(), __queue_work(raw_smp_processor_id(),
get_work_cwq(&dwork->work)->wq, &dwork->work); get_work_cwq(&dwork->work)->wq, &dwork->work);
local_irq_enable();
return flush_work(&dwork->work); return flush_work(&dwork->work);
} }
EXPORT_SYMBOL(flush_delayed_work); EXPORT_SYMBOL(flush_delayed_work);
@ -2857,9 +2888,11 @@ EXPORT_SYMBOL(flush_delayed_work);
*/ */
bool flush_delayed_work_sync(struct delayed_work *dwork) bool flush_delayed_work_sync(struct delayed_work *dwork)
{ {
local_irq_disable();
if (del_timer_sync(&dwork->timer)) if (del_timer_sync(&dwork->timer))
__queue_work(raw_smp_processor_id(), __queue_work(raw_smp_processor_id(),
get_work_cwq(&dwork->work)->wq, &dwork->work); get_work_cwq(&dwork->work)->wq, &dwork->work);
local_irq_enable();
return flush_work_sync(&dwork->work); return flush_work_sync(&dwork->work);
} }
EXPORT_SYMBOL(flush_delayed_work_sync); EXPORT_SYMBOL(flush_delayed_work_sync);