complete socket trouper rewrite and delete socket actor remains

roundTrouper
Thibault Duplessis 2018-12-08 16:20:21 +07:00
parent 893fdc3e03
commit 2c52a32916
50 changed files with 372 additions and 538 deletions

View File

@ -401,7 +401,6 @@ round {
ragequit.timeout = 10 seconds
}
socket {
name = round-socket
timeout = 30 seconds
}
collection {
@ -658,9 +657,6 @@ hub {
notify = ${notify.actor.name}
study = ${study.actor.name}
}
socket {
round = ${round.socket.name}
}
}
dbplugin = disabled

View File

@ -1,5 +1,7 @@
package lila.analyse
import play.api.libs.iteratee._
import play.api.libs.json.JsValue
import scala.concurrent.duration._
import scala.concurrent.Promise
@ -7,14 +9,23 @@ import lila.hub.Trouper
import lila.socket._
private final class AnalyseSocket(
val system: akka.actor.ActorSystem,
system: akka.actor.ActorSystem,
uidTtl: FiniteDuration
) extends SocketTrouper[AnalyseSocket.Member](uidTtl) with LoneSocket {
) extends SocketTrouper[AnalyseSocket.Member](system, uidTtl) with LoneSocket {
def monitoringName = "analyse"
def broomFrequency = 4027 millis
def receiveSpecific = PartialFunction.empty
import AnalyseSocket._
def receiveSpecific = {
case Join(uid, userId, promise) =>
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, userId)
addMember(uid, member)
promise success Connected(enumerator, member)
}
}
private object AnalyseSocket {
@ -25,4 +36,7 @@ private object AnalyseSocket {
) extends SocketMember {
val troll = false
}
private[analyse] case class Join(uid: Socket.Uid, userId: Option[String], promise: Promise[Connected])
private[analyse] case class Connected(enumerator: JsEnumerator, member: Member)
}

View File

