tweak Duct code
parent
cec30cc001
commit
4f12f41871
|
@ -17,9 +17,8 @@ trait Duct {
|
|||
protected val process: Duct.ReceiveAsync
|
||||
|
||||
def !(msg: Any): Unit =
|
||||
if (stateRef.single.getAndTransform {
|
||||
case None => Some(Queue.empty)
|
||||
case Some(q) => Some(q enqueue msg)
|
||||
if (stateRef.single.getAndTransform { q =>
|
||||
Some(q.fold(Queue.empty[Any])(_ enqueue msg))
|
||||
} isEmpty) run(msg)
|
||||
|
||||
def queueSize = stateRef.single().??(_.size)
|
||||
|
@ -36,9 +35,10 @@ trait Duct {
|
|||
|
||||
private[this] val postRun = (_: Any) =>
|
||||
stateRef.single.getAndTransform {
|
||||
case Some(q) => if (q.isEmpty) None else Some(q.tail)
|
||||
case None => None // Shouldn't happen...
|
||||
} foreach { q => if (q.nonEmpty) run(q.head) }
|
||||
_ flatMap { q =>
|
||||
if (q.isEmpty) None else Some(q.tail)
|
||||
}
|
||||
} flatMap (_.headOption) foreach run
|
||||
}
|
||||
|
||||
object Duct {
|
||||
|
|
Loading…
Reference in New Issue