dispatch AI work between remotes
This commit is contained in:
parent
c17a8f13d7
commit
c288f85e9a
|
@ -12,12 +12,12 @@ private[app] final class AiStresser(env: lila.ai.Env, system: ActorSystem) {
|
|||
|
||||
def apply {
|
||||
|
||||
(1 to 12) foreach { i ⇒
|
||||
(1 to 10) foreach { i ⇒
|
||||
system.scheduler.scheduleOnce((i * 97) millis) {
|
||||
play(i % 8 + 1, true)
|
||||
}
|
||||
}
|
||||
(1 to 2) foreach { i ⇒
|
||||
(1 to 3) foreach { i ⇒
|
||||
system.scheduler.scheduleOnce((i * 131) millis) {
|
||||
analyse(true)
|
||||
}
|
||||
|
@ -33,12 +33,11 @@ private[app] final class AiStresser(env: lila.ai.Env, system: ActorSystem) {
|
|||
|
||||
def receive = {
|
||||
case Game(moves, it) if it >= moves.size ⇒ {
|
||||
loginfo("play complete")
|
||||
if (loop) newGame pipeTo self
|
||||
}
|
||||
case Game(moves, it) ⇒
|
||||
ai.play(moves take it mkString " ", none, level).effectFold(e ⇒ {
|
||||
logwarn("[ai] server play: " + e)
|
||||
ai.getMove(moves take it mkString " ", none, level).effectFold(e ⇒ {
|
||||
logwarn("[ai] play: " + e)
|
||||
newGame pipeTo self
|
||||
}, { _ ⇒
|
||||
system.scheduler.scheduleOnce(randomize(1 second)) {
|
||||
|
@ -79,7 +78,7 @@ private[app] final class AiStresser(env: lila.ai.Env, system: ActorSystem) {
|
|||
private def randomize(d: FiniteDuration, ratio: Float = 0.1f): FiniteDuration =
|
||||
approximatly(ratio)(d.toMillis) millis
|
||||
|
||||
private val ai = env.stockfishServer
|
||||
private val ai = env.stockfishClient
|
||||
|
||||
private case class Game(moves: List[String], it: Int)
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ final class Env(
|
|||
playMaxMoveTime = config duration "stockfish.play.movetime",
|
||||
analyseMoveTime = config duration "stockfish.analyse.movetime",
|
||||
playTimeout = config duration "stockfish.play.timeout",
|
||||
loadTimeout = config duration "stockfish.load.timeout",
|
||||
analyseTimeout = config duration "stockfish.analyse.timeout",
|
||||
debug = config getBoolean "stockfish.debug")
|
||||
|
||||
|
@ -59,17 +60,19 @@ final class Env(
|
|||
|
||||
private lazy val stockfishAi = new stockfish.Ai(stockfishServer)
|
||||
|
||||
private lazy val stockfishClient = new stockfish.Client(
|
||||
lazy val stockfishClient = new stockfish.Client(
|
||||
dispatcher = system.actorOf(
|
||||
Props(new stockfish.remote.Dispatcher(
|
||||
urls = StockfishRemotes,
|
||||
config = stockfishConfig,
|
||||
router = stockfish.remote.Router(
|
||||
playRoute = StockfishPlayRoute,
|
||||
analyseRoute = StockfishAnalyseRoute,
|
||||
loadRoute = StockfishLoadRoute) _,
|
||||
scheduler = system.scheduler
|
||||
)), name = "stockfish-dispatcher"),
|
||||
fallback = stockfishAi)
|
||||
fallback = stockfishAi,
|
||||
config = stockfishConfig)
|
||||
|
||||
lazy val stockfishServer = new stockfish.Server(
|
||||
queue = stockfishQueue,
|
||||
|
|
|
@ -16,24 +16,32 @@ import lila.hub.actorApi.ai.GetLoad
|
|||
|
||||
final class Client(
|
||||
dispatcher: ActorRef,
|
||||
fallback: lila.ai.Ai) extends lila.ai.Ai {
|
||||
fallback: lila.ai.Ai,
|
||||
config: Config) extends lila.ai.Ai {
|
||||
|
||||
private implicit val timeout = makeTimeout minutes 60
|
||||
|
||||
def play(game: Game, pgn: String, initialFen: Option[String], level: Int): Fu[(Game, Move)] =
|
||||
dispatcher ? Play(pgn, ~initialFen, level) mapTo manifest[String] flatMap {
|
||||
def play(game: Game, pgn: String, initialFen: Option[String], level: Int): Fu[(Game, Move)] = {
|
||||
getMove(pgn, initialFen, level) flatMap {
|
||||
Stockfish.applyMove(game, pgn, _)
|
||||
} recoverWith {
|
||||
case e: Exception ⇒ fallback.play(game, pgn, initialFen, level)
|
||||
}
|
||||
}
|
||||
def getMove(pgn: String, initialFen: Option[String], level: Int): Fu[String] = {
|
||||
implicit val timeout = makeTimeout(config.playTimeout)
|
||||
dispatcher ? Play(pgn, ~initialFen, level) mapTo manifest[String]
|
||||
}
|
||||
|
||||
def analyse(pgn: String, initialFen: Option[String]): Fu[AnalysisMaker] =
|
||||
def analyse(pgn: String, initialFen: Option[String]): Fu[AnalysisMaker] = {
|
||||
implicit val timeout = makeTimeout(config.analyseTimeout)
|
||||
dispatcher ? Analyse(pgn, ~initialFen) mapTo manifest[String] flatMap { str ⇒
|
||||
(AnalysisMaker(str, true) toValid "Can't read analysis results").future
|
||||
} recoverWith {
|
||||
case e: Exception ⇒ fallback.analyse(pgn, initialFen)
|
||||
}
|
||||
}
|
||||
|
||||
def load: Fu[List[Option[Int]]] =
|
||||
def load: Fu[List[Option[Int]]] = {
|
||||
implicit val timeout = makeTimeout(config.loadTimeout)
|
||||
dispatcher ? GetLoad mapTo manifest[List[Option[Int]]]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ private[ai] case class Config(
|
|||
analyseMoveTime: FiniteDuration,
|
||||
playTimeout: FiniteDuration,
|
||||
analyseTimeout: FiniteDuration,
|
||||
loadTimeout: FiniteDuration,
|
||||
debug: Boolean) {
|
||||
|
||||
import Config._
|
||||
|
|
|
@ -14,7 +14,7 @@ private[ai] final class Monitor(queue: ActorRef) extends Actor {
|
|||
|
||||
private var times = Map[Long, Int]()
|
||||
|
||||
private val loadPeriodMillis = 10000
|
||||
private val loadPeriodMillis = 5000
|
||||
|
||||
def receive = {
|
||||
|
||||
|
|
|
@ -3,6 +3,8 @@ package stockfish
|
|||
package remote
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
import scala.util.{ Try, Success, Failure }
|
||||
|
||||
import akka.actor._
|
||||
import akka.pattern.{ ask, pipe }
|
||||
|
@ -11,7 +13,10 @@ import actorApi._
|
|||
import lila.common.ws.WS
|
||||
import lila.hub.actorApi.ai.GetLoad
|
||||
|
||||
private[ai] final class Connection(router: Router) extends Actor {
|
||||
private[ai] final class Connection(
|
||||
name: String,
|
||||
config: Config,
|
||||
router: Router) extends Actor {
|
||||
|
||||
private var load = none[Int]
|
||||
|
||||
|
@ -19,15 +24,10 @@ private[ai] final class Connection(router: Router) extends Actor {
|
|||
|
||||
case GetLoad ⇒ sender ! load
|
||||
|
||||
case CalculateLoad ⇒ scala.concurrent.Future {
|
||||
try {
|
||||
load = WS.url(router.load).get() map (_.body) map parseIntOption await makeTimeout.seconds(1)
|
||||
}
|
||||
catch {
|
||||
case e: Exception ⇒ {
|
||||
// logwarn("[stockfish client calculate load] " + e.getMessage)
|
||||
load = none
|
||||
}
|
||||
case CalculateLoad ⇒ Future {
|
||||
Try(WS.url(router.load).get() map (_.body) map parseIntOption await makeTimeout(config.loadTimeout)) match {
|
||||
case Success(l) ⇒ load = l
|
||||
case Failure(e) ⇒ load = none
|
||||
}
|
||||
}
|
||||
case Play(pgn, fen, level) ⇒ WS.url(router.play).withQueryString(
|
||||
|
|
|
@ -3,49 +3,76 @@ package stockfish
|
|||
package remote
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
|
||||
import akka.actor._
|
||||
import akka.pattern.{ ask, pipe }
|
||||
import akka.util.Timeout
|
||||
|
||||
import actorApi._
|
||||
import lila.hub.actorApi.ai.GetLoad
|
||||
|
||||
private[ai] final class Dispatcher(
|
||||
urls: List[String],
|
||||
config: Config,
|
||||
router: String ⇒ Router,
|
||||
scheduler: Scheduler) extends Actor {
|
||||
|
||||
private var lastAnalysis = 0
|
||||
private var connectionsWithLoad: List[(ActorRef, Option[Int])] = Nil
|
||||
|
||||
def receive = {
|
||||
|
||||
case CalculateLoad ⇒ {
|
||||
implicit val timeout = makeTimeout(config.loadTimeout)
|
||||
connections map { c ⇒
|
||||
c ? GetLoad mapTo manifest[Option[Int]] map { l ⇒ List(c -> l) }
|
||||
}
|
||||
}.suml foreach { connectionsWithLoad = _ }
|
||||
|
||||
case GetLoad ⇒ sender ! connectionsWithLoad.map(_._2)
|
||||
|
||||
case x: Play ⇒ {
|
||||
val connection = (connectionsWithLoad collect {
|
||||
case (a, Some(l)) ⇒ a -> (Random nextInt math.max(1, l))
|
||||
}).sortBy(x ⇒ x._2).headOption.map(_._1)
|
||||
forward(connection, x, sender)(makeTimeout(config.playTimeout))
|
||||
}
|
||||
|
||||
case x: Analyse ⇒ {
|
||||
implicit val timeout = makeTimeout(config.analyseTimeout)
|
||||
val (xs, nb, la) = (connectionsWithLoad, connectionsWithLoad.size, lastAnalysis)
|
||||
val index = (la + 1) to (la + nb) map (_ % nb) find { index ⇒
|
||||
xs lift index exists (_._2.isDefined)
|
||||
}
|
||||
val connection = index flatMap xs.lift map (_._1)
|
||||
forward(connection, x, sender)(makeTimeout(config.analyseTimeout))
|
||||
index foreach { lastAnalysis = _ }
|
||||
}
|
||||
}
|
||||
|
||||
private def forward(to: Option[ActorRef], msg: Any, sender: ActorRef)(implicit timeout: Timeout) {
|
||||
to match {
|
||||
case None ⇒ sender ! Status.Failure(new Exception("[stockfish dispatcher] No available remote found"))
|
||||
case Some(a) ⇒ a ? msg pipeTo sender
|
||||
}
|
||||
}
|
||||
|
||||
private lazy val connections: List[ActorRef] = urls map { url ⇒
|
||||
val name = urlToActorName(url)
|
||||
context.actorOf(
|
||||
Props(new Connection(router(url))),
|
||||
name = urlToActorName(url)
|
||||
Props(new Connection(name, config, router(url))),
|
||||
name = name
|
||||
) ~ { actor ⇒
|
||||
scheduler.schedule(200.millis, 1.second, actor, CalculateLoad)
|
||||
}
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case GetLoad ⇒ loaded map { _ map { _._2 } } pipeTo sender
|
||||
case x: Play ⇒ forward(x)
|
||||
case x: Analyse ⇒ forward(x)
|
||||
override def preStart {
|
||||
scheduler.schedule(0.second, 1.second, self, CalculateLoad)
|
||||
}
|
||||
|
||||
private def forward(x: Any) = lessLoadedConnection.effectFold(
|
||||
e ⇒ sender ! Status.Failure(e),
|
||||
c ⇒ c forward x)
|
||||
|
||||
private def loaded: Fu[List[(ActorRef, Option[Int])]] = {
|
||||
import makeTimeout.short
|
||||
connections map { c ⇒
|
||||
c ? GetLoad mapTo manifest[Option[Int]] map { l ⇒ List(c -> l) }
|
||||
}
|
||||
}.suml
|
||||
|
||||
private def lessLoadedConnection: Fu[ActorRef] = loaded map { xs ⇒
|
||||
(xs collect {
|
||||
case (a, Some(l)) ⇒ a -> l
|
||||
}).sortBy(x ⇒ x._2).headOption.map(_._1)
|
||||
} flatten s"[stockfish] No available remote found"
|
||||
private lazy val noRemote = new Exception("[stockfish dispatcher] No available remote found")
|
||||
|
||||
private val urlRegex = """^https?://([^\/]+)/.+$""".r
|
||||
private def urlToActorName(url: String) = url match {
|
||||
|
|
Loading…
Reference in a new issue