HTTP stream refactoring

This commit is contained in:
Thibault Duplessis 2018-04-17 17:00:17 +02:00
parent b32a4a454c
commit 8731133646
6 changed files with 22 additions and 21 deletions

View file

@ -270,7 +270,7 @@ object Api extends LilaController {
def gamesByUsersStream = Action.async(parse.tolerantText) { req =>
RequireHttp11(req) {
val userIds = req.body.split(',').take(300).toSet map lila.user.User.normalize
Ok.chunked(Env.game.stream.startedByUserIds(userIds)).fuccess
Ok.chunked(Env.game.gamesByUsersStream(userIds)).fuccess
}
}

View file

@ -135,7 +135,7 @@ final class Env(
pools = pools
)
lazy val eventStream = new EventStream(system, challengeJsonView)
lazy val eventStream = new EventStream(system, challengeJsonView, userEnv.onlineUserIdMemo.put)
private def makeUrl(path: String): String = s"${Net.BaseUrl}/$path"

View file

@ -1,17 +1,19 @@
package lila.api
import scala.concurrent.duration._
import akka.actor._
import play.api.libs.iteratee._
import play.api.libs.json._
import lila.challenge.Challenge
import lila.game.actorApi.UserStartGame
import lila.game.Game
import lila.user.User
import lila.challenge.Challenge
final class EventStream(
system: ActorSystem,
challengeJsonView: lila.challenge.JsonView
challengeJsonView: lila.challenge.JsonView,
setOnline: User.ID => Unit
) {
import lila.common.HttpStream._
@ -26,9 +28,15 @@ final class EventStream(
gamesInProgress foreach pushGameStart
challenges foreach pushChallenge
self ! SetOnline
def receive = {
case SetOnline =>
println(nowSeconds, s"set online ${me.id}")
setOnline(me.id)
context.system.scheduler.scheduleOnce(6 second, self, SetOnline)
case UserStartGame(userId, game) if userId == me.id => pushGameStart(game)
case lila.challenge.Event.Create(c) if c.destUserId has me.id => pushChallenge(c)

View file

@ -16,4 +16,6 @@ object HttpStream {
system.lilaBus.unsubscribe(actor)
actor ! PoisonPill
}
case object SetOnline
}

View file

@ -87,7 +87,7 @@ final class Env(
}
}
lazy val stream = new GameStream(system)
lazy val gamesByUsersStream = new GamesByUsersStream(system)
lazy val bestOpponents = new BestOpponents
}

View file

@ -8,11 +8,12 @@ import actorApi.{ StartGame, FinishGame }
import chess.format.FEN
import lila.user.User
final class GameStream(system: ActorSystem) {
final class GamesByUsersStream(system: ActorSystem) {
import GameStream._
import GamesByUsersStream._
import lila.common.HttpStream._
def startedByUserIds(userIds: Set[User.ID]): Enumerator[String] = {
def apply(userIds: Set[User.ID]): Enumerator[String] = {
def matches(game: Game) = game.userIds match {
case List(u1, u2) if u1 != u2 => userIds(u1) && userIds(u2)
@ -31,12 +32,7 @@ final class GameStream(system: ActorSystem) {
system.lilaBus.subscribe(actor, 'startGame, 'finishGame)
stream = actor.some
},
onComplete = {
stream.foreach { actor =>
system.lilaBus.unsubscribe(actor)
actor ! PoisonPill
}
}
onComplete = onComplete(stream, system)
)
enumerator &> withInitialFen &> toJson &> stringify
@ -46,15 +42,10 @@ final class GameStream(system: ActorSystem) {
Enumeratee.mapM[Game].apply[Game.WithInitialFen](GameRepo.withInitialFen)
private val toJson =
Enumeratee.map[Game.WithInitialFen].apply[JsValue](gameWithInitialFenWriter.writes)
private val stringify =
Enumeratee.map[JsValue].apply[String] { js =>
Json.stringify(js) + "\n"
}
Enumeratee.map[Game.WithInitialFen].apply[JsObject](gameWithInitialFenWriter.writes)
}
object GameStream {
object GamesByUsersStream {
private implicit val fenWriter: Writes[FEN] = Writes[FEN] { f =>
JsString(f.value)