on start, batch load round games from DB
parent
a217b16c9b
commit
8fdc038ae7
|
@ -5,7 +5,7 @@ import chess.{ Color, Status }
|
|||
import org.joda.time.DateTime
|
||||
import reactivemongo.akkastream.{ cursorProducer, AkkaStreamCursor }
|
||||
import reactivemongo.api.commands.WriteResult
|
||||
import reactivemongo.api.{ ReadPreference, WriteConcern }
|
||||
import reactivemongo.api.{ Cursor, ReadPreference, WriteConcern }
|
||||
|
||||
import lila.common.ThreadLocalRandom
|
||||
import lila.db.BSON.BSONJodaDateTimeHandler
|
||||
|
@ -155,6 +155,8 @@ final class GameRepo(val coll: Coll)(implicit ec: scala.concurrent.ExecutionCont
|
|||
): AkkaStreamCursor[Game] =
|
||||
coll.find(selector).sort(sort).batchSize(batchSize).cursor[Game](readPreference)
|
||||
|
||||
def byIdsCursor(ids: Iterable[Game.ID]): Cursor[Game] = coll.find($inIds(ids)).cursor[Game]()
|
||||
|
||||
def goBerserk(pov: Pov): Funit =
|
||||
coll.update
|
||||
.one(
|
||||
|
|
|
@ -47,6 +47,18 @@ final class AsyncActorConcMap[D <: AsyncActor](
|
|||
|
||||
def size: Int = asyncActors.size()
|
||||
|
||||
def loadOrTell(id: String, load: () => D, tell: D => Unit): Unit =
|
||||
asyncActors
|
||||
.compute(
|
||||
id,
|
||||
(_, a) =>
|
||||
Option(a).fold(load()) { present =>
|
||||
tell(present)
|
||||
present
|
||||
}
|
||||
)
|
||||
.unit
|
||||
|
||||
def terminate(id: String, lastWill: AsyncActor => Unit): Unit =
|
||||
asyncActors
|
||||
.computeIfPresent(
|
||||
|
|
|
@ -10,7 +10,8 @@ import lila.game.{ Game, GameRepo, Pov, Progress }
|
|||
// NOT thread safe
|
||||
final private class GameProxy(
|
||||
id: Game.ID,
|
||||
dependencies: GameProxy.Dependencies
|
||||
dependencies: GameProxy.Dependencies,
|
||||
private[this] var cache: Fu[Option[Game]]
|
||||
)(implicit ec: scala.concurrent.ExecutionContext) {
|
||||
|
||||
import GameProxy._
|
||||
|
@ -94,10 +95,6 @@ final private class GameProxy(
|
|||
dirtyProgress = none
|
||||
}
|
||||
}
|
||||
|
||||
private[this] var cache: Fu[Option[Game]] = fetch
|
||||
|
||||
private[this] def fetch = gameRepo game id recoverDefault none
|
||||
}
|
||||
|
||||
private object GameProxy {
|
||||
|
|
|
@ -30,7 +30,8 @@ import lila.user.User
|
|||
final private[round] class RoundAsyncActor(
|
||||
dependencies: RoundAsyncActor.Dependencies,
|
||||
gameId: Game.ID,
|
||||
socketSend: String => Unit
|
||||
socketSend: String => Unit,
|
||||
private var version: SocketVersion
|
||||
)(implicit
|
||||
ec: scala.concurrent.ExecutionContext,
|
||||
proxy: GameProxy
|
||||
|
@ -42,8 +43,6 @@ final private[round] class RoundAsyncActor(
|
|||
|
||||
private var takebackSituation: Option[TakebackSituation] = None
|
||||
|
||||
private var version = SocketVersion(0)
|
||||
|
||||
private var mightBeSimul = true // until proven otherwise
|
||||
|
||||
final private class Player(color: Color) {
|
||||
|
|
|
@ -7,7 +7,7 @@ import chess.format.Uci
|
|||
import chess.{ Black, Centis, Color, MoveMetrics, Speed, White }
|
||||
import play.api.libs.json._
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.{ ExecutionContext, Promise }
|
||||
|
||||
import lila.chat.{ BusChan, Chat }
|
||||
import lila.common.{ Bus, IpAddress, Lilakka }
|
||||
|
@ -22,6 +22,7 @@ import lila.room.RoomSocket.{ Protocol => RP, _ }
|
|||
import lila.socket.RemoteSocket.{ Protocol => P, _ }
|
||||
import lila.socket.Socket.{ makeMessage, SocketVersion }
|
||||
import lila.user.User
|
||||
import reactivemongo.api.Cursor
|
||||
|
||||
final class RoundSocket(
|
||||
remoteSocketApi: lila.socket.RemoteSocket,
|
||||
|
@ -70,27 +71,31 @@ final class RoundSocket(
|
|||
}
|
||||
|
||||
val rounds = new AsyncActorConcMap[RoundAsyncActor](
|
||||
mkAsyncActor = id => {
|
||||
val proxy = new GameProxy(id, proxyDependencies)
|
||||
val asyncActor = new RoundAsyncActor(
|
||||
dependencies = roundDependencies,
|
||||
gameId = id,
|
||||
socketSend = sendForGameId(id)
|
||||
)(ec, proxy)
|
||||
terminationDelay schedule Game.Id(id)
|
||||
asyncActor.getGame dforeach {
|
||||
_ foreach { game =>
|
||||
scheduleExpiration(game)
|
||||
goneWeightsFor(game) dforeach { w =>
|
||||
asyncActor ! RoundAsyncActor.SetGameInfo(game, w)
|
||||
}
|
||||
}
|
||||
}
|
||||
asyncActor
|
||||
},
|
||||
mkAsyncActor = id =>
|
||||
makeRoundActor(id, SocketVersion(0), roundDependencies.gameRepo game id recoverDefault none),
|
||||
initialCapacity = 65536
|
||||
)
|
||||
|
||||
private def makeRoundActor(id: Game.ID, version: SocketVersion, gameFu: Fu[Option[Game]]) = {
|
||||
val proxy = new GameProxy(id, proxyDependencies, gameFu)
|
||||
val roundActor = new RoundAsyncActor(
|
||||
dependencies = roundDependencies,
|
||||
gameId = id,
|
||||
socketSend = sendForGameId(id),
|
||||
version = version
|
||||
)(ec, proxy)
|
||||
terminationDelay schedule Game.Id(id)
|
||||
gameFu dforeach {
|
||||
_ foreach { game =>
|
||||
scheduleExpiration(game)
|
||||
goneWeightsFor(game) dforeach { w =>
|
||||
roundActor ! RoundAsyncActor.SetGameInfo(game, w)
|
||||
}
|
||||
}
|
||||
}
|
||||
roundActor
|
||||
}
|
||||
|
||||
private def tellRound(gameId: Game.Id, msg: Any): Unit = rounds.tell(gameId.value, msg)
|
||||
|
||||
private lazy val roundHandler: Handler = {
|
||||
|
@ -143,9 +148,7 @@ final class RoundSocket(
|
|||
case P.In.TellSri(sri, userId, tpe, msg) => // eval cache
|
||||
Bus.publish(TellSriIn(sri.value, userId, msg), s"remoteSocketIn:$tpe")
|
||||
case RP.In.SetVersions(versions) =>
|
||||
versions foreach { case (roomId, version) =>
|
||||
rounds.tell(roomId, SetVersion(version))
|
||||
}
|
||||
preloadRoundsWithVersions(versions)
|
||||
send(Protocol.Out.versioningReady)
|
||||
case P.In.Ping(id) => send(P.Out.pong(id))
|
||||
case P.In.WsBoot =>
|
||||
|
@ -211,6 +214,55 @@ final class RoundSocket(
|
|||
}
|
||||
|
||||
private val terminationDelay = new TerminationDelay(system.scheduler, 1 minute, finishRound)
|
||||
|
||||
// on startup we get all ongoing game IDs and versions from lila-ws
|
||||
// load them into round actors with batched DB queries
|
||||
private def preloadRoundsWithVersions(rooms: Iterable[(Game.ID, SocketVersion)]) = {
|
||||
|
||||
// load all actors synchronously, giving them game futures from promises we'll fulfill later
|
||||
val gamePromises: Map[Game.ID, Promise[Option[Game]]] = rooms.view.map { case (id, version) =>
|
||||
val promise = Promise[Option[Game]]()
|
||||
rounds.loadOrTell(
|
||||
id,
|
||||
load = () => makeRoundActor(id, version, promise.future),
|
||||
tell = _ ! SetVersion(version)
|
||||
)
|
||||
id -> promise
|
||||
}.toMap
|
||||
|
||||
// fullfill the promises with batched DB requests
|
||||
rooms
|
||||
.map(_._1)
|
||||
.grouped(1024)
|
||||
.map { ids =>
|
||||
roundDependencies.gameRepo
|
||||
.byIdsCursor(ids)
|
||||
.foldWhile[Set[Game.ID]](Set.empty[Game.ID])(
|
||||
(ids, game) =>
|
||||
Cursor.Cont[Set[Game.ID]] {
|
||||
gamePromises.get(game.id).foreach(_ success game.some)
|
||||
ids + game.id
|
||||
},
|
||||
Cursor.ContOnError { (_, err) => logger.error("Can't load round game", err) }
|
||||
) addEffect { loadedIds =>
|
||||
logger.info(s"Loaded ${loadedIds.size}/${ids.size} round games")
|
||||
} recover { case e: Exception =>
|
||||
logger.error(s"Can't load ${ids.size} round games", e)
|
||||
Set.empty
|
||||
}
|
||||
}
|
||||
.sequenceFu
|
||||
.map(_.flatten)
|
||||
.addEffect { loadedIds =>
|
||||
val missingIds = gamePromises.keySet -- loadedIds
|
||||
if (missingIds.nonEmpty) {
|
||||
logger.warn(s"${missingIds.size} round games could not be loaded")
|
||||
missingIds.foreach { id =>
|
||||
gamePromises.get(id).foreach(_ success none)
|
||||
}
|
||||
} else logger.info("All round games loaded")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object RoundSocket {
|
||||
|
|
Loading…
Reference in New Issue