fix irwin stream sub channel
parent
d59f702ae8
commit
adb4303d56
|
@ -7,25 +7,25 @@ import lila.common.Bus
|
|||
|
||||
final class IrwinStream {
|
||||
|
||||
private val classifier = "userSignup"
|
||||
private val channel = "irwin"
|
||||
|
||||
private val blueprint =
|
||||
Source
|
||||
.queue[IrwinRequest](32, akka.stream.OverflowStrategy.dropHead)
|
||||
.queue[IrwinRequest](64, akka.stream.OverflowStrategy.dropHead)
|
||||
.map(requestJson)
|
||||
.map { js =>
|
||||
s"${Json.stringify(js)}\n"
|
||||
}
|
||||
|
||||
def apply(): Source[String, _] = blueprint mapMaterializedValue { queue =>
|
||||
val sub = Bus.subscribeFun(classifier) {
|
||||
val sub = Bus.subscribeFun(channel) {
|
||||
case req: IrwinRequest =>
|
||||
lila.mon.mod.irwin.streamEventType("request").increment()
|
||||
queue offer req
|
||||
}
|
||||
|
||||
queue.watchCompletion dforeach { _ =>
|
||||
Bus.unsubscribe(sub, classifier)
|
||||
Bus.unsubscribe(sub, channel)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue