more lobby remote socket

lobbyRemoteSocket
Thibault Duplessis 2019-09-04 11:23:04 +02:00
parent c4887ca6d8
commit a69515f4b3
5 changed files with 42 additions and 22 deletions

View File

@ -342,6 +342,10 @@ object mon {
def path(channel: String, path: String) = inc(s"socket.remote.redis.out.path.$channel:$path")
}
}
object lobby {
def tellSri(tpe: String) = inc(s"socket.remote.lobby.tell_sri.$tpe")
val missingSri = inc("socket.remote.lobby.missing_sri")
}
}
}
object trouper {

View File

@ -4,6 +4,7 @@ import play.api.libs.json._
import actorApi._
import lila.socket.RemoteSocket.{ Protocol => P, _ }
import lila.socket.Socket.Sri
import lila.user.{ User, UserRepo }
final class LobbyRemoteSocket(
@ -17,8 +18,6 @@ final class LobbyRemoteSocket(
import LobbyRemoteSocket.Protocol._
private val send: String => Unit = remoteSocketApi.sendTo("lobby-out") _
private val handler: Handler = {
case P.In.ConnectSri(sri, userOpt) =>
userOpt map P.In.ConnectUser.apply foreach remoteSocketApi.baseHandler.lift
@ -34,9 +33,13 @@ final class LobbyRemoteSocket(
lobby ! actorApi.LeaveAllRemote
socket ! actorApi.LeaveAllRemote
case tell @ P.In.TellSri(sri, _, typ, msg) if messagesHandled(typ) =>
case tell @ P.In.TellSri(sri, user, typ, msg) if messagesHandled(typ) =>
lila.mon.socket.remote.lobby.tellSri(typ)
socket.ask[Option[LobbyRemoteSocketMember]](GetRemoteMember(sri, _)) foreach {
case None => logger.warn(s"tell/sri missing member $sri")
case None =>
logger.info(s"tell/sri missing member $sri $user $msg")
lila.mon.socket.remote.lobby.missingSri()
send(Out.disconnectSri(sri))
case Some(member) => controller(member).applyOrElse(typ -> msg, {
case _ => logger.warn(s"Can't handle $typ")
}: lila.socket.Handler.Controller)
@ -48,10 +51,13 @@ final class LobbyRemoteSocket(
remoteSocketApi.subscribe("lobby-in", P.In.baseReader)(handler orElse remoteSocketApi.baseHandler)
bus.subscribeFun('nbMembers, 'nbRounds, 'lobbySocketTellAll) {
private val send: String => Unit = remoteSocketApi.sendTo("lobby-out") _
bus.subscribeFun('nbMembers, 'nbRounds, 'lobbySocketTell) {
case lila.socket.actorApi.NbMembers(nb) => send(Out.nbMembers(nb))
case lila.hub.actorApi.round.NbRounds(nb) => send(Out.nbRounds(nb))
case LobbySocketTellAll(msg) => send(P.Out.tellAll(msg))
case LobbySocketTellAll(msg) => send(Out.tellLobby(msg))
case LobbySocketTellSris(sris, msg) => send(Out.tellSris(sris, msg))
}
}
@ -61,6 +67,10 @@ object LobbyRemoteSocket {
object Out {
def nbMembers(nb: Int) = s"member/nb $nb"
def nbRounds(nb: Int) = s"round/nb $nb"
def tellLobby(payload: JsObject) = s"tell/lobby ${Json stringify payload}"
def disconnectSri(sri: Sri) = s"disconnect/sri $sri"
def tellSris(sris: Iterable[Sri], payload: JsValue) =
s"tell/sris ${sris mkString ","} ${Json stringify payload}"
}
}
}

View File

@ -81,10 +81,7 @@ private[lobby] final class LobbySocket(
case SendHookRemovals =>
if (removedHookIds.nonEmpty) {
val msg = makeMessage("hrm", removedHookIds)
hookSubscriberSris.foreach { sri =>
withActiveMemberBySriString(sri)(_ push msg)
}
sendToActiveHookSubscribers(makeMessage("hrm", removedHookIds))
removedHookIds = ""
}
system.scheduler.scheduleOnce(1249 millis)(this ! SendHookRemovals)
@ -120,10 +117,7 @@ private[lobby] final class LobbySocket(
system.scheduler.scheduleOnce(3 second)(goPlayTheGame) // Darn it
case HookIds(ids) =>
val msg = makeMessage("hli", ids mkString "")
hookSubscriberSris.foreach { sri =>
withActiveMemberBySriString(sri)(_ push msg)
}
sendToActiveHookSubscribers(makeMessage("hli", ids mkString ""))
case lila.hub.actorApi.streamer.StreamsOnAir(html) => notifyAll(makeMessage("streams", html))
@ -143,6 +137,17 @@ private[lobby] final class LobbySocket(
}
}
private def sendToActiveHookSubscribers(msg: JsObject): Unit = {
var remoteSris = List.empty[Sri]
hookSubscriberSris diff idleSris foreach { sri =>
members get sri foreach {
case m: LobbyDirectSocketMember => m push msg
case m => remoteSris = m.sri :: remoteSris
}
}
system.lilaBus.publish(LobbySocketTellSris(remoteSris, msg), 'lobbySocketTell)
}
private def quitRemote(sri: Sri): Unit = {
members -= sri.value
afterQuit(sri)
@ -173,7 +178,7 @@ private[lobby] final class LobbySocket(
case m: LobbyDirectSocketMember => m push msg
case _ =>
}
system.lilaBus.publish(LobbySocketTellAll(msg), 'lobbySocketTellAll)
system.lilaBus.publish(LobbySocketTellAll(msg), 'lobbySocketTell)
}
// TODO let lila-websocket know about active members
@ -182,7 +187,7 @@ private[lobby] final class LobbySocket(
case (sri, m: LobbyDirectSocketMember) if !idleSris(sri) => m push msg
case _ =>
}
system.lilaBus.publish(LobbySocketTellAll(msg), 'lobbySocketTellAll)
system.lilaBus.publish(LobbySocketTellAll(msg), 'lobbySocketTell)
}
private def withActiveMemberBySriString(sri: String)(f: LobbySocketMember => Unit): Unit =

View File

@ -81,6 +81,7 @@ private[lobby] case class GetSrisP(promise: Promise[Sris])
private[lobby] case class GetRemoteMember(sri: Sri, promise: Promise[Option[LobbyRemoteSocketMember]])
private[lobby] case class LobbySocketTellAll(msg: JsObject)
private[lobby] case class LobbySocketTellSris(sris: Iterable[Sri], msg: JsObject)
case class AddHook(hook: Hook)
case class AddSeek(seek: Seek)

View File

@ -2,6 +2,7 @@ package lila.socket
import chess.Centis
import io.lettuce.core._
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
import play.api.libs.json._
import scala.concurrent.Future
@ -27,6 +28,8 @@ final class RemoteSocket(
private val connectedUserIds = collection.mutable.Set.empty[String]
private val watchedGameIds = collection.mutable.Set.empty[String]
def connectToPubSub = redisClient.connectPubSub()
val baseHandler: Handler = {
case In.ConnectUser(userId) => connectedUserIds += userId
case In.DisconnectUser(userId) => connectedUserIds -= userId
@ -78,9 +81,10 @@ final class RemoteSocket(
private val mon = lila.mon.socket.remote
def sendTo(channel: Channel)(msg: String): Unit = {
val conn = connectToPubSub
val chrono = Chronometer.start
Chronometer.syncMon(_.socket.remote.redis.publishTimeSync) {
connOut.async.publish(channel, msg).thenRun {
conn.async.publish(channel, msg).thenRun {
new Runnable { def run = chrono.mon(_.socket.remote.redis.publishTime) }
}
}
@ -90,10 +94,8 @@ final class RemoteSocket(
private val send: String => Unit = sendTo("site-out") _
private val connOut = redisClient.connectPubSub()
def subscribe(channel: Channel, reader: In.Reader)(handler: Handler): Unit = {
val conn = redisClient.connectPubSub()
val conn = connectToPubSub
conn.addListener(new pubsub.RedisPubSubAdapter[String, String] {
override def message(_channel: String, message: String): Unit = {
val raw = RawMsg(message)
@ -146,8 +148,6 @@ object RemoteSocket {
case class TellSri(sri: Sri, userId: Option[String], typ: String, msg: JsObject) extends In
val baseReader: Reader = raw => raw.path match {
case "connect" => ConnectUser(raw.args).some // deprecated
case "disconnect" => DisconnectUser(raw.args).some // deprecated
case "connect/user" => ConnectUser(raw.args).some
case "disconnect/user" => DisconnectUser(raw.args).some
case "connect/sri" => raw.args.split(' ') |> { s => ConnectSri(Sri(s(0)), s lift 1).some }