improve socket communications

pull/83/head
Thibault Duplessis 2014-02-23 12:54:10 +01:00
parent 785bca0d01
commit c303db1ac4
16 changed files with 61 additions and 75 deletions

View File

@ -16,7 +16,7 @@ object Main extends LilaController {
def websocket = Socket { implicit ctx =>
get("sri") ?? { uid =>
Env.site.socketHandler(uid, ctx.userId, get("flag"), getBool("tv"))
Env.site.socketHandler(uid, ctx.userId, get("flag"))
}
}

View File

@ -24,7 +24,7 @@ object Round extends LilaController with TheftPrevention {
def websocketWatcher(gameId: String, color: String) = Socket[JsValue] { implicit ctx =>
(get("sri") |@| getInt("version")).tupled ?? {
case (uid, version) => env.socketHandler.watcher(gameId, color, version, uid, ctx.me, ctx.ip, getBool("tv"))
case (uid, version) => env.socketHandler.watcher(gameId, color, version, uid, ctx.me, ctx.ip)
}
}

View File

@ -18,7 +18,7 @@ final class Bus(system: ActorSystem) extends Extension with EventBus {
* @return true if successful and false if not (because it was already subscribed to that Classifier, or otherwise)
*/
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = {
log(s"subscribe $to $subscriber")
// log(s"subscribe $to $subscriber")
bus.subscribe(subscriber, to)
}
@ -32,7 +32,7 @@ final class Bus(system: ActorSystem) extends Extension with EventBus {
* @return true if successful and false if not (because it wasn't subscribed to that Classifier, or otherwise)
*/
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = {
log(s"[UN]subscribe $from $subscriber")
// log(s"[UN]subscribe $from $subscriber")
bus.unsubscribe(subscriber, from)
}
@ -40,7 +40,7 @@ final class Bus(system: ActorSystem) extends Extension with EventBus {
* Attempts to deregister the subscriber from all Classifiers it may be subscribed to
*/
def unsubscribe(subscriber: Subscriber) {
log(s"[UN]subscribe ALL $subscriber")
// log(s"[UN]subscribe ALL $subscriber")
bus unsubscribe subscriber
}
@ -48,11 +48,11 @@ final class Bus(system: ActorSystem) extends Extension with EventBus {
* Publishes the specified Event to this bus
*/
def publish(event: Event) {
log(event.toString)
// log(event.toString)
bus publish event
}
private def log(msg: String) {
private def log(msg: => String) {
// loginfo(msg)
}

View File

@ -37,7 +37,7 @@ final class Featured(
oneId = game.id.some
rendererActor ? actorApi.RenderFeaturedJs(game) onSuccess {
case html: Html =>
bus.publish(lila.hub.actorApi.game.ChangeFeatured(game.id, html), 'changeFeaturedGame)
bus.publish(lila.hub.actorApi.game.ChangeFeatured(html), 'changeFeaturedGame)
}
GameRepo setTv game.id
}

View File

@ -95,7 +95,7 @@ case class Propagate(data: Atom, propagations: List[Propagation] = Nil) {
}
package game {
case class ChangeFeatured(id: String, html: Html)
case class ChangeFeatured(html: Html)
case object Count
}

View File

@ -24,6 +24,8 @@ private[lobby] final class Socket(
router: akka.actor.ActorSelection,
uidTtl: Duration) extends SocketActor[Member](uidTtl) with Historical[Member] {
context.system.lilaBus.subscribe(self, 'changeFeaturedGame)
def receiveSpecific = {
case PingVersion(uid, v) => {

View File

@ -36,7 +36,6 @@ private[lobby] final class SocketHandler(
val join = Join(uid = uid, user = user)
Handler(hub, socket, uid, join, user map (_.id)) {
case Connected(enum, member) =>
member.setTv
controller(socket, uid, member) -> enum
}
}

View File

@ -57,22 +57,27 @@ final class Env(
def receive = actorMapReceive
}), name = ActorMapName)
private val socketHub = system.actorOf(
Props(new lila.socket.SocketHubActor[Socket] {
def mkActor(id: String) = new Socket(
gameId = id,
history = history(),
getUsername = getUsername,
uidTimeout = UidTimeout,
socketTimeout = SocketTimeout,
disconnectTimeout = PlayerDisconnectTimeout,
ragequitTimeout = PlayerRagequitTimeout)
def receive: Receive = ({
case msg@lila.chat.actorApi.ChatLine(id, line) =>
self ! lila.hub.actorApi.map.Tell(id take 8, msg)
}: Receive) orElse socketHubReceive
}),
name = SocketName)
private val socketHub = {
val actor = system.actorOf(
Props(new lila.socket.SocketHubActor[Socket] {
def mkActor(id: String) = new Socket(
gameId = id,
history = history(),
getUsername = getUsername,
uidTimeout = UidTimeout,
socketTimeout = SocketTimeout,
disconnectTimeout = PlayerDisconnectTimeout,
ragequitTimeout = PlayerRagequitTimeout)
def receive: Receive = ({
case msg@lila.chat.actorApi.ChatLine(id, line) =>
self ! lila.hub.actorApi.map.Tell(id take 8, msg)
case m: lila.hub.actorApi.game.ChangeFeatured => tellAll(m)
}: Receive) orElse socketHubReceive
}),
name = SocketName)
system.lilaBus.subscribe(actor, 'changeFeaturedGame)
actor
}
lazy val socketHandler = new SocketHandler(
hub = hub,

View File

@ -85,10 +85,9 @@ private[round] final class SocketHandler(
version: Int,
uid: String,
user: Option[User],
ip: String,
tv: Boolean): Fu[JsSocketHandler] =
ip: String): Fu[JsSocketHandler] =
GameRepo.pov(gameId, colorName) flatMap {
_ ?? { join(_, none, version, uid, "", user, ip, tv) }
_ ?? { join(_, none, version, uid, "", user, ip) }
}
def player(
@ -99,7 +98,7 @@ private[round] final class SocketHandler(
user: Option[User],
ip: String): Fu[JsSocketHandler] =
GameRepo.pov(fullId) flatMap {
_ ?? { join(_, Some(Game takePlayerId fullId), version, uid, token, user, ip, false) }
_ ?? { join(_, Some(Game takePlayerId fullId), version, uid, token, user, ip) }
}
private def join(
@ -109,8 +108,7 @@ private[round] final class SocketHandler(
uid: String,
token: String,
user: Option[User],
ip: String,
tv: Boolean): Fu[JsSocketHandler] = {
ip: String): Fu[JsSocketHandler] = {
val join = Join(
uid = uid,
user = user,
@ -121,7 +119,6 @@ private[round] final class SocketHandler(
socketHub ? Get(pov.gameId) mapTo manifest[ActorRef] flatMap { socket =>
Handler(hub, socket, uid, join, user map (_.id)) {
case Connected(enum, member) =>
if (tv) member.setTv
controller(pov.gameId, socket, uid, pov.ref, member) -> enum
}
}

View File

@ -21,7 +21,7 @@ private final class TvBroadcast extends Actor {
case TvBroadcast.GetEnumerator => sender ! enumerator
case ChangeFeatured(_, html) =>
case ChangeFeatured(html) =>
channel push makeMessage("featured", Json.obj("html" -> html.toString))
case move: MoveEvent => channel push makeMessage("fen", Json.obj(

View File

@ -30,7 +30,7 @@ private[simulation] final class WatcherBot(
}
case Event(Some(game: lila.game.Game), _) => {
roundEnv.socketHandler.watcher(game.id, "white", 0, uid, user, ip, true) pipeTo self
roundEnv.socketHandler.watcher(game.id, "white", 0, uid, user, ip) pipeTo self
goto(TvConnect) using Id(game.id)
}
@ -61,7 +61,7 @@ private[simulation] final class WatcherBot(
case Event(Message("featured_id", obj), watcher: Watcher) => obj str "d" map { id =>
watcher.channel.eofAndEnd()
roundEnv.socketHandler.watcher(id, "white", 0, uid, user, ip, true) pipeTo self
roundEnv.socketHandler.watcher(id, "white", 0, uid, user, ip) pipeTo self
goto(TvConnect) using Id(id)
} getOrElse stay

View File

@ -17,8 +17,7 @@ private[site] final class SocketHandler(
def apply(
uid: String,
userId: Option[String],
flag: Option[String],
tv: Boolean): Fu[JsSocketHandler] = {
flag: Option[String]): Fu[JsSocketHandler] = {
def controller: Handler.Controller = {
case ("liveGames", o) => o str "d" foreach { ids =>
@ -28,7 +27,6 @@ private[site] final class SocketHandler(
Handler(hub, socket, uid, Join(uid, userId, flag), userId) {
case Connected(enum, member) =>
if (tv) member.setTv
controller -> enum
}
}

View File

@ -34,32 +34,34 @@ abstract class SocketActor[M <: SocketMember](uidTtl: Duration) extends Socket w
// generic message handler
def receiveGeneric: Receive = {
case Ping(uid) => ping(uid)
case Ping(uid) => ping(uid)
case Broom => broom
case Broom => broom
// when a member quits
case Quit(uid) => quit(uid)
case Quit(uid) => quit(uid)
case NbMembers(nb) => pong = makePong(nb)
case NbMembers(nb) => pong = makePong(nb)
case WithUserIds(f) => f(userIds)
case WithUserIds(f) => f(userIds)
case GetUids => sender ! uids
case GetUids => sender ! uids
case LiveGames(uid, gameIds) => registerLiveGames(uid, gameIds)
case LiveGames(uid, gameIds) => registerLiveGames(uid, gameIds)
case move: MoveEvent => notifyMove(move)
case move: MoveEvent => notifyMove(move)
case SendTo(userId, msg) => sendTo(userId, msg)
case SendTo(userId, msg) => sendTo(userId, msg)
case SendTos(userIds, msg) => sendTos(userIds, msg)
case SendTos(userIds, msg) => sendTos(userIds, msg)
case Resync(uid) => resync(uid)
case Resync(uid) => resync(uid)
case Deploy(event, html) => notifyAll(makeMessage(event.key, html))
case Deploy(event, html) => notifyAll(makeMessage(event.key, html))
case ChangeFeatured(id, html) => notifyFeatured(id, html)
// the actor instance must subscribe to 'changeFeaturedGame to receive this message
// context.system.lilaBus.subscribe(self, 'changeFeaturedGame)
case ChangeFeatured(html) => notifyFeatured(html)
}
def receive = receiveSpecific orElse receiveGeneric
@ -173,10 +175,8 @@ abstract class SocketActor[M <: SocketMember](uidTtl: Duration) extends Socket w
members get uid foreach f
}
private def notifyFeatured(id: String, html: Html) {
val msg = makeMessage("featured", Json.obj("id" -> id, "html" -> html.toString))
members.values foreach { m =>
if (m.hasTv) m.channel push msg
}
private def notifyFeatured(html: Html) {
val msg = makeMessage("featured", Json.obj("html" -> html.toString))
members.values foreach { _.channel push msg }
}
}

View File

@ -13,9 +13,7 @@ final class SocketHub extends Actor {
private val sockets = collection.mutable.Set[ActorRef]()
context.system.lilaBus.subscribe(self,
'moveEvent, 'users, 'deploy, 'nbMembers, 'socket,
'changeFeaturedGame)
context.system.lilaBus.subscribe(self, 'moveEvent, 'users, 'deploy, 'nbMembers, 'socket)
override def postStop() {
context.system.lilaBus.unsubscribe(self)

View File

@ -8,15 +8,11 @@ trait SocketMember extends Ordered[SocketMember] {
// FIXME
private val privateLiveGames = collection.mutable.Set[String]()
private var privateHasTv: Boolean = false
def liveGames: Set[String] = privateLiveGames.toSet
def addLiveGames(ids: List[String]) { ids foreach privateLiveGames.+= }
def hasTv = privateHasTv
def setTv { privateHasTv = true }
def isAuth = userId.isDefined
def compare(other: SocketMember) = ~userId compare ~other.userId

View File

@ -579,15 +579,7 @@ var storage = {
setTimeout(function() {
if (lichess.socket === null) {
var socketSettings = lichess.socketDefaults;
if ($('body').hasClass('embed-tv')) {
socketSettings.params.tv = 1;
socketSettings.events.featured = function changeFeatured(o) {
$('#featured_game').html(o.html);
$('body').trigger('lichess.content_loaded');
};
}
lichess.socket = new strongSocket("/socket", 0, socketSettings);
lichess.socket = new strongSocket("/socket", 0, lichess.socketDefaults);
}
$(document).idleTimer(lichess.idleTime)
.on('idle.idleTimer', function() {
@ -928,8 +920,7 @@ var storage = {
name: "game"
},
params: {
ran: "--ranph--",
tv: self.options.tv ? 1 : null
ran: "--ranph--"
},
events: {
possible_moves: function(event) {