refactor ndjson streams

pull/4348/head
Thibault Duplessis 2018-05-08 16:31:38 +02:00
parent 1eaed3c164
commit 2157c815f2
6 changed files with 20 additions and 29 deletions

View File

@ -240,7 +240,7 @@ object Api extends LilaController {
def gamesByUsersStream = Action.async(parse.tolerantText) { req =>
RequireHttp11(req) {
val userIds = req.body.split(',').take(300).toSet map lila.user.User.normalize
Ok.chunked(Env.game.gamesByUsersStream(userIds)).fuccess
jsonStream(Env.game.gamesByUsersStream(userIds)).fuccess
}
}
@ -248,7 +248,7 @@ object Api extends LilaController {
RequireHttp11(req) {
lila.game.GameRepo.urgentGames(me) flatMap { povs =>
Env.challenge.api.createdByDestId(me.id) map { challenges =>
Ok.chunked(Env.api.eventStream(me, povs.map(_.game), challenges))
jsonOptionStream(Env.api.eventStream(me, povs.map(_.game), challenges))
}
}
}
@ -310,16 +310,21 @@ object Api extends LilaController {
private[controllers] val tooManyRequests = TooManyRequest(jsonError("Error 429: Too many requests! Try again later."))
def toHttp(result: ApiResult): Result = result match {
private[controllers] def toHttp(result: ApiResult): Result = result match {
case Limited => tooManyRequests
case NoData => NotFound
case Custom(result) => result
case Data(json) => Ok(json) as JSON
}
def jsonStream(stream: Enumerator[JsObject]) = Ok.chunked {
stream &> Enumeratee.map { o =>
Json.stringify(o) + "\n"
}
}.withHeaders(CONTENT_TYPE -> ndJsonContentType)
private[controllers] def jsonStream(stream: Enumerator[JsObject]): Result = jsonStringStream {
stream &> Enumeratee.map { o => Json.stringify(o) + "\n" }
}
private[controllers] def jsonOptionStream(stream: Enumerator[Option[JsObject]]): Result = jsonStringStream {
stream &> Enumeratee.map { _ ?? Json.stringify + "\n" }
}
private def jsonStringStream(stream: Enumerator[String]): Result =
Ok.chunked(stream).withHeaders(CONTENT_TYPE -> ndJsonContentType)
}

View File

@ -14,7 +14,7 @@ object Bot extends LilaController {
WithMyBotGame(id, me) { pov =>
RequireHttp11(req) {
lila.game.GameRepo.withInitialFen(pov.game) map { wf =>
Ok.chunked(Env.bot.gameStateStream(me, wf, pov.color))
Api.jsonOptionStream(Env.bot.gameStateStream(me, wf, pov.color))
}
}
}

View File

@ -18,11 +18,11 @@ final class EventStream(
import lila.common.HttpStream._
def apply(me: User, gamesInProgress: List[Game], challenges: List[Challenge]): Enumerator[String] = {
def apply(me: User, gamesInProgress: List[Game], challenges: List[Challenge]): Enumerator[Option[JsObject]] = {
var stream: Option[ActorRef] = None
val enumerator = Concurrent.unicast[Option[JsObject]](
Concurrent.unicast[Option[JsObject]](
onStart = channel => {
val actor = system.actorOf(Props(new Actor {
@ -70,7 +70,5 @@ final class EventStream(
},
onComplete = onComplete(stream, system)
)
enumerator &> stringifyOrEmpty
}
}

View File

@ -23,13 +23,13 @@ final class GameStateStream(
import lila.common.HttpStream._
def apply(me: User, init: Game.WithInitialFen, as: chess.Color): Enumerator[String] = {
def apply(me: User, init: Game.WithInitialFen, as: chess.Color): Enumerator[Option[JsObject]] = {
val id = init.game.id
var stream: Option[ActorRef] = None
val enumerator = Concurrent.unicast[Option[JsObject]](
Concurrent.unicast[Option[JsObject]](
onStart = channel => {
val actor = system.actorOf(Props(new Actor {
@ -89,7 +89,5 @@ final class GameStateStream(
},
onComplete = onComplete(stream, system)
)
enumerator &> stringifyOrEmpty
}
}

View File

@ -6,16 +6,6 @@ import play.api.libs.json._
object HttpStream {
val stringify =
Enumeratee.map[JsObject].apply[String] { js =>
Json.stringify(js) + "\n"
}
val stringifyOrEmpty =
Enumeratee.map[Option[JsObject]].apply[String] {
_ ?? Json.stringify + "\n"
}
def onComplete(stream: Option[ActorRef], system: ActorSystem) =
stream foreach { actor =>
system.lilaBus.unsubscribe(actor)

View File

@ -13,7 +13,7 @@ final class GamesByUsersStream(system: ActorSystem) {
import GamesByUsersStream._
import lila.common.HttpStream._
def apply(userIds: Set[User.ID]): Enumerator[String] = {
def apply(userIds: Set[User.ID]): Enumerator[JsObject] = {
def matches(game: Game) = game.userIds match {
case List(u1, u2) if u1 != u2 => userIds(u1) && userIds(u2)
@ -35,7 +35,7 @@ final class GamesByUsersStream(system: ActorSystem) {
onComplete = onComplete(stream, system)
)
enumerator &> withInitialFen &> toJson &> stringify
enumerator &> withInitialFen &> toJson
}
private val withInitialFen =