minimize event bus subscriptions to improve broadcasting efficiency

This commit is contained in:
Thibault Duplessis 2013-10-29 02:16:28 +01:00
parent 6df6e80d18
commit 0a59da4e21
6 changed files with 59 additions and 28 deletions

View file

@ -51,7 +51,8 @@ final class Env(
if (!Env.ai.isServer) {
loginfo("[boot] Preloading modules")
(Env.site,
(Env.socket,
Env.site,
Env.tournament,
Env.lobby,
Env.game,
@ -60,7 +61,6 @@ final class Env(
Env.round,
Env.team,
Env.message,
Env.socket,
Env.timeline,
Env.gameSearch,
Env.teamSearch,

View file

@ -7,9 +7,6 @@ import lila.common.PimpedConfig._
final class Env(config: Config, system: ActorSystem) {
private val SocketHubName = config getString "socket.hub.name"
private val SocketHubTimeout = config duration "socket.hub.timeout"
object actor {
val game = select("actor.game.actor")
val gameIndexer = select("actor.game.indexer")
@ -39,6 +36,7 @@ final class Env(config: Config, system: ActorSystem) {
val tournament = select("socket.tournament")
val site = select("socket.site")
val monitor = select("socket.monitor")
val hub = select("socket.hub")
}
private def select(name: String) =

View file

@ -26,12 +26,6 @@ private[round] final class Socket(
disconnectTimeout: Duration,
ragequitTimeout: Duration) extends SocketActor[Member](uidTimeout) {
context.system.lilaBus.subscribe(self, 'changeFeaturedGame)
override def postStop() {
context.system.lilaBus.unsubscribe(self)
}
private val timeBomb = new TimeBomb(socketTimeout)
private final class Player(color: Color) {

View file

@ -5,31 +5,29 @@ import akka.pattern.{ ask, pipe }
import com.typesafe.config.Config
import actorApi._
import lila.common.PimpedConfig._
import makeTimeout.short
final class Env(
config: Config,
system: ActorSystem,
scheduler: lila.common.Scheduler,
hub: lila.hub.Env) {
scheduler: lila.common.Scheduler) {
import scala.concurrent.duration._
private val population = system.actorOf(
Props(new Population),
name = "population")
private val PopulationName = config getString "population.name"
private val HubName = config getString "hub.name"
private val sockets = List(
hub.socket.lobby,
hub.socket.site,
hub.socket.round,
hub.socket.tournament)
private val socketHub =
system.actorOf(Props[SocketHub], name = HubName)
private val population =
system.actorOf(Props[Population], name = PopulationName)
private val bus = system.lilaBus
scheduler.once(5 seconds) {
scheduler.effect(4 seconds, "publish broom to event bus") {
bus.publish(actorApi.Broom, 'broom)
scheduler.message(4 seconds) {
socketHub -> actorApi.Broom
}
scheduler.effect(1 seconds, "calculate nb members") {
population ? PopulationGet foreach {
@ -42,7 +40,7 @@ final class Env(
object Env {
lazy val current = "[boot] socket" describes new Env(
config = lila.common.PlayApp loadConfig "socket",
system = lila.common.PlayApp.system,
scheduler = lila.common.PlayApp.scheduler,
hub = lila.hub.Env.current)
scheduler = lila.common.PlayApp.scheduler)
}

View file

@ -20,11 +20,11 @@ abstract class SocketActor[M <: SocketMember](uidTtl: Duration) extends Socket w
val lilaBus = context.system.lilaBus
lilaBus.subscribe(self, 'moveEvent, 'users, 'deploy, 'nbMembers, 'broom)
lilaBus.publish(lila.socket.SocketHub.Subscribe(self), 'socket)
override def postStop() {
lilaBus.publish(lila.socket.SocketHub.Unsubscribe(self), 'socket)
members.keys foreach eject
lilaBus.unsubscribe(self)
}
// to be defined in subclassing actor

View file

@ -0,0 +1,41 @@
package lila.socket
import scala.concurrent.duration._
import akka.actor._
import akka.dispatch.Dispatchers
import akka.pattern.{ ask, pipe }
import akka.routing._
import actorApi._
final class SocketHub extends Actor {
private val sockets = collection.mutable.Set[ActorRef]()
context.system.lilaBus.subscribe(self,
'moveEvent, 'users, 'deploy, 'nbMembers, 'socket,
// FIXME this event only concern the current TV room
'changeFeaturedGame)
override def postStop() {
context.system.lilaBus.unsubscribe(self)
}
import SocketHub._
def receive = {
case Subscribe(socket) sockets += socket
case Unsubscribe(socket) sockets -= socket
case msg sockets foreach (_ ! msg)
}
}
case object SocketHub {
case class Subscribe(actor: ActorRef)
case class Unsubscribe(actor: ActorRef)
}