more stream review

scapegoat
Thibault Duplessis 2019-12-08 21:20:36 -06:00
parent b37e2d3b2a
commit b385cb0119
4 changed files with 18 additions and 17 deletions

View File

@ -1,6 +1,6 @@
package lila.explorer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl._
import chess.format.pgn.Tag
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
@ -55,8 +55,9 @@ private final class ExplorerIndexer(
case res => fufail(s"Stop import because of status ${res.status}")
}
}
.to(Sink.ignore)
.run.void
.toMat(Sink.ignore)(Keep.right)
.run
.void
}
}

View File

@ -39,8 +39,12 @@ private[round] final class Titivate(
case Run => gameRepo.count(_.checkable).flatMap { total =>
lila.mon.round.titivate.total(total)
gameRepo.cursor(Query.checkable).documentSource()
.via(Flow[Game] take 100).via(gameFlow).toMat(gameSink)(Keep.right).run
gameRepo.cursor(Query.checkable)
.documentSource()
.take(100)
.via(gameFlow)
.toMat(gameSink)(Keep.right)
.run
.mon(_.round.titivate.time)
.addEffect(lila.mon.round.titivate.game(_))
.>> {

View File

@ -17,25 +17,21 @@ private final class LeaderboardIndexer(
import LeaderboardApi._
import BSONHandlers._
def generateAll: Funit = leaderboardRepo.coll.delete.one($empty) >> {
def generateAll: Funit = leaderboardRepo.coll.delete.one($empty) >>
tournamentRepo.coll.ext.find(tournamentRepo.finishedSelect)
.sort($sort desc "startsAt")
.cursor[Tournament](ReadPreference.secondaryPreferred)
.documentSource()
.take(20_000)
.via(lila.common.LilaStream.logRate[Tournament]("leaderboard index tour")(logger))
.mapAsyncUnordered(1)(generateTourEntries)
.mapConcat(identity)
.via(lila.common.LilaStream.logRate[Entry]("leaderboard index entries")(logger))
.grouped(500)
.mapAsyncUnordered(1) { entries =>
saveEntries(entries) inject entries.size
}
.fold(0)((acc, nb) => acc + nb)
.wireTap { nb =>
if (nb % 5 == 0) logger.info(s"Generating leaderboards... $nb")
}
.to(Sink.ignore)
.mapAsyncUnordered(1)(saveEntries)
.toMat(Sink.ignore)(Keep.right)
.run
}.void
.void
def indexOne(tour: Tournament): Funit =
leaderboardRepo.coll.delete.one($doc("t" -> tour.id)) >>

View File

@ -486,8 +486,8 @@ final class TournamentApi(
def resultStream(tour: Tournament, perSecond: MaxPerSecond, nb: Int): Source[Player.Result, _] =
playerRepo.sortedCursor(tour.id, perSecond.value)
.documentSource()
.throttle(perSecond.value, 1 second)
.take(nb)
.throttle(perSecond.value, 1 second)
.zipWithIndex
.mapAsync(8) {
case (player, index) =>
@ -499,8 +499,8 @@ final class TournamentApi(
def byOwnerStream(owner: User, perSecond: MaxPerSecond, nb: Int): Source[Tournament, _] =
tournamentRepo.sortedCursor(owner, perSecond.value)
.documentSource()
.throttle(perSecond.value, 1 second)
.take(nb)
.throttle(perSecond.value, 1 second)
private def playerPovs(tour: Tournament, userId: User.ID, nb: Int): Fu[List[LightPov]] =
pairingRepo.recentIdsByTourAndUserId(tour.id, userId, nb) flatMap