migrate explorer module

rm0193-mapreduce
Thibault Duplessis 2019-12-03 15:30:53 -06:00
parent 3bedf6ee84
commit ad3fd4ebd7
10 changed files with 88 additions and 83 deletions

View File

@ -0,0 +1,26 @@
package lila.common
import akka.NotUsed
import akka.stream.scaladsl._
import scala.concurrent.duration._
object LilaStream {
def flowRate[T](
metric: T => Int = (_: T) => 1,
outputDelay: FiniteDuration = 1 second
): Flow[T, Double, NotUsed] =
Flow[T]
.conflateWithSeed(metric(_)) { case (acc, x) => acc + metric(x) }
.zip(Source.tick(outputDelay, outputDelay, NotUsed))
.map(_._1.toDouble / outputDelay.toUnit(SECONDS))
def logRate[T](
name: String,
metric: T => Int = (_: T) => 1,
outputDelay: FiniteDuration = 1 second
)(logger: play.api.LoggerLike): Flow[T, T, NotUsed] =
Flow[T]
.alsoTo(flowRate[T](metric, outputDelay)
.to(Sink.foreach(r => logger.info(s"[rate] $name $r"))))
}

View File

@ -1,29 +1,28 @@
package lila.explorer
import akka.actor._
import com.typesafe.config.Config
import com.softwaremill.macwire._
import play.api.Configuration
import scala.concurrent.duration._
case class InternalEndpoint(value: String) extends AnyVal with StringValue
final class Env(
config: Config,
gameColl: lila.db.dsl.Coll,
appConfig: Configuration,
gameRepo: lila.game.GameRepo,
userRepo: lila.user.UserRepo,
gameImporter: lila.importer.Importer,
getBotUserIds: () => Fu[Set[lila.user.User.ID]],
getBotUserIds: lila.user.GetBotIds,
settingStore: lila.memo.SettingStore.Builder,
system: ActorSystem
) {
ws: play.api.libs.ws.WSClient
)(implicit system: akka.actor.ActorSystem) {
private val InternalEndpoint = config getString "internal_endpoint"
private lazy val internalEndpoint = InternalEndpoint {
appConfig.get[String]("explorer.internal_endpoint")
}
private lazy val indexer = new ExplorerIndexer(
gameColl = gameColl,
getBotUserIds = getBotUserIds,
internalEndpoint = InternalEndpoint
)
private lazy val indexer: ExplorerIndexer = wire[ExplorerIndexer]
lazy val importer = new ExplorerImporter(
endpoint = InternalEndpoint,
gameImporter = gameImporter
)
lazy val importer = wire[ExplorerImporter]
def cli = new lila.common.Cli {
def process = {
@ -41,15 +40,3 @@ final class Env(
case lila.game.actorApi.FinishGame(game, _, _) if !game.aborted && indexFlowSetting.get() => indexer(game)
}
}
object Env {
lazy val current = "explorer" boot new Env(
config = lila.common.PlayApp loadConfig "explorer",
gameColl = lila.game.Env.current.gameColl,
gameImporter = lila.importer.Env.current.importer,
getBotUserIds = () => lila.user.Env.current.cached.botIds.get,
settingStore = lila.memo.Env.current.settingStore,
system = lila.common.PlayApp.system
)
}

View File

@ -6,16 +6,18 @@ import lila.game.{ Game, GameRepo }
import lila.importer.{ Importer, ImportData }
final class ExplorerImporter(
endpoint: String,
gameImporter: Importer
endpoint: InternalEndpoint,
gameRepo: GameRepo,
gameImporter: Importer,
ws: play.api.libs.ws.WSClient
) {
private val masterGameEncodingFixedAt = new DateTime(2016, 3, 9, 0, 0)
def apply(id: Game.ID): Fu[Option[Game]] =
GameRepo game id flatMap {
gameRepo game id flatMap {
case Some(game) if !game.isPgnImport || game.createdAt.isAfter(masterGameEncodingFixedAt) => fuccess(game.some)
case _ => (GameRepo remove id) >> fetchPgn(id) flatMap {
case _ => (gameRepo remove id) >> fetchPgn(id) flatMap {
case None => fuccess(none)
case Some(pgn) => gameImporter(
ImportData(pgn, none),
@ -26,9 +28,7 @@ final class ExplorerImporter(
}
private def fetchPgn(id: String): Fu[Option[String]] = {
import play.api.libs.ws.WS
import play.api.Play.current
WS.url(s"$endpoint/master/pgn/$id").get() map {
ws.url(s"$endpoint/master/pgn/$id").get() map {
case res if res.status == 200 => res.body.some
case _ => None
}

View File

@ -1,27 +1,27 @@
package lila.explorer
import akka.stream.scaladsl.Sink
import chess.format.pgn.Tag
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import play.api.libs.iteratee._
import play.api.libs.ws.WS
import play.api.Play.current
import play.api.libs.ws.WSClient
import scala.util.Random.nextFloat
import scala.util.{ Try, Success, Failure }
import lila.common.LilaStream
import lila.db.dsl._
import lila.game.BSONHandlers.gameBSONHandler
import lila.game.{ Game, GameRepo, Query, PgnDump, Player }
import lila.user.{ User, UserRepo }
private final class ExplorerIndexer(
gameColl: Coll,
getBotUserIds: () => Fu[Set[User.ID]],
internalEndpoint: String
) {
gameRepo: GameRepo,
userRepo: UserRepo,
getBotUserIds: lila.user.GetBotIds,
ws: play.api.libs.ws.WSClient,
internalEndpoint: InternalEndpoint
)(implicit mat: akka.stream.Materializer) {
private val maxGames = Int.MaxValue
private val batchSize = 50
private val separator = "\n\n\n"
private val datePattern = "yyyy-MM-dd"
private val dateFormatter = DateTimeFormat forPattern datePattern
@ -32,8 +32,6 @@ private final class ExplorerIndexer(
private def parseDate(str: String): Option[DateTime] =
Try(dateFormatter parseDateTime str).toOption
type GamePGN = (Game, String)
def apply(sinceStr: String): Funit = getBotUserIds() flatMap { botUserIds =>
parseDate(sinceStr).fold(fufail[Unit](s"Invalid date $sinceStr")) { since =>
logger.info(s"Start indexing since $since")
@ -46,36 +44,24 @@ private final class ExplorerIndexer(
Query.bothRatingsGreaterThan(1501)
import reactivemongo.api._
import reactivemongo.play.iteratees.cursorProducer
gameColl.find(query)
.sort(Query.sortChronological)
.cursor[Game](ReadPreference.secondary)
.enumerator(maxGames) &>
Enumeratee.mapM[Game].apply[Option[GamePGN]] { game =>
makeFastPgn(game, botUserIds) map { _ map { game -> _ } }
} &>
Enumeratee.collect { case Some(el) => el } &>
Enumeratee.grouped(Iteratee takeUpTo batchSize) |>>>
Iteratee.foldM[Seq[GamePGN], Long](nowMillis) {
case (millis, pairs) =>
WS.url(internalEndPointUrl).put(pairs.map(_._2) mkString separator).flatMap {
case res if res.status == 200 =>
val date = pairs.headOption.map(_._1.createdAt) ?? dateTimeFormatter.print
val nb = pairs.size
val gameMs = (nowMillis - millis) / nb.toDouble
logger.info(s"$date $nb ${gameMs.toInt} ms/game ${(1000 / gameMs).toInt} games/s")
funit
case res => fufail(s"Stop import because of status ${res.status}")
} >> {
pairs.headOption match {
case None => fufail(s"No games left, import complete!")
case Some((g, _)) if (g.createdAt.isAfter(DateTime.now.minusMinutes(10))) =>
fufail(s"Found a recent game, import complete!")
case _ => funit
}
} inject nowMillis
} void
gameRepo
.sortedCursor(query, Query.sortChronological)
.documentSource()
.via(LilaStream.logRate[Game]("fetch")(logger))
.mapAsyncUnordered(8) { makeFastPgn(_, botUserIds) }
.mapConcat(_.toList)
.via(LilaStream.logRate("index")(logger))
.grouped(50)
.map(_ mkString separator)
.mapAsyncUnordered(2) { pgn =>
ws.url(internalEndPointUrl).put(pgn).flatMap {
case res if res.status == 200 => funit
case res => fufail(s"Stop import because of status ${res.status}")
}
}
.to(Sink.ignore)
.run.void
}
}
@ -92,7 +78,7 @@ private final class ExplorerIndexer(
buf += pgn
val startAt = nowMillis
if (buf.size >= max) {
WS.url(internalEndPointUrl).put(buf mkString separator) andThen {
ws.url(internalEndPointUrl).put(buf mkString separator) andThen {
case Success(res) if res.status == 200 =>
lila.mon.explorer.index.time(((nowMillis - startAt) / max).toInt)
lila.mon.explorer.index.success(max)
@ -150,8 +136,8 @@ private final class ExplorerIndexer(
if probability(game, averageRating) > nextFloat
if !game.userIds.exists(botUserIds.contains)
if valid(game)
} yield GameRepo initialFen game flatMap { initialFen =>
UserRepo.usernamesByIds(game.userIds) map { usernames =>
} yield gameRepo initialFen game flatMap { initialFen =>
userRepo.usernamesByIds(game.userIds) map { usernames =>
def username(color: chess.Color) = game.player(color).userId flatMap { id =>
usernames.find(_.toLowerCase == id)
} orElse game.player(color).userId getOrElse "?"

View File

@ -63,7 +63,7 @@ private[puzzle] final class PuzzleBatch(
step: Int,
idRange: Range,
nb: Int
): Fu[List[Puzzle]] = puzzleColl.find(rangeSelector(
): Fu[List[Puzzle]] = puzzleColl.ext.find(rangeSelector(
rating = rating,
tolerance = tolerance,
idRange = idRange

View File

@ -69,7 +69,7 @@ private[puzzle] final class Selector(
tolerance: Int,
step: Int,
idRange: Range
): Fu[Option[Puzzle]] = puzzleColl.find(rangeSelector(
): Fu[Option[Puzzle]] = puzzleColl.ext.find(rangeSelector(
rating = rating,
tolerance = tolerance,
idRange = idRange

View File

@ -22,7 +22,7 @@ final class UserInfosApi(roundColl: Coll, currentPuzzleId: User => Fu[Option[Puz
private def fetchRounds(userId: User.ID, currentPuzzleId: Option[PuzzleId]): Fu[List[Round]] = {
val idSelector = $doc("$regex" -> BSONRegex(s"^$userId:", "")) ++
currentPuzzleId.?? { id => $doc("$lte" -> s"$userId:${Round encode id}") }
roundColl.find($doc(Round.BSONFields.id -> idSelector))
roundColl.ext.find($doc(Round.BSONFields.id -> idSelector))
.sort($sort desc Round.BSONFields.id)
.list[Round](historySize atLeast chartSize)
.map(_.reverse)

View File

@ -72,7 +72,7 @@ final class Cached(
def apply(perf: PerfType) = rankingApi.weeklyRatingDistribution(perf)
}
val botIds = asyncCache.single[Set[User.ID]](
private[user] val botIds = asyncCache.single[Set[User.ID]](
name = "user.botIds",
f = userRepo.botIds,
expireAfter = _.ExpireAfterWrite(10 minutes)

View File

@ -41,6 +41,8 @@ final class Env(
val isOnline = new IsOnline(userId => onlineUserIds() contains userId)
lazy val botIds = new GetBotIds(() => cached.botIds.get)
lazy val jsonView = wire[JsonView]
lazy val noteApi = {

View File

@ -3,3 +3,7 @@ package lila.user
final class IsOnline(f: User.ID => Boolean) extends (User.ID => Boolean) {
def apply(u: User.ID) = f(u)
}
final class GetBotIds(f: () => Fu[Set[User.ID]]) extends (() => Fu[Set[User.ID]]) {
def apply() = f()
}