tweak streams

pull/5767/head
Thibault Duplessis 2019-12-16 09:06:48 -06:00
parent 80257aee66
commit 0bf0aa1296
5 changed files with 11 additions and 7 deletions

View File

@ -29,4 +29,8 @@ object LilaStream {
val sinkCount = Sink.fold[Int, Any](0) {
case (total, _) => total + 1
}
def collect[A] = Flow[Option[A]] collect {
case Some(a) => a
}
}

View File

@ -45,7 +45,7 @@ final private class ExplorerIndexer(
.documentSource()
.via(LilaStream.logRate[Game]("fetch")(logger))
.mapAsyncUnordered(8) { makeFastPgn(_, botUserIds) }
.mapConcat(_.toList)
.via(LilaStream.collect)
.via(LilaStream.logRate("index")(logger))
.grouped(50)
.map(_ mkString separator)

View File

@ -9,7 +9,7 @@ import scala.concurrent.duration._
import lila.db.dsl._
import lila.game.BSONHandlers.gameBSONHandler
import lila.game.{ Game, GameRepo, Query }
import lila.common.WorkQueue
import lila.common.{ LilaStream, WorkQueue }
import lila.user.User
final private class Indexer(
@ -81,7 +81,7 @@ final private class Indexer(
.throttle(300, 1 second)
.take(maxGames)
.mapAsync(4)(toEntry)
.mapConcat(_.toList)
.via(LilaStream.collect)
.zipWithIndex
.map { case (e, i) => e.copy(number = fromNumber + i.toInt) }
.grouped(50)

View File

@ -7,7 +7,7 @@ import reactivemongo.api._
import scala.concurrent.duration._
import lila.common.Bus
import lila.common.LilaStream.sinkCount
import lila.common.LilaStream
import lila.db.dsl._
import lila.game.{ Game, Pov }
@ -66,7 +66,7 @@ final private class CorresAlarm(
.documentSource()
.take(200)
.mapAsyncUnordered(4)(alarm => proxyGame(alarm._id))
.mapConcat(_.toList)
.via(LilaStream.collect)
.mapAsyncUnordered(4) { game =>
val pov = Pov(game, game.turnColor)
pov.player.userId.fold(fuccess(true))(u => hasUserId(pov.game, u)).addEffect {
@ -74,7 +74,7 @@ final private class CorresAlarm(
case false => Bus.publish(lila.game.actorApi.CorresAlarmEvent(pov), "corresAlarm")
} >> coll.delete.one($id(game.id))
}
.toMat(sinkCount)(Keep.right)
.toMat(LilaStream.sinkCount)(Keep.right)
.run
.mon(_.round.alarm.time)
.addEffectAnyway(scheduleNext)

View File

@ -52,7 +52,7 @@ final class TournamentStandingApi(
workQueue {
compute(id, page)
} recover {
case e: Exception =>
case _: Exception =>
lila.mon.tournament.standingOverload.increment()
Json.obj(
"failed" -> true,