send empty events to keep the stream alive - closes #6538
parent
01ffa3d8e1
commit
c27649e790
|
@ -278,7 +278,7 @@ final class Api(
|
|||
|
||||
private def gamesByUsers(max: Int)(req: Request[String]) = {
|
||||
val userIds = req.body.split(',').view.take(max).map(lila.user.User.normalize).toSet
|
||||
jsonStream(env.game.gamesByUsersStream(userIds))(req).fuccess
|
||||
jsonStreamWithKeepAlive(env.game.gamesByUsersStream(userIds))(req).fuccess
|
||||
}
|
||||
|
||||
private val EventStreamConcurrencyLimitPerUser = new lila.memo.ConcurrencyLimit[String](
|
||||
|
@ -344,6 +344,11 @@ final class Api(
|
|||
def jsonStream(makeSource: => Source[JsValue, _])(implicit req: RequestHeader): Result =
|
||||
GlobalConcurrencyLimitPerIP(HTTPRequest lastRemoteAddress req)(makeSource)(sourceToNdJson)
|
||||
|
||||
def jsonStreamWithKeepAlive(
|
||||
makeSource: => Source[Option[JsValue], _]
|
||||
)(implicit req: RequestHeader): Result =
|
||||
GlobalConcurrencyLimitPerIP(HTTPRequest lastRemoteAddress req)(makeSource)(sourceToNdJsonOption)
|
||||
|
||||
def sourceToNdJson(source: Source[JsValue, _]) =
|
||||
sourceToNdJsonString {
|
||||
source.map { o =>
|
||||
|
|
|
@ -2,6 +2,7 @@ package lila.game
|
|||
|
||||
import akka.stream.scaladsl._
|
||||
import play.api.libs.json._
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import actorApi.{ FinishGame, StartGame }
|
||||
import chess.format.FEN
|
||||
|
@ -10,16 +11,21 @@ import lila.common.Json.jodaWrites
|
|||
import lila.game.Game
|
||||
import lila.user.User
|
||||
|
||||
final class GamesByUsersStream(gameRepo: lila.game.GameRepo)(implicit ec: scala.concurrent.ExecutionContext) {
|
||||
final class GamesByUsersStream(gameRepo: lila.game.GameRepo)(
|
||||
implicit ec: scala.concurrent.ExecutionContext
|
||||
) {
|
||||
|
||||
private val chans = List("startGame", "finishGame")
|
||||
private val keepAliveInterval = 70.seconds // play's idleTimeout = 75s
|
||||
private val chans = List("startGame", "finishGame")
|
||||
|
||||
private val blueprint = Source
|
||||
.queue[Game](64, akka.stream.OverflowStrategy.dropHead)
|
||||
.mapAsync(1)(gameRepo.withInitialFen)
|
||||
.map(gameWithInitialFenWriter.writes)
|
||||
.map(some)
|
||||
.merge(Source.tick(keepAliveInterval, keepAliveInterval, none))
|
||||
|
||||
def apply(userIds: Set[User.ID]): Source[JsObject, _] =
|
||||
def apply(userIds: Set[User.ID]): Source[Option[JsValue], _] =
|
||||
blueprint mapMaterializedValue { queue =>
|
||||
def matches(game: Game) = game.userIds match {
|
||||
case List(u1, u2) if u1 != u2 => userIds(u1) && userIds(u2)
|
||||
|
|
Loading…
Reference in New Issue