improve relay sync
parent
b0c5b817aa
commit
7abe9d597f
|
@ -251,13 +251,13 @@ object mon {
|
|||
}
|
||||
object relay {
|
||||
private def by(official: Boolean) = if (official) "official" else "user"
|
||||
def ongoing(official: Boolean) = gauge("relay.ongoing").withTag("by", by(official))
|
||||
def moves(official: Boolean, slug: String) = counter("relay.moves").withTags(
|
||||
private def relay(official: Boolean, slug: String) =
|
||||
Map("by" -> by(official), "slug" -> slug)
|
||||
)
|
||||
def syncTime(official: Boolean, slug: String) = timer("relay.sync.time").withTags(
|
||||
Map("by" -> by(official), "slug" -> slug)
|
||||
)
|
||||
def ongoing(official: Boolean) = gauge("relay.ongoing").withTag("by", by(official))
|
||||
def games(official: Boolean, slug: String) = gauge("relay.games").withTags(relay(official, slug))
|
||||
def moves(official: Boolean, slug: String) = counter("relay.moves").withTags(relay(official, slug))
|
||||
def fetchTime(official: Boolean, slug: String) = timer("relay.fetch.time").withTags(relay(official, slug))
|
||||
def syncTime(official: Boolean, slug: String) = timer("relay.sync.time").withTags(relay(official, slug))
|
||||
}
|
||||
object bot {
|
||||
def moves(username: String) = counter("bot.moves").withTag("name", username)
|
||||
|
|
|
@ -34,7 +34,7 @@ final private class RelayFetch(
|
|||
case object Tick
|
||||
|
||||
def scheduleNext =
|
||||
context.system.scheduler.scheduleOnce(600 millis, self, Tick)
|
||||
context.system.scheduler.scheduleOnce(500 millis, self, Tick)
|
||||
|
||||
def receive = {
|
||||
|
||||
|
@ -66,10 +66,12 @@ final private class RelayFetch(
|
|||
def processRelay(relay: Relay): Fu[Relay] =
|
||||
if (!relay.sync.playing) fuccess(relay.withSync(_.play))
|
||||
else
|
||||
doProcess(relay)
|
||||
fetchGames(relay)
|
||||
.mon(_.relay.fetchTime(relay.official, relay.slug))
|
||||
.addEffect(gs => lila.mon.relay.games(relay.official, relay.slug).update(gs.size))
|
||||
.flatMap { games =>
|
||||
sync(relay, games)
|
||||
.withTimeout(5 seconds, SyncResult.Timeout)
|
||||
.withTimeout(7 seconds, SyncResult.Timeout)
|
||||
.mon(_.relay.syncTime(relay.official, relay.slug))
|
||||
.map { res =>
|
||||
res -> relay.withSync(_ addLog SyncLog.event(res.moves, none))
|
||||
|
@ -126,7 +128,7 @@ final private class RelayFetch(
|
|||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import RelayFetch.GamesSeenBy
|
||||
|
||||
private def doProcess(relay: Relay): Fu[RelayGames] =
|
||||
private def fetchGames(relay: Relay): Fu[RelayGames] =
|
||||
cache.asMap
|
||||
.compute(
|
||||
relay.sync.upstream,
|
||||
|
@ -267,19 +269,19 @@ private object RelayFetch {
|
|||
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
|
||||
def apply(multiPgn: MultiPgn): Fu[List[RelayGame]] =
|
||||
def apply(multiPgn: MultiPgn): Fu[Vector[RelayGame]] =
|
||||
multiPgn.value
|
||||
.foldLeft[Try[(List[RelayGame], Int)]](Success(List.empty -> 0)) {
|
||||
.foldLeft[Try[(Vector[RelayGame], Int)]](Success(Vector.empty -> 0)) {
|
||||
case (Success((acc, index)), pgn) =>
|
||||
pgnCache.get(pgn) flatMap { f =>
|
||||
val game = f(index)
|
||||
if (game.isEmpty) Failure(LilaException(s"Found an empty PGN at index $index"))
|
||||
else Success((game :: acc, index + 1))
|
||||
else Success((acc :+ game, index + 1))
|
||||
}
|
||||
case (acc, _) => acc
|
||||
}
|
||||
.future
|
||||
.dmap(_._1.reverse)
|
||||
.dmap(_._1)
|
||||
|
||||
private val pgnCache: LoadingCache[String, Try[Int => RelayGame]] = CacheApi.scaffeineNoScheduler
|
||||
.expireAfterAccess(2 minutes)
|
||||
|
|
|
@ -36,7 +36,7 @@ private object RelayInputSanity {
|
|||
|
||||
// TCEC style has one game per file, and reuses the file for all games
|
||||
private def isValidTCEC(chapters: List[Chapter], games: RelayGames) = games match {
|
||||
case List(onlyGame) =>
|
||||
case Vector(onlyGame) =>
|
||||
chapters.lastOption.exists { c =>
|
||||
onlyGame staticTagsMatch c.tags
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ final private class RelaySync(
|
|||
RelayInputSanity(chapters, games) match {
|
||||
case Some(fail) => fufail(fail.msg)
|
||||
case None =>
|
||||
lila.common.Future.traverseSequentially(games) { game =>
|
||||
lila.common.Future.linear(games) { game =>
|
||||
findCorrespondingChapter(game, chapters, games.size) match {
|
||||
case Some(chapter) => updateChapter(study, chapter, game)
|
||||
case None =>
|
||||
|
|
|
@ -4,5 +4,5 @@ package object relay extends PackageObject {
|
|||
|
||||
private[relay] val logger = lila.log("relay")
|
||||
|
||||
private[relay] type RelayGames = List[RelayGame]
|
||||
private[relay] type RelayGames = Vector[RelayGame]
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue