rethink broadcasting of realtime moves for greater perf

This commit is contained in:
Thibault Duplessis 2014-11-15 23:34:26 +01:00
parent 705d82de64
commit b9948e1150
15 changed files with 81 additions and 54 deletions

@ -1 +1 @@
Subproject commit 01fc724d9e72e764e1c678efe835ce904bf92252
Subproject commit dc1537482dbe77d58b16899f7edbe6ffa55e21fa

View file

@ -30,6 +30,7 @@ final class Env(config: Config, system: ActorSystem) {
val evaluator = select("actor.evaluator")
val chat = select("actor.chat")
val analyser = select("actor.analyser")
val moveBroadcast = select("actor.move_broadcast")
}
object socket {

View file

@ -28,8 +28,8 @@ private[lobby] final class SocketHandler(
lobby ! BiteHook(id, uid, member.user)
}
case ("cancel", o) => lobby ! CancelHook(uid)
case ("liveGames", o) => o str "d" foreach { ids =>
socket ! LiveGames(uid, ids.split(' ').toList)
case ("startWatching", o) => o str "d" foreach { ids =>
hub.actor.moveBroadcast ! StartWatching(uid, member, ids.split(' ').toSet)
}
}

View file

@ -34,8 +34,8 @@ private[round] final class SocketHandler(
member.playerIdOption.fold[Handler.Controller]({
case ("p", o) => o int "v" foreach { v => socket ! PingVersion(uid, v) }
case ("liveGames", o) => o str "d" foreach { ids =>
socket ! LiveGames(uid, ids.split(' ').toList)
case ("startWatching", o) => o str "d" foreach { ids =>
hub.actor.moveBroadcast ! StartWatching(uid, member, ids.split(' ').toSet)
}
case ("talk", o) => o str "d" foreach { text =>
messenger.watcher(gameId, member, text, socket)
@ -68,8 +68,8 @@ private[round] final class SocketHandler(
case ("challenge", o) => ((o str "d") |@| member.userId) apply {
case (to, from) => hub.actor.challenger ! lila.hub.actorApi.setup.RemindChallenge(gameId, from, to)
}
case ("liveGames", o) => o str "d" foreach { ids =>
socket ! LiveGames(uid, ids.split(' ').toList)
case ("startWatching", o) => o str "d" foreach { ids =>
hub.actor.moveBroadcast ! StartWatching(uid, member, ids.split(' ').toSet)
}
case ("talk", o) => o str "d" foreach { text =>
messenger.owner(gameId, member, text, socket)

View file

@ -8,7 +8,7 @@ import play.api.libs.json._
import actorApi._
import lila.socket._
import lila.socket.actorApi.{ Connected, SendToFlag }
import lila.socket.actorApi.SendToFlag
private[site] final class Socket(timeout: Duration) extends SocketActor[Member](timeout) {

View file

@ -8,7 +8,7 @@ import play.api.libs.json._
import actorApi._
import lila.common.PimpedJson._
import lila.socket._
import lila.socket.actorApi.{ Connected, LiveGames }
import lila.socket.actorApi.StartWatching
private[site] final class SocketHandler(
socket: ActorRef,
@ -19,15 +19,14 @@ private[site] final class SocketHandler(
userId: Option[String],
flag: Option[String]): Fu[JsSocketHandler] = {
def controller: Handler.Controller = {
case ("liveGames", o) => o str "d" foreach { ids =>
socket ! LiveGames(uid, ids.split(' ').toList)
def controller(member: Member): Handler.Controller = {
case ("startWatching", o) => o str "d" foreach { ids =>
hub.actor.moveBroadcast ! StartWatching(uid, member, ids.split(' ').toSet)
}
}
Handler(hub, socket, uid, Join(uid, userId, flag), userId) {
case Connected(enum, member) =>
controller -> enum
case Connected(enum, member) => controller(member) -> enum
}
}
}

View file

@ -16,3 +16,4 @@ case class Member(
}
case class Join(uid: String, userId: Option[String], flag: Option[String])
private[site] case class Connected(enumerator: JsEnumerator, member: Member)

View file

@ -14,14 +14,14 @@ final class Env(
import scala.concurrent.duration._
private val PopulationName = config getString "population.name"
private val HubName = config getString "hub.name"
private val MoveBroadcastName = config getString "move_broadcast.name"
private val socketHub =
system.actorOf(Props[SocketHub], name = HubName)
private val socketHub = system.actorOf(Props[SocketHub], name = HubName)
private val population =
system.actorOf(Props[Population], name = PopulationName)
private val population = system.actorOf(Props[Population])
system.actorOf(Props[MoveBroadcast], name = MoveBroadcastName)
scheduler.once(5 seconds) {
scheduler.message(4 seconds) { socketHub -> actorApi.Broom }

View file

@ -0,0 +1,52 @@
package lila.socket
import akka.actor._
import scala.concurrent.duration._
import actorApi.{ SocketLeave, StartWatching }
import lila.hub.actorApi.round.MoveEvent
private final class MoveBroadcast extends Actor {
context.system.lilaBus.subscribe(self, 'moveEvent, 'socketDoor)
type UID = String
type GameId = String
case class WatchingMember(member: SocketMember, gameIds: Set[GameId])
var members = Map.empty[UID, WatchingMember]
var games = Map.empty[GameId, Set[UID]]
def status = s"members: $members\ngames: $games"
def receive = {
case move: MoveEvent =>
games get move.gameId foreach { mIds =>
val msg = Socket.makeMessage("fen", play.api.libs.json.Json.obj(
"id" -> move.gameId,
"fen" -> move.fen,
"lm" -> move.move
))
mIds flatMap members.get foreach (_.member push msg)
}
case StartWatching(uid, member, gameIds) =>
members = members + (uid -> WatchingMember(member, gameIds ++ members.get(uid).??(_.gameIds)))
gameIds foreach { id =>
games = games + (id -> (~games.get(id) + uid))
}
case SocketLeave(uid) => members get uid foreach { m =>
members = members - uid
m.gameIds foreach { id =>
games get id foreach { uids =>
val newUids = uids - uid
if (newUids.isEmpty) games = games - id
else games = games + (id -> newUids)
}
}
}
}
}

View file

@ -9,7 +9,6 @@ import play.twirl.api.Html
import actorApi._
import lila.hub.actorApi.game.ChangeFeatured
import lila.hub.actorApi.round.MoveEvent
import lila.hub.actorApi.{ Deploy, GetUids, WithUserIds, SendTo, SendTos }
import lila.memo.ExpireSetMemo
@ -47,10 +46,6 @@ abstract class SocketActor[M <: SocketMember](uidTtl: Duration) extends Socket w
case GetUids => sender ! uids
case LiveGames(uid, gameIds) => registerLiveGames(uid, gameIds)
case move: MoveEvent => notifyMove(move)
case SendTo(userId, msg) => sendTo(userId, msg)
case SendTos(userIds, msg) => sendTos(userIds, msg)
@ -151,26 +146,11 @@ abstract class SocketActor[M <: SocketMember](uidTtl: Duration) extends Socket w
def userIds: Iterable[String] = members.values.flatMap(_.userId)
def notifyMove(move: MoveEvent) {
lazy val msg = makeMessage("fen", Json.obj(
"id" -> move.gameId,
"fen" -> move.fen,
"lm" -> move.move
))
members.values foreach { m =>
if (m hasLiveGame move.gameId) m push msg
}
}
def showSpectators(users: List[lila.common.LightUser], nbAnons: Int) = nbAnons match {
case 0 => users.distinct.map(_.titleName)
case x => users.distinct.map(_.titleName) :+ ("Anonymous (%d)" format x)
}
def registerLiveGames(uid: String, ids: List[String]) {
withMember(uid)(_ addLiveGames ids)
}
def withMember(uid: String)(f: M => Unit) {
members get uid foreach f
}

View file

@ -13,7 +13,7 @@ final class SocketHub extends Actor {
private val sockets = collection.mutable.Set[ActorRef]()
context.system.lilaBus.subscribe(self, 'moveEvent, 'users, 'deploy, 'nbMembers, 'socket)
context.system.lilaBus.subscribe(self, 'users, 'deploy, 'nbMembers, 'socket)
override def postStop() {
context.system.lilaBus.unsubscribe(self)

View file

@ -8,13 +8,6 @@ trait SocketMember extends Ordered[SocketMember] {
val userId: Option[String]
val troll: Boolean
// FIXME
private val privateLiveGames = new java.util.ArrayList[String]()
def hasLiveGame(id: String) = privateLiveGames contains id
def addLiveGames(ids: List[String]) { ids foreach privateLiveGames.add }
def isAuth = userId.isDefined
def compare(other: SocketMember) = ~userId compare ~other.userId

View file

@ -16,7 +16,6 @@ case class Quit(uid: String)
case class SocketEnter[M <: SocketMember](uid: String, member: M)
case class SocketLeave(uid: String)
case class LiveGames(uid: String, gameIds: List[String])
case class Resync(uid: String)
case object GetVersion
@ -26,3 +25,5 @@ case class SendToFlag(flag: String, message: JsObject)
case object PopulationGet
case object PopulationTell
case class NbMembers(nb: Int)
case class StartWatching(uid: String, member: SocketMember, gameIds: Set[String])

View file

@ -46,8 +46,8 @@ private[tournament] final class SocketHandler(
uid: String,
member: Member): Handler.Controller = {
case ("p", o) => o int "v" foreach { v => socket ! PingVersion(uid, v) }
case ("liveGames", o) => o str "d" foreach { ids =>
socket ! LiveGames(uid, ids.split(' ').toList)
case ("startWatching", o) => o str "d" foreach { ids =>
hub.actor.moveBroadcast ! StartWatching(uid, member, ids.split(' ').toSet)
}
case ("talk", o) => o str "d" foreach { text =>
member.userId foreach { userId =>

View file

@ -1025,20 +1025,20 @@ var storage = {
var socketOpened = false;
function registerLiveGames() {
function startWatching() {
if (!socketOpened) return;
var ids = [];
$('a.mini_board.live').removeClass("live").each(function() {
ids.push($(this).data("live"));
});
if (ids.length > 0) {
lichess.socket.send("liveGames", ids.join(" "));
lichess.socket.send("startWatching", ids.join(" "));
}
}
$('body').on('lichess.content_loaded', registerLiveGames);
$('body').on('lichess.content_loaded', startWatching);
$('body').on('socket.open', function() {
socketOpened = true;
registerLiveGames();
startWatching();
});
setTimeout(function() {