rewrite Lobby as LobbyTrouper

roundTrouper
Thibault Duplessis 2018-12-09 08:59:46 +07:00
parent d6e27e5838
commit 89c1c9f01f
10 changed files with 54 additions and 68 deletions

View File

@ -324,7 +324,6 @@ lobby {
socket { socket {
uid.ttl = ${site.socket.uid.ttl} uid.ttl = ${site.socket.uid.ttl}
} }
actor.name = lobby-actor
net.domain = ${net.domain} net.domain = ${net.domain}
broom_period = 2 seconds broom_period = 2 seconds
resync_ids_period = 25 seconds resync_ids_period = 25 seconds
@ -646,7 +645,6 @@ hub {
user = ${timeline.user.actor.name} user = ${timeline.user.actor.name}
} }
bookmark = ${bookmark.actor.name} bookmark = ${bookmark.actor.name}
lobby = ${lobby.actor.name}
relation = ${relation.actor.name} relation = ${relation.actor.name}
report = ${report.actor.name} report = ${report.actor.name}
shutup = ${shutup.actor.name} shutup = ${shutup.actor.name}

View File

@ -292,6 +292,9 @@ object mon {
} }
def queueSize(name: String) = rec(s"socket.queue_size.$name") def queueSize(name: String) = rec(s"socket.queue_size.$name")
} }
object trouper {
def queueSize(name: String) = rec(s"trouper.queue_size.$name")
}
object mod { object mod {
object report { object report {
val unprocessed = rec("mod.report.unprocessed") val unprocessed = rec("mod.report.unprocessed")

View File

@ -14,7 +14,6 @@ final class Env(config: Config, system: ActorSystem) {
val tournamentApi = select("actor.tournament.api") val tournamentApi = select("actor.tournament.api")
val timeline = select("actor.timeline.user") val timeline = select("actor.timeline.user")
val bookmark = select("actor.bookmark") val bookmark = select("actor.bookmark")
val lobby = select("actor.lobby")
val relation = select("actor.relation") val relation = select("actor.relation")
val report = select("actor.report") val report = select("actor.report")
val shutup = select("actor.shutup") val shutup = select("actor.shutup")

View File

@ -20,7 +20,6 @@ final class Env(
private val settings = new { private val settings = new {
val NetDomain = config getString "net.domain" val NetDomain = config getString "net.domain"
val SocketUidTtl = config duration "socket.uid.ttl" val SocketUidTtl = config duration "socket.uid.ttl"
val ActorName = config getString "actor.name"
val BroomPeriod = config duration "broom_period" val BroomPeriod = config duration "broom_period"
val ResyncIdsPeriod = config duration "resync_ids_period" val ResyncIdsPeriod = config duration "resync_ids_period"
val CollectionSeek = config getString "collection.seek" val CollectionSeek = config getString "collection.seek"
@ -42,10 +41,13 @@ final class Env(
maxPerUser = SeekMaxPerUser maxPerUser = SeekMaxPerUser
) )
val lobby = Lobby.start(system, ActorName, private val lobbyTrouper = LobbyTrouper.start(
system,
broomPeriod = BroomPeriod, broomPeriod = BroomPeriod,
resyncIdsPeriod = ResyncIdsPeriod) { resyncIdsPeriod = ResyncIdsPeriod
new Lobby( ) {
new LobbyTrouper(
system = system,
socket = socket, socket = socket,
seekApi = seekApi, seekApi = seekApi,
gameCache = gameCache, gameCache = gameCache,
@ -59,7 +61,7 @@ final class Env(
lazy val socketHandler = new SocketHandler( lazy val socketHandler = new SocketHandler(
hub = hub, hub = hub,
lobby = lobby, lobby = lobbyTrouper,
socket = socket, socket = socket,
poolApi = poolApi, poolApi = poolApi,
blocking = blocking blocking = blocking

View File

@ -3,15 +3,15 @@ package lila.lobby
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Promise import scala.concurrent.Promise
import akka.actor._
import akka.pattern.{ ask, pipe }
import org.joda.time.DateTime import org.joda.time.DateTime
import actorApi._ import actorApi._
import lila.game.GameRepo import lila.game.GameRepo
import lila.hub.Trouper
import lila.socket.Socket.Uids import lila.socket.Socket.Uids
private[lobby] final class Lobby( private[lobby] final class LobbyTrouper(
system: akka.actor.ActorSystem,
socket: Socket, socket: Socket,
seekApi: SeekApi, seekApi: SeekApi,
gameCache: lila.game.Cached, gameCache: lila.game.Cached,
@ -19,26 +19,28 @@ private[lobby] final class Lobby(
blocking: String => Fu[Set[String]], blocking: String => Fu[Set[String]],
playban: String => Fu[Option[lila.playban.TempBan]], playban: String => Fu[Option[lila.playban.TempBan]],
poolApi: lila.pool.PoolApi, poolApi: lila.pool.PoolApi,
onStart: String => Unit onStart: lila.game.Game.ID => Unit
) extends Actor { ) extends Trouper {
def receive = { import LobbyTrouper._
val process: Trouper.Receive = {
case msg @ AddHook(hook) => { case msg @ AddHook(hook) => {
lila.mon.lobby.hook.create() lila.mon.lobby.hook.create()
HookRepo byUid hook.uid foreach remove HookRepo byUid hook.uid foreach remove
hook.sid ?? { sid => HookRepo bySid sid foreach remove } hook.sid ?? { sid => HookRepo bySid sid foreach remove }
(!hook.compatibleWithPools).??(findCompatible(hook)) foreach { (!hook.compatibleWithPools).??(findCompatible(hook)) foreach {
case Some(h) => self ! BiteHook(h.id, hook.uid, hook.user) case Some(h) => this ! BiteHook(h.id, hook.uid, hook.user)
case None => self ! SaveHook(msg) case None => this ! SaveHook(msg)
} }
} }
case msg @ AddSeek(seek) => case msg @ AddSeek(seek) =>
lila.mon.lobby.seek.create() lila.mon.lobby.seek.create()
findCompatible(seek) foreach { findCompatible(seek) foreach {
case Some(s) => self ! BiteSeek(s.id, seek.user) case Some(s) => this ! BiteSeek(s.id, seek.user)
case None => self ! SaveSeek(msg) case None => this ! SaveSeek(msg)
} }
case SaveHook(msg) => case SaveHook(msg) =>
@ -59,7 +61,7 @@ private[lobby] final class Lobby(
case BiteHook(hookId, uid, user) => NoPlayban(user) { case BiteHook(hookId, uid, user) => NoPlayban(user) {
HookRepo byId hookId foreach { hook => HookRepo byId hookId foreach { hook =>
HookRepo byUid uid foreach remove HookRepo byUid uid foreach remove
Biter(hook, uid, user) pipeTo self Biter(hook, uid, user) foreach this.!
} }
} }
@ -69,7 +71,7 @@ private[lobby] final class Lobby(
lila.mon.lobby.seek.join() lila.mon.lobby.seek.join()
seekApi find seekId foreach { seekApi find seekId foreach {
_ foreach { seek => _ foreach { seek =>
Biter(seek, user) pipeTo self Biter(seek, user) foreach this.!
} }
} }
} }
@ -88,7 +90,7 @@ private[lobby] final class Lobby(
socket ! RemoveSeek(seek.id) socket ! RemoveSeek(seek.id)
} }
case Lobby.Tick(promise) => case Tick(promise) =>
HookRepo.truncateIfNeeded HookRepo.truncateIfNeeded
implicit val timeout = makeTimeout seconds 5 implicit val timeout = makeTimeout seconds 5
socket.ask[Uids](GetUidsP).chronometer socket.ask[Uids](GetUidsP).chronometer
@ -96,10 +98,9 @@ private[lobby] final class Lobby(
.mon(_.lobby.socket.getUids) .mon(_.lobby.socket.getUids)
.result .result
.logFailure(logger, err => s"broom cannot get uids from socket: $err") .logFailure(logger, err => s"broom cannot get uids from socket: $err")
.map { Lobby.WithPromise(_, promise) } .foreach { this ! WithPromise(_, promise) }
.pipeTo(self)
case Lobby.WithPromise(Uids(uids), promise) => case WithPromise(Uids(uids), promise) =>
poolApi socketIds uids poolApi socketIds uids
val createdBefore = DateTime.now minusSeconds 5 val createdBefore = DateTime.now minusSeconds 5
val hooks = { val hooks = {
@ -111,10 +112,11 @@ private[lobby] final class Lobby(
// s"broom uids:${uids.size} before:${createdBefore} hooks:${hooks.map(_.id)}") // s"broom uids:${uids.size} before:${createdBefore} hooks:${hooks.map(_.id)}")
if (hooks.nonEmpty) { if (hooks.nonEmpty) {
// logger.debug(s"remove ${hooks.size} hooks") // logger.debug(s"remove ${hooks.size} hooks")
self ! RemoveHooks(hooks) this ! RemoveHooks(hooks)
} }
lila.mon.lobby.socket.member(uids.size) lila.mon.lobby.socket.member(uids.size)
lila.mon.lobby.hook.size(HookRepo.size) lila.mon.lobby.hook.size(HookRepo.size)
lila.mon.trouper.queueSize("lobby")(estimateQueueSize)
promise.success(()) promise.success(())
case RemoveHooks(hooks) => hooks foreach remove case RemoveHooks(hooks) => hooks foreach remove
@ -125,8 +127,8 @@ private[lobby] final class Lobby(
case msg @ HookSub(member, true) => case msg @ HookSub(member, true) =>
socket ! AllHooksFor(member, HookRepo.vector.filter { Biter.showHookTo(_, member) }) socket ! AllHooksFor(member, HookRepo.vector.filter { Biter.showHookTo(_, member) })
case lila.pool.HookThieve.GetCandidates(clock) => case lila.pool.HookThieve.GetCandidates(clock, promise) =>
sender ! lila.pool.HookThieve.PoolHooks(HookRepo poolCandidates clock) promise success lila.pool.HookThieve.PoolHooks(HookRepo poolCandidates clock)
case lila.pool.HookThieve.StolenHookIds(ids) => case lila.pool.HookThieve.StolenHookIds(ids) =>
HookRepo byIds ids.toSet foreach remove HookRepo byIds ids.toSet foreach remove
@ -168,21 +170,18 @@ private[lobby] final class Lobby(
} }
} }
private object Lobby { private object LobbyTrouper {
private case class Tick(promise: Promise[Unit]) private case class Tick(promise: Promise[Unit])
private case class WithPromise[A](value: A, promise: Promise[Unit]) private case class WithPromise[A](value: A, promise: Promise[Unit])
def start( def start(
system: ActorSystem, system: akka.actor.ActorSystem,
name: String,
broomPeriod: FiniteDuration, broomPeriod: FiniteDuration,
resyncIdsPeriod: FiniteDuration resyncIdsPeriod: FiniteDuration
)(instance: => Actor) = { )(trouper: => LobbyTrouper) = {
system.scheduler.schedule(15 seconds, resyncIdsPeriod)(trouper ! actorApi.Resync)
val ref = system.actorOf(Props(instance), name = name)
system.scheduler.schedule(15 seconds, resyncIdsPeriod, ref, actorApi.Resync)
system.scheduler.scheduleOnce(7 seconds) { system.scheduler.scheduleOnce(7 seconds) {
lila.common.ResilientScheduler( lila.common.ResilientScheduler(
every = broomPeriod, every = broomPeriod,
@ -190,11 +189,9 @@ private object Lobby {
system = system, system = system,
logger = logger logger = logger
) { ) {
val promise = Promise[Unit]() trouper.ask[Unit](Tick)
ref ! Tick(promise)
promise.future
} }
} }
ref trouper
} }
} }

View File

@ -1,6 +1,5 @@
package lila.lobby package lila.lobby
import akka.actor._
import scala.concurrent.duration._ import scala.concurrent.duration._
import actorApi._ import actorApi._
@ -13,7 +12,7 @@ import ornicar.scalalib.Zero
private[lobby] final class SocketHandler( private[lobby] final class SocketHandler(
hub: lila.hub.Env, hub: lila.hub.Env,
lobby: ActorRef, lobby: LobbyTrouper,
socket: Socket, socket: Socket,
poolApi: PoolApi, poolApi: PoolApi,
blocking: String => Fu[Set[String]] blocking: String => Fu[Set[String]]

View File

@ -5,13 +5,12 @@ import scala.concurrent.duration._
import lila.hub.FutureSequencer import lila.hub.FutureSequencer
final class Env( final class Env(
lobbyActor: akka.actor.ActorSelection,
playbanApi: lila.playban.PlaybanApi,
system: akka.actor.ActorSystem, system: akka.actor.ActorSystem,
playbanApi: lila.playban.PlaybanApi,
onStart: String => Unit onStart: String => Unit
) { ) {
private lazy val hookThieve = new HookThieve(lobbyActor) private lazy val hookThieve = new HookThieve(system.lilaBus)
lazy val api = new PoolApi( lazy val api = new PoolApi(
configs = PoolList.all, configs = PoolList.all,
@ -35,9 +34,8 @@ final class Env(
object Env { object Env {
lazy val current: Env = "pool" boot new Env( lazy val current: Env = "pool" boot new Env(
lobbyActor = lila.hub.Env.current.lobby,
playbanApi = lila.playban.Env.current.api,
system = lila.common.PlayApp.system, system = lila.common.PlayApp.system,
playbanApi = lila.playban.Env.current.api,
onStart = lila.game.Env.current.onStart onStart = lila.game.Env.current.onStart
) )
} }

View File

@ -1,36 +1,30 @@
package lila.pool package lila.pool
import akka.actor.ActorSelection import scala.concurrent.Promise
import akka.pattern.ask
private final class HookThieve(lobby: ActorSelection) { private final class HookThieve(bus: lila.common.Bus) {
import HookThieve._ import HookThieve._
def candidates(clock: chess.Clock.Config, monId: String): Fu[PoolHooks] = { def candidates(clock: chess.Clock.Config, monId: String): Fu[PoolHooks] =
import makeTimeout.short bus.ask[PoolHooks]('lobby)(GetCandidates(clock, _)) recover {
lobby ? GetCandidates(clock) mapTo manifest[PoolHooks] addEffect { res => case _ =>
lila.mon.lobby.pool.thieve.candidates(monId)(res.hooks.size) lila.mon.lobby.pool.thieve.timeout(monId)()
PoolHooks(Vector.empty)
} }
} recover {
case _ =>
lila.mon.lobby.pool.thieve.timeout(monId)()
PoolHooks(Vector.empty)
}
def stolen(poolHooks: Vector[PoolHook], monId: String) = { def stolen(poolHooks: Vector[PoolHook], monId: String) = {
lila.mon.lobby.pool.thieve.stolen(monId)(poolHooks.size) lila.mon.lobby.pool.thieve.stolen(monId)(poolHooks.size)
if (poolHooks.nonEmpty) lobby ! StolenHookIds(poolHooks.map(_.hookId)) if (poolHooks.nonEmpty) bus.publish(StolenHookIds(poolHooks.map(_.hookId)), 'lobby)
} }
} }
object HookThieve { object HookThieve {
case class GetCandidates(clock: chess.Clock.Config) case class GetCandidates(clock: chess.Clock.Config, promise: Promise[PoolHooks])
case class StolenHookIds(ids: Vector[String]) case class StolenHookIds(ids: Vector[String])
case class PoolHook(hookId: String, member: PoolMember) { case class PoolHook(hookId: String, member: PoolMember) {
def is(m: PoolMember) = member.userId == m.userId def is(m: PoolMember) = member.userId == m.userId
} }

View File

@ -8,7 +8,6 @@ import lila.user.UserContext
final class Env( final class Env(
config: AppConfig, config: AppConfig,
db: lila.db.Env, db: lila.db.Env,
hub: lila.hub.Env,
fishnetPlayer: lila.fishnet.Player, fishnetPlayer: lila.fishnet.Player,
onStart: String => Unit, onStart: String => Unit,
prefApi: lila.pref.PrefApi, prefApi: lila.pref.PrefApi,
@ -27,7 +26,7 @@ final class Env(
ctx.me.fold(AnonConfigRepo filter ctx.req)(UserConfigRepo.filter) ctx.me.fold(AnonConfigRepo filter ctx.req)(UserConfigRepo.filter)
lazy val processor = new Processor( lazy val processor = new Processor(
lobby = hub.lobby, bus = system.lilaBus,
gameCache = gameCache, gameCache = gameCache,
maxPlaying = MaxPlaying, maxPlaying = MaxPlaying,
fishnetPlayer = fishnetPlayer, fishnetPlayer = fishnetPlayer,
@ -43,7 +42,6 @@ object Env {
lazy val current = "setup" boot new Env( lazy val current = "setup" boot new Env(
config = lila.common.PlayApp loadConfig "setup", config = lila.common.PlayApp loadConfig "setup",
db = lila.db.Env.current, db = lila.db.Env.current,
hub = lila.hub.Env.current,
fishnetPlayer = lila.fishnet.Env.current.player, fishnetPlayer = lila.fishnet.Env.current.player,
onStart = lila.game.Env.current.onStart, onStart = lila.game.Env.current.onStart,
prefApi = lila.pref.Env.current.api, prefApi = lila.pref.Env.current.api,

View File

@ -1,13 +1,11 @@
package lila.setup package lila.setup
import akka.actor.ActorSelection
import lila.game.{ GameRepo, Pov, PerfPicker } import lila.game.{ GameRepo, Pov, PerfPicker }
import lila.lobby.actorApi.{ AddHook, AddSeek } import lila.lobby.actorApi.{ AddHook, AddSeek }
import lila.user.{ User, UserContext } import lila.user.{ User, UserContext }
private[setup] final class Processor( private[setup] final class Processor(
lobby: ActorSelection, bus: lila.common.Bus,
gameCache: lila.game.Cached, gameCache: lila.game.Cached,
maxPlaying: Int, maxPlaying: Int,
fishnetPlayer: lila.fishnet.Player, fishnetPlayer: lila.fishnet.Player,
@ -37,13 +35,13 @@ private[setup] final class Processor(
saveConfig(_ withHook config) >> { saveConfig(_ withHook config) >> {
config.hook(uid, ctx.me, sid, blocking) match { config.hook(uid, ctx.me, sid, blocking) match {
case Left(hook) => fuccess { case Left(hook) => fuccess {
lobby ! AddHook(hook) bus.publish(AddHook(hook), 'lobby)
Created(hook.id) Created(hook.id)
} }
case Right(Some(seek)) => ctx.userId.??(gameCache.nbPlaying) map { nbPlaying => case Right(Some(seek)) => ctx.userId.??(gameCache.nbPlaying) map { nbPlaying =>
if (nbPlaying >= maxPlaying) Refused if (nbPlaying >= maxPlaying) Refused
else { else {
lobby ! AddSeek(seek) bus.publish(AddSeek(seek), 'lobby)
Created(seek.id) Created(seek.id)
} }
} }