kill previous event stream on connection - closes #6694

pull/6698/head
Thibault Duplessis 2020-05-25 09:06:37 -06:00
parent 1343c4d2f4
commit e75b5c803a
3 changed files with 10 additions and 11 deletions

View File

@ -310,19 +310,11 @@ final class Api(
jsonStreamWithKeepAlive(env.game.gamesByUsersStream(userIds))(req).fuccess
}
private val EventStreamConcurrencyLimitPerUser = new lila.memo.ConcurrencyLimit[String](
name = "Event Stream API concurrency per user",
key = "eventStream.concurrency.limit.user",
ttl = 20.minutes,
maxConcurrency = 1
)
def eventStream =
Scoped(_.Bot.Play, _.Board.Play, _.Challenge.Read) { _ => me =>
env.round.proxyRepo.urgentGames(me) flatMap { povs =>
env.challenge.api.createdByDestId(me.id) map { challenges =>
EventStreamConcurrencyLimitPerUser(me.id)(
env.api.eventStream(me, povs.map(_.game), challenges)
)(sourceToNdJsonOption)
sourceToNdJsonOption(env.api.eventStream(me, povs.map(_.game), challenges))
}
}
}

View File

@ -28,7 +28,11 @@ final class EventStream(
me: User,
gamesInProgress: List[Game],
challenges: List[Challenge]
): Source[Option[JsObject], _] =
): Source[Option[JsObject], _] = {
// kill previous one if any
Bus.publish(PoisonPill, s"eventStreamFor:${me.id}")
blueprint mapMaterializedValue { queue =>
gamesInProgress map toJson map some foreach queue.offer
challenges map toJson map some foreach queue.offer
@ -39,6 +43,7 @@ final class EventStream(
actor ! PoisonPill
}
}
}
private def mkActor(me: User, queue: SourceQueueWithComplete[Option[JsObject]]) =
new Actor {
@ -46,6 +51,7 @@ final class EventStream(
val classifiers = List(
s"userStartGame:${me.id}",
s"rematchFor:${me.id}",
s"eventStreamFor:${me.id}",
"challenge"
)
@ -59,6 +65,7 @@ final class EventStream(
override def postStop() = {
super.postStop()
Bus.unsubscribe(self, classifiers)
queue.complete()
}
self ! SetOnline

View File

@ -7,7 +7,7 @@ import play.api.mvc.Results.TooManyRequests
import scala.concurrent.duration.FiniteDuration
/**
* only allow one future at a time per key
* only allow one stream at a time per key
*/
final class ConcurrencyLimit[K](
name: String,