first implementation for analysis cloud eval live upgrade
parent
6134395eb0
commit
043ff67686
|
@ -11,11 +11,8 @@ private[analyse] final class AnalyseSocketHandler(
|
|||
|
||||
import AnalyseSocket._
|
||||
|
||||
private def controller(member: Member, user: Option[User]): Handler.Controller =
|
||||
evalCacheHandler(member, user)
|
||||
|
||||
def join(uid: Socket.Uid, user: Option[User]): Fu[JsSocketHandler] =
|
||||
Handler(hub, socket, uid, Join(uid, user.map(_.id))) {
|
||||
case Connected(enum, member) => (controller(member, user), enum, member)
|
||||
case Connected(enum, member) => (evalCacheHandler(uid, member, user), enum, member)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import com.typesafe.config.Config
|
|||
final class Env(
|
||||
config: Config,
|
||||
db: lila.db.Env,
|
||||
system: akka.actor.ActorSystem,
|
||||
asyncCache: lila.memo.AsyncCache.Builder
|
||||
) {
|
||||
|
||||
|
@ -12,17 +13,27 @@ final class Env(
|
|||
|
||||
private lazy val truster = new EvalCacheTruster
|
||||
|
||||
private lazy val upgrade = new EvalCacheUpgrade(
|
||||
asyncCache = asyncCache
|
||||
)
|
||||
|
||||
lazy val api = new EvalCacheApi(
|
||||
coll = db(CollectionEvalCache),
|
||||
truster = truster,
|
||||
upgrade = upgrade,
|
||||
asyncCache = asyncCache
|
||||
)
|
||||
|
||||
lazy val socketHandler = new EvalCacheSocketHandler(
|
||||
api = api,
|
||||
truster = truster
|
||||
truster = truster,
|
||||
upgrade = upgrade
|
||||
)
|
||||
|
||||
system.lilaBus.subscribeFun('socketDoor) {
|
||||
case lila.socket.actorApi.SocketLeave(uid, _) => upgrade unregister uid
|
||||
}
|
||||
|
||||
def cli = new lila.common.Cli {
|
||||
def process = {
|
||||
case "eval-cache" :: "drop" :: fenParts =>
|
||||
|
@ -36,6 +47,7 @@ object Env {
|
|||
lazy val current: Env = "evalCache" boot new Env(
|
||||
config = lila.common.PlayApp loadConfig "evalCache",
|
||||
db = lila.db.Env.current,
|
||||
system = lila.common.PlayApp.system,
|
||||
asyncCache = lila.memo.Env.current.asyncCache
|
||||
)
|
||||
}
|
||||
|
|
|
@ -7,10 +7,12 @@ import scala.concurrent.duration._
|
|||
import chess.format.{ FEN, Forsyth }
|
||||
import chess.variant.Variant
|
||||
import lila.db.dsl._
|
||||
import lila.socket.Socket
|
||||
|
||||
final class EvalCacheApi(
|
||||
coll: Coll,
|
||||
truster: EvalCacheTruster,
|
||||
upgrade: EvalCacheUpgrade,
|
||||
asyncCache: lila.memo.AsyncCache.Builder
|
||||
) {
|
||||
|
||||
|
@ -21,15 +23,15 @@ final class EvalCacheApi(
|
|||
id = Id(variant, SmallFen.make(variant, fen)),
|
||||
multiPv = multiPv
|
||||
) map {
|
||||
_.map { lila.evalCache.JsonHandlers.writeEval(_, fen) }
|
||||
_.map { JsonHandlers.writeEval(_, fen) }
|
||||
} addEffect { res =>
|
||||
Forsyth getPly fen.value foreach { ply =>
|
||||
lila.mon.evalCache.register(ply, res.isDefined)
|
||||
}
|
||||
}
|
||||
|
||||
def put(trustedUser: TrustedUser, candidate: Input.Candidate): Funit =
|
||||
candidate.input ?? { put(trustedUser, _) }
|
||||
def put(trustedUser: TrustedUser, candidate: Input.Candidate, uid: Socket.Uid): Funit =
|
||||
candidate.input ?? { put(trustedUser, _, uid) }
|
||||
|
||||
def shouldPut = truster shouldPut _
|
||||
|
||||
|
@ -60,7 +62,7 @@ final class EvalCacheApi(
|
|||
if (res.isDefined) coll.updateFieldUnchecked($id(id), "usedAt", DateTime.now)
|
||||
}
|
||||
|
||||
private def put(trustedUser: TrustedUser, input: Input): Funit = Validator(input) match {
|
||||
private def put(trustedUser: TrustedUser, input: Input, uid: Socket.Uid): Funit = Validator(input) match {
|
||||
case Some(error) =>
|
||||
logger.info(s"Invalid from ${trustedUser.user.username} $error ${input.fen}")
|
||||
funit
|
||||
|
@ -73,12 +75,14 @@ final class EvalCacheApi(
|
|||
usedAt = DateTime.now
|
||||
)
|
||||
coll.insert(entry).recover(lila.db.recoverDuplicateKey(_ => ())) >>-
|
||||
cache.put(input.id, entry.some)
|
||||
cache.put(input.id, entry.some) >>-
|
||||
upgrade.onEval(input, uid)
|
||||
case Some(oldEntry) =>
|
||||
val entry = oldEntry add input.eval
|
||||
!(entry similarTo oldEntry) ?? {
|
||||
coll.update($id(entry.fen), entry, upsert = true).void >>-
|
||||
cache.put(input.id, entry.some)
|
||||
coll.update($id(entry.id), entry, upsert = true).void >>-
|
||||
cache.put(input.id, entry.some) >>-
|
||||
upgrade.onEval(input, uid)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import org.joda.time.DateTime
|
|||
import scalaz.NonEmptyList
|
||||
|
||||
import lila.tree.Eval.Score
|
||||
import lila.user.User
|
||||
|
||||
case class EvalCacheEntry(
|
||||
_id: EvalCacheEntry.Id,
|
||||
|
@ -16,7 +17,7 @@ case class EvalCacheEntry(
|
|||
|
||||
import EvalCacheEntry._
|
||||
|
||||
def fen = _id
|
||||
def id = _id
|
||||
|
||||
def add(eval: Eval) = copy(
|
||||
evals = EvalCacheSelector(eval :: evals),
|
||||
|
@ -31,7 +32,7 @@ case class EvalCacheEntry(
|
|||
.map(_ takePvs multiPv)
|
||||
|
||||
def similarTo(other: EvalCacheEntry) =
|
||||
fen == other.fen && evals == other.evals
|
||||
id == other.id && evals == other.evals
|
||||
}
|
||||
|
||||
object EvalCacheEntry {
|
||||
|
@ -46,7 +47,7 @@ object EvalCacheEntry {
|
|||
pvs: NonEmptyList[Pv],
|
||||
knodes: Knodes,
|
||||
depth: Int,
|
||||
by: lila.user.User.ID,
|
||||
by: User.ID,
|
||||
trust: Trust
|
||||
) {
|
||||
|
||||
|
@ -98,13 +99,13 @@ object EvalCacheEntry {
|
|||
def isTooLow = value <= 0
|
||||
}
|
||||
|
||||
case class TrustedUser(trust: Trust, user: lila.user.User)
|
||||
case class TrustedUser(trust: Trust, user: User)
|
||||
|
||||
final class SmallFen private (val value: String) extends AnyVal with StringValue
|
||||
|
||||
object SmallFen {
|
||||
private[evalCache] def raw(str: String) = new SmallFen(str)
|
||||
def make(variant: Variant, fen: FEN) = {
|
||||
def make(variant: Variant, fen: FEN): SmallFen = {
|
||||
val base = fen.value.split(' ').take(4).mkString("").filter { c =>
|
||||
c != '/' && c != '-' && c != 'w'
|
||||
}
|
||||
|
|
|
@ -8,30 +8,38 @@ import lila.user.User
|
|||
|
||||
final class EvalCacheSocketHandler(
|
||||
api: EvalCacheApi,
|
||||
truster: EvalCacheTruster
|
||||
truster: EvalCacheTruster,
|
||||
upgrade: EvalCacheUpgrade
|
||||
) {
|
||||
|
||||
import EvalCacheEntry._
|
||||
|
||||
def apply(member: SocketMember, user: Option[User]): Handler.Controller =
|
||||
makeController(member, user map truster.makeTrusted)
|
||||
def apply(uid: Socket.Uid, member: SocketMember, user: Option[User]): Handler.Controller =
|
||||
makeController(uid, member, user map truster.makeTrusted)
|
||||
|
||||
private def makeController(member: SocketMember, trustedUser: Option[TrustedUser]): Handler.Controller = {
|
||||
private def makeController(
|
||||
uid: Socket.Uid,
|
||||
member: SocketMember,
|
||||
trustedUser: Option[TrustedUser]
|
||||
): Handler.Controller = {
|
||||
|
||||
case ("evalPut", o) => trustedUser foreach { tu =>
|
||||
JsonHandlers.readPut(tu, o) foreach { api.put(tu, _) }
|
||||
JsonHandlers.readPut(tu, o) foreach { api.put(tu, _, uid) }
|
||||
}
|
||||
|
||||
case ("evalGet", o) => for {
|
||||
d <- o obj "d"
|
||||
variant = chess.variant.Variant orDefault ~d.str("variant")
|
||||
fen <- d str "fen"
|
||||
fen <- d str "fen" map FEN.apply
|
||||
multiPv = (d int "mpv") | 1
|
||||
path <- d str "path"
|
||||
} api.getEvalJson(variant, FEN(fen), multiPv) foreach {
|
||||
_ foreach { json =>
|
||||
member push Socket.makeMessage("evalHit", json + ("path" -> JsString(path)))
|
||||
} {
|
||||
api.getEvalJson(variant, fen, multiPv) foreach {
|
||||
_ foreach { json =>
|
||||
member push Socket.makeMessage("evalHit", json + ("path" -> JsString(path)))
|
||||
}
|
||||
}
|
||||
upgrade.register(uid, member, variant, fen, multiPv, path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
package lila.evalCache
|
||||
|
||||
import play.api.libs.json.JsString
|
||||
import scala.collection.mutable.AnyRefMap
|
||||
|
||||
import chess.format.FEN
|
||||
import chess.variant.Variant
|
||||
import lila.socket.{ Socket, SocketMember }
|
||||
|
||||
/* Upgrades the user's eval when a better one becomes available,
|
||||
* by remembering the last evalGet of each socket member,
|
||||
* and listening to new evals stored.
|
||||
*/
|
||||
private final class EvalCacheUpgrade(
|
||||
asyncCache: lila.memo.AsyncCache.Builder
|
||||
) {
|
||||
import EvalCacheUpgrade._
|
||||
|
||||
private val members = AnyRefMap.empty[UidString, WatchingMember]
|
||||
private val evals = AnyRefMap.empty[SetupId, Set[UidString]]
|
||||
|
||||
def register(uid: Socket.Uid, member: SocketMember, variant: Variant, fen: FEN, multiPv: Int, path: String): Unit = {
|
||||
members get uid.value foreach { wm =>
|
||||
unregisterEval(wm.setupId, uid)
|
||||
}
|
||||
val setupId = makeSetupId(variant, fen, multiPv)
|
||||
members += (uid.value -> WatchingMember(member, setupId, path))
|
||||
evals += (setupId -> (~evals.get(setupId) + uid.value))
|
||||
println("---------------------------------")
|
||||
println(members, "members")
|
||||
println(evals, "evals")
|
||||
}
|
||||
|
||||
def onEval(input: EvalCacheEntry.Input, uid: Socket.Uid): Unit = {
|
||||
(1 to input.eval.multiPv) flatMap { multiPv =>
|
||||
evals get makeSetupId(input.id.variant, input.fen, multiPv)
|
||||
} foreach { uids =>
|
||||
val wms = uids.filter(uid.value !=) flatMap members.get
|
||||
if (wms.nonEmpty) {
|
||||
val json = JsonHandlers.writeEval(input.eval, input.fen)
|
||||
wms foreach { wm =>
|
||||
wm.member push Socket.makeMessage("evalHit", json + ("path" -> JsString(wm.path)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def unregister(uid: Socket.Uid): Unit = members get uid.value foreach { wm =>
|
||||
unregisterEval(wm.setupId, uid)
|
||||
members -= uid.value
|
||||
}
|
||||
|
||||
private def unregisterEval(setupId: SetupId, uid: Socket.Uid): Unit =
|
||||
evals get setupId foreach { uids =>
|
||||
val newUids = uids - uid.value
|
||||
if (newUids.isEmpty) evals -= setupId
|
||||
else evals += (setupId -> newUids)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private object EvalCacheUpgrade {
|
||||
|
||||
private type UidString = String
|
||||
private type SetupId = String
|
||||
|
||||
private def makeSetupId(variant: Variant, fen: FEN, multiPv: Int): SetupId =
|
||||
s"${variant.id}${EvalCacheEntry.SmallFen.make(variant, fen).value}^$multiPv"
|
||||
|
||||
private case class WatchingMember(member: SocketMember, setupId: SetupId, path: String)
|
||||
}
|
|
@ -54,7 +54,7 @@ private[round] final class SocketHandler(
|
|||
case ("talk", o) => o str "d" foreach { messenger.watcher(gameId, member, _) }
|
||||
case ("outoftime", _) => send(QuietFlag) // mobile app BC
|
||||
case ("flag", o) => clientFlag(o, none) foreach send
|
||||
}: Handler.Controller) orElse evalCacheHandler(member, me) orElse lila.chat.Socket.in(
|
||||
}: Handler.Controller) orElse evalCacheHandler(uid, member, me) orElse lila.chat.Socket.in(
|
||||
chatId = Chat.Id(s"$gameId/w"),
|
||||
member = member,
|
||||
socket = socket,
|
||||
|
|
|
@ -247,7 +247,7 @@ final class SocketHandler(
|
|||
chapterId <- o.get[Chapter.Id]("d")
|
||||
} api.analysisRequest(studyId, chapterId, byUserId)
|
||||
|
||||
}: Handler.Controller) orElse evalCacheHandler(member, user) orElse lila.chat.Socket.in(
|
||||
}: Handler.Controller) orElse evalCacheHandler(uid, member, user) orElse lila.chat.Socket.in(
|
||||
chatId = Chat.Id(studyId.value),
|
||||
member = member,
|
||||
socket = socket,
|
||||
|
|
Loading…
Reference in New Issue