lila/modules/relay/src/main/RelayFetch.scala

346 lines
11 KiB
Scala

package lila.relay
import akka.actor._
import chess.format.pgn.Tags
import com.github.blemale.scaffeine.LoadingCache
import io.lemonlabs.uri.Url
import org.joda.time.DateTime
import play.api.libs.json._
import play.api.libs.ws.StandaloneWSClient
import RelayRound.Sync.{ UpstreamIds, UpstreamUrl }
import scala.concurrent.duration._
import lila.base.LilaException
import lila.memo.CacheApi
import lila.study.MultiPgn
import lila.tree.Node.Comments
import lila.game.{ GameRepo, PgnDump }
import lila.round.GameProxyRepo
final private class RelayFetch(
sync: RelaySync,
api: RelayApi,
irc: lila.irc.IrcApi,
formatApi: RelayFormatApi,
gameRepo: GameRepo,
pgnDump: PgnDump,
gameProxy: GameProxyRepo,
ws: StandaloneWSClient
) extends Actor {
implicit def system = context.system
implicit def ec = context.dispatcher
override def preStart(): Unit = {
context setReceiveTimeout 20.seconds
context.system.scheduler.scheduleOnce(10.seconds)(scheduleNext())
()
}
case object Tick
def scheduleNext(): Unit =
context.system.scheduler.scheduleOnce(500 millis, self, Tick).unit
def receive = {
case ReceiveTimeout =>
val msg = "RelaySync timed out!"
logger.error(msg)
throw new RuntimeException(msg)
case Tick =>
api.toSync.flatMap { relays =>
List(true, false) foreach { official =>
lila.mon.relay.ongoing(official).update(relays.count(_.tour.official == official))
}
relays.map { rt =>
if (rt.round.sync.ongoing) processRelay(rt) flatMap { newRelay =>
api.update(rt.round)(_ => newRelay)
}
else if (rt.round.hasStarted) {
logger.info(s"Finish by lack of activity ${rt.round}")
api.update(rt.round)(_.finish)
} else if (rt.round.shouldGiveUp) {
logger.info(s"Finish for lack of start ${rt.round}")
api.update(rt.round)(_.finish)
} else fuccess(rt.round)
}.sequenceFu addEffectAnyway scheduleNext()
}.unit
}
// no writing the relay; only reading!
def processRelay(rt: RelayRound.WithTour): Fu[RelayRound] =
if (!rt.round.sync.playing) fuccess(rt.round.withSync(_.play))
else
fetchGames(rt)
.mon(_.relay.fetchTime(rt.tour.official, rt.round.slug))
.addEffect(gs => lila.mon.relay.games(rt.tour.official, rt.round.slug).update(gs.size).unit)
.flatMap { games =>
sync(rt, games)
.withTimeout(7 seconds, SyncResult.Timeout)
.mon(_.relay.syncTime(rt.tour.official, rt.round.slug))
.map { res =>
res -> rt.round
.withSync(_ addLog SyncLog.event(res.nbMoves, none))
.copy(finished = games.forall(_.end.isDefined))
}
}
.recover { case e: Exception =>
(e match {
case SyncResult.Timeout =>
if (rt.tour.official) logger.info(s"Sync timeout ${rt.round}")
SyncResult.Timeout
case _ =>
if (rt.tour.official) logger.info(s"Sync error ${rt.round} ${e.getMessage take 80}")
SyncResult.Error(e.getMessage)
}) -> rt.round.withSync(_ addLog SyncLog.event(0, e.some))
}
.map { case (result, newRelay) =>
afterSync(result, newRelay withTour rt.tour)
}
def afterSync(result: SyncResult, rt: RelayRound.WithTour): RelayRound =
result match {
case result: SyncResult.Ok if result.nbMoves == 0 => continueRelay(rt)
case result: SyncResult.Ok =>
continueRelay(rt)
lila.mon.relay.moves(rt.tour.official, rt.round.slug).increment(result.nbMoves)
continueRelay(rt.round.ensureStarted.resume withTour rt.tour)
case _ => continueRelay(rt)
}
def continueRelay(rt: RelayRound.WithTour): RelayRound =
rt.round.sync.upstream.fold(rt.round) { upstream =>
val seconds =
if (rt.round.sync.log.alwaysFails && !upstream.local) {
rt.round.sync.log.events.lastOption
.filterNot(_.isTimeout)
.flatMap(_.error)
.ifTrue(rt.tour.official && rt.round.shouldHaveStarted)
.filterNot(_ contains "Cannot parse moves")
.filterNot(_ contains "Found an empty PGN")
.foreach { irc.broadcastError(rt.round.id.value, rt.fullName, _) }
60
} else
rt.round.sync.delay getOrElse {
if (upstream.local) 3 else 6
}
rt.round.withSync {
_.copy(
nextAt = DateTime.now plusSeconds {
seconds atLeast { if (rt.round.sync.log.justTimedOut) 10 else 2 }
} some
)
}
}
import com.github.benmanes.caffeine.cache.Cache
import RelayFetch.GamesSeenBy
private val gameIdsUpstreamPgnFlags = PgnDump.WithFlags(
clocks = true,
moves = true,
tags = true,
evals = false,
opening = false,
literate = false,
pgnInJson = false,
delayMoves = true
)
private def fetchGames(rt: RelayRound.WithTour): Fu[RelayGames] =
rt.round.sync.upstream ?? {
case UpstreamIds(ids) =>
gameRepo.gamesFromSecondary(ids) flatMap
gameProxy.upgradeIfPresent flatMap
gameRepo.withInitialFens flatMap {
_.map { case (game, fen) =>
pgnDump(game, fen, gameIdsUpstreamPgnFlags).dmap(_.render)
}.sequenceFu dmap MultiPgn.apply
} flatMap RelayFetch.multiPgnToGames.apply
case url: UpstreamUrl =>
cache.asMap
.compute(
url,
(_, v) =>
Option(v) match {
case Some(GamesSeenBy(games, seenBy)) if !seenBy(rt.round.id) =>
GamesSeenBy(games, seenBy + rt.round.id)
case _ =>
GamesSeenBy(doFetchUrl(url, RelayFetch.maxChapters(rt.tour)), Set(rt.round.id))
}
)
.games
}
// The goal of this is to make sure that an upstream used by several broadcast
// is only pulled from as many times as necessary, and not more.
private val cache: Cache[UpstreamUrl, GamesSeenBy] = CacheApi.scaffeineNoScheduler
.initialCapacity(4)
.maximumSize(16)
.build[UpstreamUrl, GamesSeenBy]()
.underlying
private def doFetchUrl(upstream: UpstreamUrl, max: Int): Fu[RelayGames] = {
import RelayFetch.DgtJson._
formatApi get upstream.withRound flatMap {
case RelayFormat.SingleFile(doc) =>
doc.format match {
// all games in a single PGN file
case RelayFormat.DocFormat.Pgn => httpGet(doc.url) map { MultiPgn.split(_, max) }
// maybe a single JSON game? Why not
case RelayFormat.DocFormat.Json =>
httpGetJson[GameJson](doc.url)(gameReads) map { game =>
MultiPgn(List(game.toPgn()))
}
}
case RelayFormat.ManyFiles(indexUrl, makeGameDoc) =>
httpGetJson[RoundJson](indexUrl) flatMap { round =>
round.pairings.zipWithIndex
.map { case (pairing, i) =>
val number = i + 1
val gameDoc = makeGameDoc(number)
(gameDoc.format match {
case RelayFormat.DocFormat.Pgn => httpGet(gameDoc.url)
case RelayFormat.DocFormat.Json =>
httpGetJson[GameJson](gameDoc.url).recover { case _: Exception =>
GameJson(moves = Nil, result = none)
} map { _.toPgn(pairing.tags) }
}) map (number -> _)
}
.sequenceFu
.map { results =>
MultiPgn(results.sortBy(_._1).map(_._2))
}
}
} flatMap RelayFetch.multiPgnToGames.apply
}
private def httpGet(url: Url): Fu[String] =
ws.url(url.toString)
.withRequestTimeout(4.seconds)
.get()
.flatMap {
case res if res.status == 200 => fuccess(res.body)
case res => fufail(s"[${res.status}] $url")
}
private def httpGetJson[A: Reads](url: Url): Fu[A] =
for {
str <- httpGet(url)
json <- scala.concurrent.Future(Json parse str) // Json.parse throws exceptions (!)
data <-
implicitly[Reads[A]]
.reads(json)
.fold(
err => fufail(s"Invalid JSON from $url: $err"),
fuccess
)
} yield data
}
private object RelayFetch {
case class GamesSeenBy(games: Fu[RelayGames], seenBy: Set[RelayRound.Id])
def maxChapters(tour: RelayTour) =
lila.study.Study.maxChapters * (if (tour.official) 2 else 1)
private object DgtJson {
case class PairingPlayer(
fname: Option[String],
mname: Option[String],
lname: Option[String],
title: Option[String]
) {
def fullName =
some {
List(fname, mname, lname).flatten mkString " "
}.filter(_.nonEmpty)
}
case class RoundJsonPairing(white: PairingPlayer, black: PairingPlayer, result: String) {
import chess.format.pgn._
def tags =
Tags(
List(
white.fullName map { v =>
Tag(_.White, v)
},
white.title map { v =>
Tag(_.WhiteTitle, v)
},
black.fullName map { v =>
Tag(_.Black, v)
},
black.title map { v =>
Tag(_.BlackTitle, v)
},
Tag(_.Result, result).some
).flatten
)
}
case class RoundJson(pairings: List[RoundJsonPairing])
implicit val pairingPlayerReads = Json.reads[PairingPlayer]
implicit val roundPairingReads = Json.reads[RoundJsonPairing]
implicit val roundReads = Json.reads[RoundJson]
case class GameJson(moves: List[String], result: Option[String]) {
def toPgn(extraTags: Tags = Tags.empty) = {
val strMoves = moves.map(_ split ' ') map { move =>
chess.format.pgn.Move(
san = ~move.headOption,
secondsLeft = move.lift(1).map(_.takeWhile(_.isDigit)) flatMap (_.toIntOption)
)
} mkString " "
s"$extraTags\n\n$strMoves"
}
}
implicit val gameReads = Json.reads[GameJson]
}
object multiPgnToGames {
import scala.util.{ Failure, Success, Try }
def apply(multiPgn: MultiPgn): Fu[Vector[RelayGame]] =
multiPgn.value
.foldLeft[Try[(Vector[RelayGame], Int)]](Success(Vector.empty -> 0)) {
case (Success((acc, index)), pgn) =>
pgnCache.get(pgn) flatMap { f =>
val game = f(index)
if (game.isEmpty) Failure(LilaException(s"Found an empty PGN at index $index"))
else Success((acc :+ game, index + 1))
}
case (acc, _) => acc
}
.future
.dmap(_._1)
private val pgnCache: LoadingCache[String, Try[Int => RelayGame]] = CacheApi.scaffeineNoScheduler
.expireAfterAccess(2 minutes)
.maximumSize(512)
.build(compute)
private def compute(pgn: String): Try[Int => RelayGame] =
lila.study
.PgnImport(pgn, Nil)
.fold(
err => Failure(LilaException(err)),
res =>
Success(index =>
RelayGame(
index = index,
tags = res.tags,
variant = res.variant,
root = res.root.copy(
comments = Comments.empty,
children = res.root.children.updateMainline(_.copy(comments = Comments.empty))
),
end = res.end
)
)
)
}
}