@ -12,8 +12,8 @@ private[analyse] final class AnalyseSocketHandler(
import AnalyseSocket._
def join(uid: Socket.Uid, user: Option[User]): Fu[JsSocketHandler] =
socket.addMember(uid)(Member(_, user.map(_.id))) map {
case (member, enum) => Handler.iteratee(
socket.ask[Connected](Join(uid, user.map(_.id), _)) map {
case Connected(enum, member) => Handler.iteratee(
hub,
evalCacheHandler(uid, member, user),
member,

View File

@ -10,7 +10,6 @@ import lila.hub.actorApi.map.Tell
final class Analyser(
indexer: ActorSelection,
requesterApi: RequesterApi,
roundSocket: ActorSelection,
studyActor: ActorSelection,
bus: lila.common.Bus
) {
@ -44,13 +43,15 @@ final class Analyser(
private def sendAnalysisProgress(analysis: Analysis, complete: Boolean): Funit = analysis.studyId match {
case None => GameRepo gameWithInitialFen analysis.id map {
_ ?? {
case (game, initialFen) =>
roundSocket ! Tell(analysis.id, actorApi.AnalysisProgress(
case (game, initialFen) => bus.publish(
Tell(analysis.id, actorApi.AnalysisProgress(
analysis = analysis,
game = game,
variant = game.variant,
initialFen = initialFen | FEN(game.variant.initialFen)
))
)),
'roundSocket
)
}
}
case Some(studyId) => fuccess {

View File

@ -24,7 +24,6 @@ final class Env(
lazy val analyser = new Analyser(
indexer = indexer,
requesterApi = requesterApi,
roundSocket = hub.socket.round,
studyActor = hub.actor.study,
bus = system.lilaBus
)

View File

@ -11,11 +11,7 @@ final class Env(
lazy val jsonView = new BotJsonView(lightUserApi)
lazy val gameStateStream = new GameStateStream(
system,
jsonView,
hub.socket.round
)
lazy val gameStateStream = new GameStateStream(system, jsonView)
lazy val player = new BotPlayer(hub.actor.chat)(system)

View File

@ -18,8 +18,7 @@ import lila.user.User
final class GameStateStream(
system: ActorSystem,
jsonView: BotJsonView,
roundSocketHub: ActorSelection
jsonView: BotJsonView
) {
private case object SetOnline
@ -87,8 +86,10 @@ final class GameStateStream(
gameOver = true
channel.eofAndEnd()
}
def setConnected(v: Boolean) =
roundSocketHub ! Tell(init.game.id, BotConnected(as, v))
def setConnected(v: Boolean) = system.lilaBus.publish(
Tell(init.game.id, BotConnected(as, v)),
'roundSocket
)
}))
stream = actor.some
},

View File

@ -12,13 +12,13 @@ import lila.socket.Socket.{ Uid, GetVersionP, SocketVersion }
import lila.socket.{ History, Historical }
private final class ChallengeSocket(
val system: ActorSystem,
system: ActorSystem,
challengeId: String,
val history: History[Unit],
protected val history: History[Unit],
getChallenge: Challenge.ID => Fu[Option[Challenge]],
uidTtl: Duration,
keepMeAlive: () => Unit
) extends SocketTrouper[ChallengeSocket.Member](uidTtl) with Historical[ChallengeSocket.Member, Unit] {
) extends SocketTrouper[ChallengeSocket.Member](system, uidTtl) with Historical[ChallengeSocket.Member, Unit] {
def receiveSpecific = {
@ -35,7 +35,7 @@ private final class ChallengeSocket(
case GetVersionP(promise) => promise success history.version
case ChallengeSocket.JoinP(uid, userId, owner, version, promise) =>
case ChallengeSocket.Join(uid, userId, owner, version, promise) =>
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = ChallengeSocket.Member(channel, userId, owner)
addMember(uid, member)
@ -65,7 +65,7 @@ private object ChallengeSocket {
val troll = false
}
case class JoinP(uid: Uid, userId: Option[String], owner: Boolean, version: Option[SocketVersion], promise: Promise[Connected])
case class Join(uid: Uid, userId: Option[String], owner: Boolean, version: Option[SocketVersion], promise: Promise[Connected])
case class Connected(enumerator: JsEnumerator, member: Member)
case object Reload

View File

@ -26,7 +26,7 @@ private[challenge] final class SocketHandler(
version: Option[SocketVersion]
): Fu[JsSocketHandler] = {
val socket = socketMap getOrMake challengeId
socket.ask[Connected](JoinP(uid, userId, owner, version, _)) map {
socket.ask[Connected](Join(uid, userId, owner, version, _)) map {
case Connected(enum, member) => Handler.iteratee(
hub,
controller(socket, challengeId, uid, member),

View File

@ -3,6 +3,7 @@ package lila.fishnet
import akka.actor._
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.concurrent.Promise
final class Env(
config: Config,
@ -59,9 +60,9 @@ final class Env(
sink = sink,
socketExists = id => {
import lila.hub.actorApi.map.Exists
import akka.pattern.ask
import makeTimeout.short
hub.socket.round ? Exists(id) mapTo manifest[Boolean]
val promise = Promise[Boolean]
bus.publish(Exists(id, promise), 'roundSocket)
promise.future
},
clientVersion = clientVersion,
offlineMode = OfflineMode,

View File

@ -1,51 +0,0 @@
package lila.hub
import actorApi.map._
import akka.actor._
trait ActorMap extends Actor {
private val actors = scala.collection.mutable.AnyRefMap.empty[String, ActorRef]
def mkActor(id: String): Actor
def actorMapReceive: Receive = {
case Get(id) => sender ! getOrMake(id)
case Tell(id, msg) => getOrMake(id) forward msg
case TellAll(msg) => actors.foreachValue(_ forward msg)
case TellIds(ids, msg) => ids foreach { id =>
actors get id foreach (_ forward msg)
}
case Ask(id, msg) => getOrMake(id) forward msg
case Terminated(actor) =>
context unwatch actor
actors foreach {
case (id, a) => if (a == actor) actors -= id
}
case Exists(id) => sender ! actors.contains(id)
}
protected def size = actors.size
private def getOrMake(id: String) = actors get id getOrElse {
val actor = context.actorOf(Props(mkActor(id)), name = id)
actors += (id -> actor)
context watch actor
actor
}
}
object ActorMap {
def apply(make: String => Actor) = new ActorMap {
def mkActor(id: String) = make(id)
def receive = actorMapReceive
}
}

View File

@ -26,10 +26,6 @@ final class Env(config: Config, system: ActorSystem) {
val study = select("actor.study")
}
object socket {
val round = select("socket.round")
}
val bus = system.lilaBus
private def select(name: String) =

View File

@ -19,6 +19,8 @@ final class TrouperMap[T <: Trouper](
def tell(id: String, msg: Any): Unit = getOrMake(id) ! msg
def tellIfPresent(id: String, msg: Any): Unit = getIfPresent(id) foreach (_ ! msg)
def tellAll(msg: Any) = troupers.asMap().asScala.foreach(_._2 ! msg)
def tellIds(ids: Seq[String], msg: Any): Unit = ids foreach { tell(_, msg) }
@ -46,14 +48,14 @@ final class TrouperMap[T <: Trouper](
.expireAfterAccess(accessTimeout.toMillis, TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener[String, T] {
def onRemoval(id: String, trouper: T, cause: RemovalCause): Unit = {
// println(id, "remove trouper")
println(id, "remove trouper")
trouper.stop()
}
})
.build[String, T](new CacheLoader[String, T] {
def load(id: String): T = {
val t = mkTrouper(id)
// println(id, "start trouper")
println(id, "start trouper")
t
}
})

View File

@ -35,13 +35,12 @@ package map {
case class TellIds(ids: Seq[String], msg: Any)
case class TellAll(msg: Any)
case class Ask(id: String, msg: Any)
case class Exists(id: String)
case class Exists(id: String, promise: Promise[Boolean])
}
case class WithUserIds(f: Iterable[String] => Unit)
case class HasUserId(userId: String)
case class HasUserIdP(userId: String, promise: Promise[Boolean])
case class HasUserId(userId: String, promise: Promise[Boolean])
package report {
case class Cheater(userId: String, text: String)
@ -245,7 +244,7 @@ package round {
)
case class NbRounds(nb: Int)
case class Berserk(gameId: String, userId: String)
case class IsOnGame(color: chess.Color)
case class IsOnGame(color: chess.Color, promise: Promise[Boolean])
sealed trait SocketEvent
case class TourStanding(json: JsArray)
case class FishnetPlay(uci: Uci, currentFen: chess.format.FEN)

View File

@ -16,9 +16,9 @@ import lila.socket.Socket.{ Uid, Uids }
import lila.socket.{ SocketTrouper, LoneSocket }
private[lobby] final class Socket(
val system: ActorSystem,
system: ActorSystem,
uidTtl: FiniteDuration
) extends SocketTrouper[Member](uidTtl) with LoneSocket {
) extends SocketTrouper[Member](system, uidTtl) with LoneSocket {
def monitoringName = "lobby"
def broomFrequency = 4073 millis
@ -45,7 +45,7 @@ private[lobby] final class Socket(
idleUids retain members.contains
hookSubscriberUids retain members.contains
case JoinP(uid, user, blocks, mobile, promise) =>
case Join(uid, user, blocks, mobile, promise) =>
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, user, blocks, uid, mobile)
addMember(uid, member)

View File

@ -95,7 +95,7 @@ private[lobby] final class SocketHandler(
def apply(uid: Uid, user: Option[User], mobile: Boolean): Fu[JsSocketHandler] =
(user ?? (u => blocking(u.id))) flatMap { blockedUserIds =>
socket.ask[Connected](JoinP(uid, user, blockedUserIds, mobile, _)) map {
socket.ask[Connected](Join(uid, user, blockedUserIds, mobile, _)) map {
case Connected(enum, member) => Handler.iteratee(
hub,
controller(socket, member, user.exists(_.isBot)),

View File

@ -47,7 +47,7 @@ private[lobby] case class BiteHook(hookId: String, uid: Uid, user: Option[LobbyU
private[lobby] case class BiteSeek(seekId: String, user: LobbyUser)
private[lobby] case class JoinHook(uid: Uid, hook: Hook, game: Game, creatorColor: chess.Color)
private[lobby] case class JoinSeek(userId: String, seek: Seek, game: Game, creatorColor: chess.Color)
private[lobby] case class JoinP(uid: Uid, user: Option[User], blocking: Set[String], mobile: Boolean, promise: Promise[Connected])
private[lobby] case class Join(uid: Uid, user: Option[User], blocking: Set[String], mobile: Boolean, promise: Promise[Connected])
private[lobby] case object Resync
private[lobby] case class HookIds(ids: Vector[String])

View File

@ -7,7 +7,6 @@ final class Env(
config: Config,
db: lila.db.Env,
getLightUser: lila.common.LightUser.GetterSync,
roundSocketHub: ActorSelection,
scheduler: lila.common.Scheduler,
system: ActorSystem
) {
@ -32,7 +31,7 @@ final class Env(
private lazy val pushApi = new PushApi(
oneSignalPush,
getLightUser,
roundSocketHub,
bus = system.lilaBus,
scheduler = scheduler
)
@ -54,7 +53,6 @@ object Env {
db = lila.db.Env.current,
system = lila.common.PlayApp.system,
getLightUser = lila.user.Env.current.lightUserSync,
roundSocketHub = lila.hub.Env.current.socket.round,
scheduler = lila.common.PlayApp.scheduler,
config = lila.common.PlayApp loadConfig "push"
)

View File

@ -1,9 +1,9 @@
package lila.push
import akka.actor._
import akka.pattern.ask
import play.api.libs.json._
import scala.concurrent.duration._
import scala.concurrent.Promise
import chess.format.Forsyth
import lila.challenge.Challenge
@ -16,7 +16,7 @@ import lila.message.{ Thread, Post }
private final class PushApi(
oneSignalPush: OneSignalPush,
implicit val lightUser: LightUser.GetterSync,
roundSocketHub: ActorSelection,
bus: lila.common.Bus,
scheduler: lila.common.Scheduler
) {
@ -220,8 +220,12 @@ private final class PushApi(
}
private def IfAway(pov: Pov)(f: => Funit): Funit = {
import makeTimeout.short
roundSocketHub ? Ask(pov.gameId, IsOnGame(pov.color)) mapTo manifest[Boolean] flatMap {
val promise = Promise[Boolean]
bus.publish(
Ask(pov.gameId, IsOnGame(pov.color, promise)),
'roundSocket
)
promise.future flatMap {
case true => funit
case false => f
}

View File

@ -15,7 +15,7 @@ import makeTimeout.short
private final class CorresAlarm(
coll: Coll,
roundSocketHub: ActorSelection
socketMap: SocketMap
) extends Actor {
object Run
@ -55,7 +55,7 @@ private final class CorresAlarm(
case (count, alarm) => GameRepo.game(alarm._id).flatMap {
_ ?? { game =>
val pov = Pov(game, game.turnColor)
roundSocketHub ? Ask(pov.gameId, IsOnGame(pov.color)) mapTo manifest[Boolean] addEffect {
socketMap.ask[Boolean](pov.gameId)(IsOnGame(pov.color, _)) addEffect {
case true => // already looking at the game
case false => context.system.lilaBus.publish(
lila.game.actorApi.CorresAlarmEvent(pov),

View File

@ -8,9 +8,10 @@ import scala.concurrent.duration._
import actorApi.{ GetSocketStatus, SocketStatus }
import lila.game.{ Game, GameRepo, Pov }
import lila.hub.actorApi.map.{ Ask, Tell }
import lila.hub.actorApi.map.{ Ask, Tell, Exists }
import lila.hub.actorApi.round.{ Abort, Resign, FishnetPlay }
import lila.hub.actorApi.{ HasUserId, DeployPost }
import lila.hub.TrouperMap
final class Env(
config: Config,
@ -43,7 +44,6 @@ final class Env(
val PlayerRagequitTimeout = config duration "player.ragequit.timeout"
val AnimationDuration = config duration "animation.duration"
val MoretimeDuration = config duration "moretime"
val SocketName = config getString "socket.name"
val SocketTimeout = config duration "socket.timeout"
val NetDomain = config getString "net.domain"
val ActiveTtl = config duration "active.ttl"
@ -69,7 +69,7 @@ final class Env(
player = player,
drawer = drawer,
forecastApi = forecastApi,
socketHub = socketHub,
socketMap = socketMap,
moretimeDuration = MoretimeDuration
)
val roundMap = new lila.hub.DuctMap[Round](
@ -84,9 +84,9 @@ final class Env(
accessTimeout = ActiveTtl
)
bus.subscribeFun('roundMapTell, 'deploy, 'accountClose) {
case Tell(id, msg) => roundMap.tell(id, msg)
case DeployPost => roundMap.tellAll(DeployPost)
bus.subscribeFun('roundMapTell) { case Tell(id, msg) => roundMap.tell(id, msg) }
bus.subscribeFun('deploy) { case DeployPost => roundMap.tellAll(DeployPost) }
bus.subscribeFun('accountClose) {
case lila.hub.actorApi.security.CloseAccount(userId) => GameRepo.allPlaying(userId) map {
_ foreach { pov =>
roundMap.tell(pov.gameId, Resign(pov.playerId))
@ -113,39 +113,45 @@ final class Env(
}
}
private val socketHub = {
val actor = system.actorOf(
Props(new lila.socket.SocketHubActor[Socket] {
private var historyPersistenceEnabled = false
def mkActor(id: String) = new Socket(
gameId = id,
history = eventHistory(id, historyPersistenceEnabled),
lightUser = lightUser,
uidTimeout = UidTimeout,
socketTimeout = SocketTimeout,
disconnectTimeout = PlayerDisconnectTimeout,
ragequitTimeout = PlayerRagequitTimeout,
simulActor = hub.actor.simul
)
def receive: Receive = ({
case msg @ lila.chat.actorApi.ChatLine(id, line) =>
self ! Tell(id.value take 8, msg)
case _: lila.hub.actorApi.Deploy =>
logger.warn("Enable history persistence")
historyPersistenceEnabled = true
// if the deploy didn't go through, cancel persistence
system.scheduler.scheduleOnce(10.minutes) {
logger.warn("Disabling round history persistence!")
historyPersistenceEnabled = false
}
case msg: lila.game.actorApi.StartGame =>
self ! Tell(msg.game.id, msg)
}: Receive) orElse socketHubReceive
}),
name = SocketName
)
bus.subscribe(actor, 'tvSelect, 'startGame, 'deploy)
actor
private var historyPersistenceEnabled = false
private val socketMap: SocketMap = new TrouperMap[RoundSocket](
mkTrouper = (id: Game.ID) => new RoundSocket(
system = system,
gameId = id,
history = eventHistory(id, historyPersistenceEnabled),
lightUser = lightUser,
uidTtl = UidTimeout,
disconnectTimeout = PlayerDisconnectTimeout,
ragequitTimeout = PlayerRagequitTimeout,
simulActor = hub.actor.simul,
keepMeAlive = () => socketMap touch id
),
accessTimeout = SocketTimeout
)
system.scheduler.schedule(30 seconds, 30 seconds) {
socketMap.monitor("round.socketMap")
}
system.scheduler.schedule(10 seconds, 4001 millis) {
socketMap tellAll lila.socket.actorApi.Broom
}
bus.subscribeFun('startGame) {
case msg: lila.game.actorApi.StartGame => socketMap.tellIfPresent(msg.game.id, msg)
}
bus.subscribeFun('roundSocket) {
case Tell(id, msg) => socketMap.tellIfPresent(id, msg)
case Ask(id, msg) => socketMap.tell(id, msg)
case Exists(id, promise) => promise success socketMap.exists(id)
}
bus.subscribeFun('deploy) {
case m: lila.hub.actorApi.Deploy =>
socketMap tellAll m
logger.warn("Enable history persistence")
historyPersistenceEnabled = true
// if the deploy didn't go through, cancel persistence
system.scheduler.scheduleOnce(10.minutes) {
logger.warn("Disabling round history persistence!")
historyPersistenceEnabled = false
}
}
lazy val selfReport = new SelfReport(roundMap, slackApi)
@ -163,7 +169,7 @@ final class Env(
lazy val socketHandler = new SocketHandler(
hub = hub,
roundMap = roundMap,
socketHub = socketHub,
socketMap = socketMap,
messenger = messenger,
evalCacheHandler = evalCacheHandler,
selfReport = selfReport,
@ -222,15 +228,11 @@ final class Env(
chat = hub.actor.chat
)
def getSocketStatus(gameId: Game.ID): Fu[SocketStatus] = {
import makeTimeout.large
socketHub ? Ask(gameId, GetSocketStatus) mapTo manifest[SocketStatus]
}
def getSocketStatus(gameId: Game.ID): Fu[SocketStatus] =
socketMap.ask[SocketStatus](gameId)(GetSocketStatus)
private def isUserPresent(game: Game, userId: lila.user.User.ID): Fu[Boolean] = {
import makeTimeout.large
socketHub ? Ask(game.id, HasUserId(userId)) mapTo manifest[Boolean]
}
private def isUserPresent(game: Game, userId: lila.user.User.ID): Fu[Boolean] =
socketMap.ask[Boolean](game.id)(HasUserId(userId, _))
lazy val jsonView = new JsonView(
noteApi = noteApi,
@ -253,7 +255,7 @@ final class Env(
)
bus.subscribe(system.actorOf(
Props(new CorresAlarm(db(CollectionAlarm), hub.socket.round)),
Props(new CorresAlarm(db(CollectionAlarm), socketMap)),
name = "corres-alarm"
), 'moveEventCorres, 'finishGame)

View File

@ -74,7 +74,7 @@ private[round] final class Round(
case ResignForce(playerId) => handle(playerId) { pov =>
(pov.game.resignable && !pov.game.hasAi && pov.game.hasClock) ?? {
socketHub ? Ask(pov.gameId, IsGone(!pov.color)) flatMap {
socketMap.ask[Boolean](pov.gameId)(IsGone(!pov.color, _)) flatMap {
case true => finisher.rageQuit(pov.game, Some(pov.color))
case _ => fuccess(List(Event.Reload))
}
@ -83,7 +83,7 @@ private[round] final class Round(
case DrawForce(playerId) => handle(playerId) { pov =>
(pov.game.drawable && !pov.game.hasAi && pov.game.hasClock) ?? {
socketHub ? Ask(pov.gameId, IsGone(!pov.color)) flatMap {
socketMap.ask[Boolean](pov.gameId)(IsGone(!pov.color, _)) flatMap {
case true => finisher.rageQuit(pov.game, None)
case _ => fuccess(List(Event.Reload))
}
@ -263,7 +263,7 @@ private[round] final class Round(
private[this] def publish[A](op: Fu[Events]): Funit = op.map { events =>
if (events.nonEmpty) {
socketHub ! Tell(gameId, EventList(events))
socketMap.tell(gameId, EventList(events))
if (events exists {
case e: Event.Move => e.threefold
case _ => false
@ -292,7 +292,7 @@ object Round {
player: Player,
drawer: Drawer,
forecastApi: ForecastApi,
socketHub: ActorRef,
socketMap: SocketMap,
moretimeDuration: FiniteDuration
)

View File

@ -17,34 +17,33 @@ import lila.hub.actorApi.Deploy
import lila.hub.actorApi.game.ChangeFeatured
import lila.hub.actorApi.round.{ IsOnGame, TourStanding }
import lila.hub.actorApi.tv.{ Select => TvSelect }
import lila.hub.TimeBomb
import lila.hub.Trouper
import lila.socket._
import lila.socket.actorApi.{ Connected => _, _ }
import lila.socket.Socket.Uid
import lila.socket.Socket
import makeTimeout.short
private[round] final class Socket(
private[round] final class RoundSocket(
system: ActorSystem,
gameId: Game.ID,
history: History,
lightUser: LightUser.Getter,
uidTimeout: FiniteDuration,
socketTimeout: FiniteDuration,
uidTtl: FiniteDuration,
disconnectTimeout: FiniteDuration,
ragequitTimeout: FiniteDuration,
simulActor: ActorSelection
) extends SocketActor[Member](uidTimeout) {
simulActor: ActorSelection,
keepMeAlive: () => Unit
) extends SocketTrouper[Member](system, uidTtl) {
private var hasAi = false
private var mightBeSimul = true // until proven false
private var chatIds = Socket.ChatIds(
private var chatIds = RoundSocket.ChatIds(
priv = Chat.Id(gameId), // until replaced with tourney/simul chat
pub = Chat.Id(s"$gameId/w")
)
private var tournamentId = none[String] // until set, to listen to standings
private val timeBomb = new TimeBomb(socketTimeout)
private[this] var delayedCrowdNotification = false
private var delayedCrowdNotification = false
private final class Player(color: Color) {
@ -68,7 +67,9 @@ private[round] final class Socket(
private def isBye = bye > 0
private def isHostingSimul: Fu[Boolean] = userId.ifTrue(mightBeSimul) ?? { u =>
simulActor ? lila.hub.actorApi.simul.GetHostIds mapTo manifest[Set[String]] map (_ contains u)
import akka.pattern.{ ask => actorAsk }
import makeTimeout.short
actorAsk(simulActor, lila.hub.actorApi.simul.GetHostIds) mapTo manifest[Set[String]] map (_ contains u)
}
def isGone: Fu[Boolean] = {
@ -85,15 +86,12 @@ private[round] final class Socket(
private val whitePlayer = new Player(White)
private val blackPlayer = new Player(Black)
override def preStart(): Unit = {
super.preStart()
buscriptions.subAll
GameRepo game gameId map SetGame.apply pipeTo self
}
buscriptions.subAll
GameRepo game gameId map SetGame.apply foreach this.!
override def postStop(): Unit = {
override def stop(): Unit = {
buscriptions.unsubAll
super.postStop()
super.stop()
}
private object buscriptions {
@ -101,12 +99,12 @@ private[round] final class Socket(
private var classifiers = collection.mutable.Set.empty[Symbol]
private def sub(classifier: Symbol) = {
lilaBus.subscribe(self, classifier)
lilaBus.subscribe(RoundSocket.this, classifier)
classifiers += classifier
}
def unsubAll = {
classifiers foreach { lilaBus.unsubscribe(self, _) }
lilaBus.unsubscribe(RoundSocket.this, classifiers.toSeq)
classifiers.clear
}
@ -129,7 +127,7 @@ private[round] final class Socket(
}
}
def receiveSpecific = ({
def receiveSpecific: Trouper.Receive = ({
case SetGame(Some(game)) =>
hasAi = game.hasAi
@ -144,19 +142,17 @@ private[round] final class Socket(
tournamentId = tourId.some
buscriptions.tournament
}
case SetGame(None) => self ! PoisonPill // should never happen but eh
// from lilaBus 'startGame
// sets definitive user ids
// in case one joined after the socket creation
case StartGame(game) => self ! SetGame(game.some)
case StartGame(game) => this ! SetGame(game.some)
case d: Deploy =>
onDeploy(d)
history.enablePersistence
case Ping(uid, vOpt, lagCentis) =>
timeBomb.delay
ping(uid, lagCentis)
ownerOf(uid) foreach { o =>
playerDo(o.color, _.ping)
@ -175,29 +171,22 @@ private[round] final class Socket(
case Bye(color) => playerDo(color, _.setBye)
case Broom =>
broom
if (timeBomb.boom) self ! PoisonPill
else if (!hasAi) Color.all foreach { c =>
playerGet(c, _.isGone) foreach { _ ?? notifyGone(c, true) }
}
case IsGone(color, promise) => promise completeWith playerGet(color, _.isGone)
case IsGone(color) => playerGet(color, _.isGone) pipeTo sender
case IsOnGame(color, promise) => promise success ownerIsHere(color)
case IsOnGame(color) => sender ! ownerIsHere(color)
case GetSocketStatus =>
playerGet(White, _.isGone) zip playerGet(Black, _.isGone) map {
case (whiteIsGone, blackIsGone) => SocketStatus(
case GetSocketStatus(promise) =>
playerGet(White, _.isGone) zip playerGet(Black, _.isGone) foreach {
case (whiteIsGone, blackIsGone) => promise success SocketStatus(
version = history.getVersion,
whiteOnGame = ownerIsHere(White),
whiteIsGone = whiteIsGone,
blackOnGame = ownerIsHere(Black),
blackIsGone = blackIsGone
)
} pipeTo sender
}
case Join(uid, user, color, playerId, ip, onTv, version) =>
case Join(uid, user, color, playerId, ip, onTv, version, promise) =>
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, user, color, playerId, ip, onTv.map(_.userId))
addMember(uid, member)
@ -224,7 +213,7 @@ private[round] final class Socket(
initialMsgs.fold(enumerator) { _ >>> enumerator }
)
sender ! Connected(fullEnumerator, member)
promise success Connected(fullEnumerator, member)
case Nil =>
case eventList: EventList => notify(eventList.events)
@ -271,11 +260,18 @@ private[round] final class Socket(
case TourStanding(json) => notifyOwners("tourStanding", json)
}: Actor.Receive) orElse lila.chat.Socket.out(
}: Trouper.Receive) orElse lila.chat.Socket.out(
send = (t, d, _) => notifyAll(t, d)
)
override def quit(uid: Uid) =
override def broom = {
super.broom
if (!hasAi) Color.all foreach { c =>
playerGet(c, _.isGone) foreach { _ ?? notifyGone(c, true) }
}
}
override def quit(uid: Socket.Uid) =
if (members contains uid.value) {
super.quit(uid)
notifyCrowd
@ -284,7 +280,7 @@ private[round] final class Socket(
def notifyCrowd: Unit = {
if (!delayedCrowdNotification) {
delayedCrowdNotification = true
context.system.scheduler.scheduleOnce(1 second, self, NotifyCrowd)
system.scheduler.scheduleOnce(1 second)(this ! NotifyCrowd)
}
}
@ -326,7 +322,7 @@ private[round] final class Socket(
m.owner && m.color == color
}
def ownerOf(uid: Uid): Option[Member] =
def ownerOf(uid: Socket.Uid): Option[Member] =
members get uid.value filter (_.owner)
def foreachWatcher(f: Member => Unit): Unit = members.foreachValue { m =>
@ -341,7 +337,7 @@ private[round] final class Socket(
}
}
object Socket {
object RoundSocket {
case class ChatIds(priv: Chat.Id, pub: Chat.Id) {
def all = Seq(priv, pub)

View File

@ -4,8 +4,6 @@ import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.util.Try
import akka.actor._
import akka.pattern.ask
import chess.format.Uci
import chess.{ Centis, MoveMetrics, Color }
import play.api.libs.json.{ JsObject, JsNumber, Json, Reads }
@ -26,7 +24,7 @@ import makeTimeout.short
private[round] final class SocketHandler(
roundMap: DuctMap[Round],
socketHub: ActorRef,
socketMap: SocketMap,
hub: lila.hub.Env,
messenger: Messenger,
evalCacheHandler: lila.evalCache.EvalCacheSocketHandler,
@ -40,7 +38,7 @@ private[round] final class SocketHandler(
private def controller(
gameId: Game.ID,
chat: Option[Chat.Setup], // if using a non-game chat (tournament, simul, ...)
socket: ActorRef,
socket: RoundSocket,
uid: Uid,
ref: PovRef,
member: Member,
@ -145,29 +143,34 @@ private[round] final class SocketHandler(
userTv: Option[UserTv],
version: Option[SocketVersion]
): Fu[JsSocketHandler] = {
val join = Join(
val socket = socketMap getOrMake pov.gameId
socket.ask[Connected](promise => Join(
uid = uid,
user = user,
color = pov.color,
playerId = playerId,
ip = ip,
userTv = userTv,
version = version
)
// non-game chat, for tournament or simul games; only for players
val chatSetup = playerId.isDefined ?? {
pov.game.tournamentId.map(Chat.tournamentSetup) orElse pov.game.simulId.map(Chat.simulSetup)
}
socketHub ? Get(pov.gameId) mapTo manifest[ActorRef] flatMap { socket =>
Handler.forActor(hub, socket, uid, join) {
case Connected(enum, member) =>
// register to the TV channel when watching TV
if (playerId.isEmpty && isRecentTv(pov.gameId)) bus.publish(
lila.socket.Channel.Sub(member),
'tvSelectChannel
)
(controller(pov.gameId, chatSetup, socket, uid, pov.ref, member, user), enum, member)
}
version = version,
promise = promise
)) map {
case Connected(enum, member) =>
// register to the TV channel when watching TV
if (playerId.isEmpty && isRecentTv(pov.gameId)) bus.publish(
lila.socket.Channel.Sub(member),
'tvSelectChannel
)
// non-game chat, for tournament or simul games; only for players
val chatSetup = playerId.isDefined ?? {
pov.game.tournamentId.map(Chat.tournamentSetup) orElse pov.game.simulId.map(Chat.simulSetup)
}
Handler.iteratee(
hub,
controller(pov.gameId, chatSetup, socket, uid, pov.ref, member, user),
member,
socket,
uid
) -> enum
}
}

View File

@ -77,13 +77,14 @@ case class Join(
playerId: Option[String],
ip: IpAddress,
userTv: Option[UserTv],
version: Option[SocketVersion]
version: Option[SocketVersion],
promise: Promise[Connected]
)
case class UserTv(userId: User.ID, reload: Fu[Boolean])
case class Connected(enumerator: JsEnumerator, member: Member)
case class Bye(color: Color)
case class IsGone(color: Color)
case object GetSocketStatus
case class IsGone(color: Color, promise: Promise[Boolean])
case class GetSocketStatus(promise: Promise[SocketStatus])
case class SocketStatus(
version: SocketVersion,
whiteOnGame: Boolean,

View File

@ -5,6 +5,8 @@ import lila.socket.WithSocket
package object round extends PackageObject with WithSocket {
private[round] type SocketMap = lila.hub.TrouperMap[RoundSocket]
private[round] type Events = List[Event]
private[round] type VersionedEvents = List[VersionedEvent]

View File

@ -13,15 +13,15 @@ import lila.socket.{ SocketTrouper, History, Historical }
import lila.chat.Chat
private[simul] final class Socket(
val system: ActorSystem,
system: ActorSystem,
simulId: String,
val history: History[Messadata],
protected val history: History[Messadata],
getSimul: Simul.ID => Fu[Option[Simul]],
jsonView: JsonView,
lightUser: lila.common.LightUser.Getter,
uidTtl: Duration,
keepMeAlive: () => Unit
) extends SocketTrouper[Member](uidTtl) with Historical[Member, Messadata] {
) extends SocketTrouper[Member](system, uidTtl) with Historical[Member, Messadata] {
lilaBus.subscribe(this, chatClassifier)
@ -72,7 +72,7 @@ private[simul] final class Socket(
case GetUserIdsP(promise) => promise success members.values.flatMap(_.userId)
case JoinP(uid, user, version, promise) =>
case Join(uid, user, version, promise) =>
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, user)
addMember(uid, member)

View File

@ -29,7 +29,7 @@ private[simul] final class SocketHandler(
exists(simulId) flatMap {
_ ?? {
val socket = socketMap getOrMake simulId
socket.ask[Connected](JoinP(uid, user, version, _)) map {
socket.ask[Connected](Join(uid, user, version, _)) map {
case Connected(enum, member) => Handler.iteratee(
hub,
controller(socket, simulId, uid, member),

View File

@ -24,7 +24,7 @@ private[simul] object Member {
private[simul] case class Messadata(trollish: Boolean = false)
private[simul] case class JoinP(
private[simul] case class Join(
uid: Uid,
user: Option[User],
version: Option[SocketVersion],

View File

@ -11,9 +11,9 @@ import lila.socket._
import lila.socket.actorApi.SendToFlag
private[site] final class Socket(
val system: akka.actor.ActorSystem,
system: akka.actor.ActorSystem,
uidTtl: Duration
) extends SocketTrouper[Member](uidTtl) with LoneSocket {
) extends SocketTrouper[Member](system, uidTtl) with LoneSocket {
def monitoringName = "site"
def broomFrequency = 4159 millis
@ -24,7 +24,7 @@ private[site] final class Socket(
def receiveSpecific = {
case JoinP(uid, userId, flag, promise) =>
case Join(uid, userId, flag, promise) =>
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, userId, flag)
addMember(uid, member)

View File

@ -15,7 +15,7 @@ private[site] final class SocketHandler(
userId: Option[String],
flag: Option[String]
): Fu[JsSocketHandler] =
socket.ask[Connected](JoinP(uid, userId, flag, _)) map {
socket.ask[Connected](Join(uid, userId, flag, _)) map {
case Connected(enum, member) => Handler.iteratee(
hub,
PartialFunction.empty,
@ -38,7 +38,7 @@ private[site] final class SocketHandler(
case _ => // not available on API socket
}
socket.ask[Connected](JoinP(uid, userId, flag, _)) map {
socket.ask[Connected](Join(uid, userId, flag, _)) map {
case Connected(enum, member) => Handler.iteratee(
hub,
controller(member),

View File

@ -17,5 +17,5 @@ case class Member(
def isApi = flag has "api"
}
case class JoinP(uid: Uid, userId: Option[String], flag: Option[String], promise: Promise[Connected])
private[site] case class Join(uid: Uid, userId: Option[String], flag: Option[String], promise: Promise[Connected])
private[site] case class Connected(enumerator: JsEnumerator, member: Member)

View File

@ -6,30 +6,23 @@ import com.typesafe.config.Config
import actorApi._
final class Env(
system: ActorSystem,
scheduler: lila.common.Scheduler
system: ActorSystem
) {
import scala.concurrent.duration._
private val socketHub = system.actorOf(Props[SocketHub])
private val population = new Population(system)
private val moveBroadcast = new MoveBroadcast(system)
private val userRegister = new UserRegister(system)
scheduler.once(10 seconds) {
scheduler.message(4 seconds) { socketHub -> actorApi.Broom }
}
system.scheduler.schedule(5 seconds, 1 seconds) { population ! PopulationTell }
}
object Env {
lazy val current = "socket" boot new Env(
system = lila.common.PlayApp.system,
scheduler = lila.common.PlayApp.scheduler
system = lila.common.PlayApp.system
)
}

View File

@ -3,9 +3,9 @@ package lila.socket
import play.api.libs.iteratee._
import play.api.libs.json._
trait Historical[M <: SocketMember, Metadata] { self: SocketBase[M] =>
trait Historical[M <: SocketMember, Metadata] { self: SocketTrouper[M] =>
val history: History[Metadata]
protected val history: History[Metadata]
protected type Message = History.Message[Metadata]

View File

@ -24,6 +24,8 @@ object Socket extends Socket {
case object GetVersion
case class GetVersionP(promise: Promise[SocketVersion])
val initialPong = makeMessage("n")
}
private[socket] trait Socket {
@ -32,6 +34,4 @@ private[socket] trait Socket {
JsObject(new Map.Map2("t", JsString(t), "d", writes.writes(data)))
def makeMessage(t: String): JsObject = JsObject(new Map.Map1("t", JsString(t)))
val initialPong = makeMessage("n")
}

View File

@ -1,28 +0,0 @@
package lila.socket
import scala.concurrent.duration._
import akka.actor.{ Deploy => _, _ }
import lila.hub.actorApi.HasUserId
abstract class SocketActor[M <: SocketMember](val uidTtl: Duration) extends SocketBase[M] with Actor {
protected val system = context.system
override def preStart: Unit = {
lilaBus.publish(lila.socket.SocketHub.Open(self), 'socket)
}
override def postStop(): Unit = {
super.postStop()
lilaBus.publish(lila.socket.SocketHub.Close(self), 'socket)
members foreachKey ejectUidString
}
protected val receiveActor: PartialFunction[Any, Unit] = {
case HasUserId(userId) => sender ! hasUserId(userId)
}
def receive = receiveSpecific orElse receiveActor orElse receiveGeneric
}

View File

@ -1,156 +0,0 @@
package lila.socket
import scala.concurrent.duration._
import scala.util.Random
import akka.actor.ActorSystem
import play.api.libs.json._
import actorApi._
import chess.Centis
import lila.common.LightUser
import lila.hub.actorApi.Deploy
import lila.memo.ExpireSetMemo
trait SocketBase[M <: SocketMember] extends Socket {
protected def uidTtl: Duration
protected def system: ActorSystem
protected val members = scala.collection.mutable.AnyRefMap.empty[String, M]
protected val aliveUids = new ExpireSetMemo(uidTtl)
protected var pong = initialPong
protected def lilaBus = system.lilaBus
// to be defined in subclassing socket
protected def receiveSpecific: PartialFunction[Any, Unit]
// generic message handler
protected def receiveGeneric: PartialFunction[Any, Unit] = {
case Ping(uid, _, lagCentis) => ping(uid, lagCentis)
case Broom => broom
// when a member quits
case Quit(uid) => quit(uid)
case Resync(uid) => resync(uid)
case d: Deploy => onDeploy(d)
}
protected def hasUserId(userId: String) = members.values.exists(_.userId contains userId)
protected def notifyAll[A: Writes](t: String, data: A): Unit =
notifyAll(makeMessage(t, data))
protected def notifyAll(t: String): Unit =
notifyAll(makeMessage(t))
protected def notifyAll(msg: JsObject): Unit =
members.foreachValue(_ push msg)
protected def notifyIf(msg: JsObject)(condition: M => Boolean): Unit =
members.foreachValue { member =>
if (condition(member)) member push msg
}
protected def notifyMember[A: Writes](t: String, data: A)(member: M): Unit = {
member push makeMessage(t, data)
}
protected def notifyUid[A: Writes](t: String, data: A)(uid: Socket.Uid): Unit = {
withMember(uid)(_ push makeMessage(t, data))
}
protected def ping(uid: Socket.Uid, lagCentis: Option[Centis]): Unit = {
setAlive(uid)
withMember(uid) { member =>
member push pong
for {
lc <- lagCentis
user <- member.userId
} UserLagCache.put(user, lc)
}
}
protected def broom: Unit =
members.keys foreach { uid =>
if (!aliveUids.get(uid)) ejectUidString(uid)
}
protected def ejectUidString(uid: String): Unit = eject(Socket.Uid(uid))
protected def eject(uid: Socket.Uid): Unit = withMember(uid) { member =>
member.end
quit(uid)
}
protected def quit(uid: Socket.Uid): Unit = withMember(uid) { member =>
members -= uid.value
lilaBus.publish(SocketLeave(uid, member), 'socketLeave)
}
protected def onDeploy(d: Deploy): Unit =
notifyAll(makeMessage(d.key))
protected val resyncMessage = makeMessage("resync")
protected def resync(member: M): Unit = {
import scala.concurrent.duration._
system.scheduler.scheduleOnce((Random nextInt 2000).milliseconds) {
resyncNow(member)
}
}
protected def resync(uid: Socket.Uid): Unit =
withMember(uid)(resync)
protected def resyncNow(member: M): Unit =
member push resyncMessage
protected def addMember(uid: Socket.Uid, member: M): Unit = {
eject(uid)
members += (uid.value -> member)
setAlive(uid)
lilaBus.publish(SocketEnter(uid, member), 'socketEnter)
}
protected def setAlive(uid: Socket.Uid): Unit = aliveUids put uid.value
protected def membersByUserId(userId: String): Iterable[M] = members collect {
case (_, member) if member.userId.contains(userId) => member
}
protected def firstMemberByUserId(userId: String): Option[M] = members collectFirst {
case (_, member) if member.userId.contains(userId) => member
}
protected def uidToUserId(uid: Socket.Uid): Option[String] = members get uid.value flatMap (_.userId)
protected val maxSpectatorUsers = 15
protected def showSpectators(lightUser: LightUser.Getter)(watchers: Iterable[SocketMember]): Fu[JsValue] = watchers.size match {
case 0 => fuccess(JsNull)
case s if s > maxSpectatorUsers => fuccess(Json.obj("nb" -> s))
case s => {
val userIdsWithDups = watchers.toSeq.flatMap(_.userId)
val anons = s - userIdsWithDups.size
val userIds = userIdsWithDups.distinct
val total = anons + userIds.size
userIds.map(lightUser).sequenceFu.map { users =>
Json.obj(
"nb" -> total,
"users" -> users.flatten.map(_.titleName),
"anons" -> anons
)
}
}
}
protected def withMember(uid: Socket.Uid)(f: M => Unit): Unit = members get uid.value foreach f
}

View File

@ -1,36 +0,0 @@
package lila.socket
import akka.actor._
final class SocketHub extends Actor {
private val sockets = collection.mutable.Set[ActorRef]()
override def preStart(): Unit = {
context.system.lilaBus.subscribe(self, 'deploy, 'socket)
}
override def postStop(): Unit = {
super.postStop()
context.system.lilaBus.unsubscribe(self)
}
import SocketHub._
def receive = {
case Open(socket) => sockets += socket
case Close(socket) => sockets -= socket
case lila.hub.actorApi.DeployPre => // ignore
case msg => sockets foreach (_ ! msg)
}
}
case object SocketHub {
case class Open(actor: ActorRef)
case class Close(actor: ActorRef)
}

View File

@ -1,25 +0,0 @@
package lila.socket
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import lila.hub.ActorMap
trait SocketHubActor[A <: SocketActor[_]] extends Socket with ActorMap {
override val supervisorStrategy =
OneForOneStrategy() {
// usually better to keep socket actors alive
case _: Exception => Resume
}
def socketHubReceive: Receive = actorMapReceive
}
object SocketHubActor {
trait Default[A <: SocketActor[_]] extends SocketHubActor[A] {
def receive = socketHubReceive
}
}

View File

@ -1,22 +1,25 @@
package lila.socket
import ornicar.scalalib.Random.approximatly
import play.api.libs.json._
import scala.concurrent.duration._
import scala.concurrent.Promise
import play.api.libs.iteratee._
import play.api.libs.json.JsValue
import ornicar.scalalib.Random.approximatly
import lila.hub.actorApi.HasUserIdP
import actorApi._
import chess.Centis
import lila.common.LightUser
import lila.hub.actorApi.Deploy
import lila.hub.actorApi.HasUserId
import lila.hub.Trouper
import lila.memo.ExpireSetMemo
abstract class SocketTrouper[M <: SocketMember](
val uidTtl: Duration
) extends SocketBase[M] with Trouper {
protected val system: akka.actor.ActorSystem,
protected val uidTtl: Duration
) extends Socket with Trouper {
import SocketTrouper._
case class AddMember(uid: Socket.Uid, member: M, promise: Promise[Unit])
override def stop() = {
super.stop()
members foreachKey ejectUidString
@ -24,22 +27,149 @@ abstract class SocketTrouper[M <: SocketMember](
protected val receiveTrouper: PartialFunction[Any, Unit] = {
case HasUserIdP(userId, promise) => promise success hasUserId(userId)
case AddMember(uid, member, promise) =>
addMember(uid, member)
promise.success(())
case HasUserId(userId, promise) => promise success hasUserId(userId)
case GetNbMembers(promise) => promise success members.size
}
val process = receiveSpecific orElse receiveTrouper orElse receiveGeneric
def addMember(uid: Socket.Uid)(make: JsChannel => M): Fu[(M, JsEnumerator)] = {
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = make(channel)
ask[Unit](AddMember(uid, member, _)) inject (member -> enumerator)
protected val members = scala.collection.mutable.AnyRefMap.empty[String, M]
protected val aliveUids = new ExpireSetMemo(uidTtl)
protected var pong = Socket.initialPong
protected def lilaBus = system.lilaBus
// to be defined in subclassing socket
protected def receiveSpecific: PartialFunction[Any, Unit]
// generic message handler
protected def receiveGeneric: PartialFunction[Any, Unit] = {
case Ping(uid, _, lagCentis) => ping(uid, lagCentis)
case Broom => broom
// when a member quits
case Quit(uid) => quit(uid)
case Resync(uid) => resync(uid)
case d: Deploy => onDeploy(d)
}
protected def hasUserId(userId: String) = members.values.exists(_.userId contains userId)
protected def notifyAll[A: Writes](t: String, data: A): Unit =
notifyAll(makeMessage(t, data))
protected def notifyAll(t: String): Unit =
notifyAll(makeMessage(t))
protected def notifyAll(msg: JsObject): Unit =
members.foreachValue(_ push msg)
protected def notifyIf(msg: JsObject)(condition: M => Boolean): Unit =
members.foreachValue { member =>
if (condition(member)) member push msg
}
protected def notifyMember[A: Writes](t: String, data: A)(member: M): Unit = {
member push makeMessage(t, data)
}
protected def notifyUid[A: Writes](t: String, data: A)(uid: Socket.Uid): Unit = {
withMember(uid)(_ push makeMessage(t, data))
}
protected def ping(uid: Socket.Uid, lagCentis: Option[Centis]): Unit = {
setAlive(uid)
withMember(uid) { member =>
member push pong
for {
lc <- lagCentis
user <- member.userId
} UserLagCache.put(user, lc)
}
}
protected def broom: Unit =
members.keys foreach { uid =>
if (!aliveUids.get(uid)) ejectUidString(uid)
}
protected def ejectUidString(uid: String): Unit = eject(Socket.Uid(uid))
protected def eject(uid: Socket.Uid): Unit = withMember(uid) { member =>
member.end
quit(uid)
}
protected def quit(uid: Socket.Uid): Unit = withMember(uid) { member =>
members -= uid.value
lilaBus.publish(SocketLeave(uid, member), 'socketLeave)
}
protected def onDeploy(d: Deploy): Unit =
notifyAll(makeMessage(d.key))
protected val resyncMessage = makeMessage("resync")
protected def resync(member: M): Unit = {
import scala.concurrent.duration._
system.scheduler.scheduleOnce((scala.util.Random nextInt 2000).milliseconds) {
resyncNow(member)
}
}
protected def resync(uid: Socket.Uid): Unit =
withMember(uid)(resync)
protected def resyncNow(member: M): Unit =
member push resyncMessage
protected def addMember(uid: Socket.Uid, member: M): Unit = {
eject(uid)
members += (uid.value -> member)
setAlive(uid)
lilaBus.publish(SocketEnter(uid, member), 'socketEnter)
}
protected def setAlive(uid: Socket.Uid): Unit = aliveUids put uid.value
protected def membersByUserId(userId: String): Iterable[M] = members collect {
case (_, member) if member.userId.contains(userId) => member
}
protected def firstMemberByUserId(userId: String): Option[M] = members collectFirst {
case (_, member) if member.userId.contains(userId) => member
}
protected def uidToUserId(uid: Socket.Uid): Option[String] = members get uid.value flatMap (_.userId)
protected val maxSpectatorUsers = 15
protected def showSpectators(lightUser: LightUser.Getter)(watchers: Iterable[SocketMember]): Fu[JsValue] = watchers.size match {
case 0 => fuccess(JsNull)
case s if s > maxSpectatorUsers => fuccess(Json.obj("nb" -> s))
case s => {
val userIdsWithDups = watchers.toSeq.flatMap(_.userId)
val anons = s - userIdsWithDups.size
val userIds = userIdsWithDups.distinct
val total = anons + userIds.size
userIds.map(lightUser).sequenceFu.map { users =>
Json.obj(
"nb" -> total,
"users" -> users.flatten.map(_.titleName),
"anons" -> anons
)
}
}
}
protected def withMember(uid: Socket.Uid)(f: M => Unit): Unit = members get uid.value foreach f
}
object SocketTrouper {
@ -52,7 +182,7 @@ trait LoneSocket { self: SocketTrouper[_] =>
def monitoringName: String
def broomFrequency: FiniteDuration
system.scheduler.schedule(approximatly(0.1f)(12.seconds.toMillis).millis.pp, broomFrequency) {
system.scheduler.schedule(approximatly(0.1f)(12.seconds.toMillis).millis, broomFrequency) {
this ! lila.socket.actorApi.Broom
lila.mon.socket.queueSize(monitoringName)(estimateQueueSize)
}

View File

@ -4,7 +4,7 @@ import akka.actor._
import com.typesafe.config.Config
import scala.concurrent.duration._
import lila.hub.actorApi.HasUserIdP
import lila.hub.actorApi.HasUserId
import lila.hub.actorApi.map.Ask
import lila.hub.{ Duct, DuctMap, TrouperMap }
import lila.socket.Socket.{ GetVersionP, SocketVersion }
@ -67,7 +67,7 @@ final class Env(
socketMap.askIfPresentOrZero[SocketVersion](studyId.value)(GetVersionP)
def isConnected(studyId: Study.Id, userId: User.ID): Fu[Boolean] =
socketMap.askIfPresentOrZero[Boolean](studyId.value)(HasUserIdP(userId, _))
socketMap.askIfPresentOrZero[Boolean](studyId.value)(HasUserId(userId, _))
lazy val socketHandler = new SocketHandler(
hub = hub,

View File

@ -299,7 +299,7 @@ final class SocketHandler(
controller: StudySocket.Member => Handler.Controller,
version: Option[SocketVersion]
): Fu[JsSocketHandler] =
socket.ask[StudySocket.Connected](StudySocket.JoinP(uid, user.map(_.id), user.??(_.troll), version, _)) map {
socket.ask[StudySocket.Connected](StudySocket.Join(uid, user.map(_.id), user.??(_.troll), version, _)) map {
case StudySocket.Connected(enum, member) => Handler.iteratee(
hub,
controller(member),

View File

@ -3,7 +3,7 @@ package lila.study
import akka.actor._
import akka.pattern.ask
import lila.hub.actorApi.HasUserIdP
import lila.hub.actorApi.HasUserId
import lila.notify.{ InvitedToStudy, NotifyApi, Notification }
import lila.pref.Pref
import lila.relation.{ Block, Follow }
@ -27,7 +27,7 @@ private final class StudyInvite(
_ <- study.members.contains(invited) ?? fufail[Unit]("Already a member")
relation <- getRelation(invited.id, byUserId)
_ <- relation.has(Block) ?? fufail[Unit]("This user does not want to join")
isPresent <- socket.ask[Boolean](HasUserIdP(invited.id, _))
isPresent <- socket.ask[Boolean](HasUserId(invited.id, _))
_ <- if (isPresent) funit else getPref(invited).map(_.studyInvite).flatMap {
case Pref.StudyInvite.ALWAYS => funit
case Pref.StudyInvite.NEVER => fufail("This user doesn't accept study invitations")

View File

@ -17,17 +17,17 @@ import lila.user.User
import lila.chat.Chat
final class StudySocket(
val system: ActorSystem,
system: ActorSystem,
studyId: Study.Id,
jsonView: JsonView,
studyRepo: StudyRepo,
chapterRepo: ChapterRepo,
lightUserApi: lila.user.LightUserApi,
val history: History[StudySocket.Messadata],
protected val history: History[StudySocket.Messadata],
uidTtl: Duration,
lightStudyCache: LightStudyCache,
keepMeAlive: () => Unit
) extends SocketTrouper[StudySocket.Member](uidTtl) with Historical[StudySocket.Member, StudySocket.Messadata] {
) extends SocketTrouper[StudySocket.Member](system, uidTtl) with Historical[StudySocket.Member, StudySocket.Messadata] {
import StudySocket._
import JsonView._
@ -199,7 +199,7 @@ final class StudySocket(
case GetVersionP(promise) => promise success history.version
case JoinP(uid, userId, troll, version, promise) =>
case Join(uid, userId, troll, version, promise) =>
import play.api.libs.iteratee.Concurrent
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, userId, troll = troll)
@ -284,7 +284,7 @@ object StudySocket {
import JsonView.uidWriter
implicit private val whoWriter = Json.writes[Who]
case class JoinP(uid: Uid, userId: Option[User.ID], troll: Boolean, version: Option[SocketVersion], promise: Promise[Connected])
case class Join(uid: Uid, userId: Option[User.ID], troll: Boolean, version: Option[SocketVersion], promise: Promise[Connected])
case class Connected(enumerator: JsEnumerator, member: Member)
case class ReloadUid(uid: Uid)

View File

@ -20,7 +20,6 @@ final class Env(
flood: lila.security.Flood,
hub: lila.hub.Env,
roundMap: DuctMap[_],
roundSocketHub: ActorSelection,
lightUserApi: lila.user.LightUserApi,
isOnline: String => Boolean,
onStart: String => Unit,
@ -191,7 +190,7 @@ final class Env(
// is that user playing a game of this tournament
// or hanging out in the tournament lobby (joined or not)
def hasUser(tourId: Tournament.ID, userId: User.ID): Fu[Boolean] =
socketMap.askIfPresentOrZero[Boolean](tourId)(lila.hub.actorApi.HasUserIdP(userId, _)) >>|
socketMap.askIfPresentOrZero[Boolean](tourId)(lila.hub.actorApi.HasUserId(userId, _)) >>|
PairingRepo.isPlaying(tourId, userId)
def cli = new lila.common.Cli {
@ -220,7 +219,6 @@ object Env {
flood = lila.security.Env.current.flood,
hub = lila.hub.Env.current,
roundMap = lila.round.Env.current.roundMap,
roundSocketHub = lila.hub.Env.current.socket.round,
lightUserApi = lila.user.Env.current.lightUserApi,
isOnline = lila.user.Env.current.isOnline,
onStart = lila.game.Env.current.onStart,

View File

@ -14,14 +14,14 @@ import lila.socket.actorApi.{ Connected => _, _ }
import lila.socket.{ SocketTrouper, History, Historical }
private[tournament] final class Socket(
val system: ActorSystem,
system: ActorSystem,
tournamentId: String,
val history: History[Messadata],
protected val history: History[Messadata],
jsonView: JsonView,
lightUser: lila.common.LightUser.Getter,
uidTtl: Duration,
keepMeAlive: () => Unit
) extends SocketTrouper[Member](uidTtl) with Historical[Member, Messadata] {
) extends SocketTrouper[Member](system, uidTtl) with Historical[Member, Messadata] {
private var delayedCrowdNotification = false
private var delayedReloadNotification = false
@ -69,7 +69,7 @@ private[tournament] final class Socket(
case lila.socket.Socket.GetVersionP(promise) => promise success history.version
case JoinP(uid, user, version, promise) =>
case Join(uid, user, version, promise) =>
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, user)
addMember(uid, member)

View File

@ -31,7 +31,7 @@ private[tournament] final class SocketHandler(
TournamentRepo exists tourId flatMap {
_ ?? {
val socket = socketMap getOrMake tourId
socket.ask[Connected](JoinP(uid, user, version, _)) map {
socket.ask[Connected](Join(uid, user, version, _)) map {
case Connected(enum, member) => Handler.iteratee(
hub,
controller(socket, tourId, uid, member),

View File

@ -24,7 +24,7 @@ private[tournament] object Member {
private[tournament] case class Messadata(trollish: Boolean = false)
private[tournament] case class JoinP(
private[tournament] case class Join(
uid: Uid,
user: Option[User],
version: Option[SocketVersion],

View File

@ -27,7 +27,6 @@ final class Env(
private val tvTrouper = new TvTrouper(
system,
hub.actor.renderer,
hub.socket.round,
selectChannel,
lightUser,
onSelect

View File

@ -13,7 +13,6 @@ import lila.hub.Trouper
private[tv] final class TvTrouper(
system: ActorSystem,
rendererActor: ActorSelection,
roundSocket: ActorSelection,
selectChannel: lila.socket.Channel,
lightUser: LightUser.GetterSync,
onSelect: Game => Unit