diff --git a/modules/challenge/src/main/ChallengeApi.scala b/modules/challenge/src/main/ChallengeApi.scala index ceb1120ece..b16b5e9064 100644 --- a/modules/challenge/src/main/ChallengeApi.scala +++ b/modules/challenge/src/main/ChallengeApi.scala @@ -80,7 +80,8 @@ final class ChallengeApi( Bus.publish(Event.Decline(c declineWith reason), "challenge") } - private val acceptQueue = new lila.hub.DuctSequencer(maxSize = 64, timeout = 5 seconds, "challengeAccept") + private val acceptQueue = + new lila.hub.AsyncActorSequencer(maxSize = 64, timeout = 5 seconds, "challengeAccept") def accept( c: Challenge, diff --git a/modules/challenge/src/main/ChallengeBulk.scala b/modules/challenge/src/main/ChallengeBulk.scala index 7f0574e2dc..e4653b3775 100644 --- a/modules/challenge/src/main/ChallengeBulk.scala +++ b/modules/challenge/src/main/ChallengeBulk.scala @@ -13,7 +13,7 @@ import lila.common.Template import lila.db.dsl._ import lila.game.{ Game, Player } import lila.hub.actorApi.map.TellMany -import lila.hub.DuctSequencers +import lila.hub.AsyncActorSequencers import lila.rating.PerfType import lila.setup.SetupBulk.{ ScheduledBulk, ScheduledGame } import lila.user.User @@ -40,7 +40,7 @@ final class ChallengeBulkApi( private val coll = colls.bulk private val workQueue = - new DuctSequencers(maxSize = 16, expiration = 10 minutes, timeout = 10 seconds, name = "challenge.bulk") + new AsyncActorSequencers(maxSize = 16, expiration = 10 minutes, timeout = 10 seconds, name = "challenge.bulk") def scheduledBy(me: User): Fu[List[ScheduledBulk]] = coll.list[ScheduledBulk]($doc("by" -> me.id)) diff --git a/modules/common/src/main/mon.scala b/modules/common/src/main/mon.scala index 542baf773d..71ab42f0e8 100644 --- a/modules/common/src/main/mon.scala +++ b/modules/common/src/main/mon.scala @@ -169,7 +169,7 @@ object mon { object expiration { val count = counter("round.expiration.count").withoutTags() } - val ductCount = gauge("round.duct.count").withoutTags() + val asyncActorCount = gauge("round.asyncActor.count").withoutTags() } object playban { def outcome(out: String) = counter("playban.outcome").withTag("outcome", out) @@ -201,8 +201,8 @@ object mon { ) ) } - object duct { - def overflow(name: String) = counter("duct.overflow").withTag("name", name) + object asyncActor { + def overflow(name: String) = counter("asyncActor.overflow").withTag("name", name) } object irc { object zulip { diff --git a/modules/fishnet/src/main/Analyser.scala b/modules/fishnet/src/main/Analyser.scala index e5baffa55c..694f684fdd 100644 --- a/modules/fishnet/src/main/Analyser.scala +++ b/modules/fishnet/src/main/Analyser.scala @@ -20,7 +20,7 @@ final class Analyser( val maxPlies = 200 - private val workQueue = new lila.hub.DuctSequencer(maxSize = 256, timeout = 5 seconds, "fishnetAnalyser") + private val workQueue = new lila.hub.AsyncActorSequencer(maxSize = 256, timeout = 5 seconds, "fishnetAnalyser") def apply(game: Game, sender: Work.Sender): Fu[Boolean] = (game.metadata.analysed ?? analysisRepo.exists(game.id)) flatMap { diff --git a/modules/fishnet/src/main/FishnetApi.scala b/modules/fishnet/src/main/FishnetApi.scala index bcfe6e00af..3bab29cecd 100644 --- a/modules/fishnet/src/main/FishnetApi.scala +++ b/modules/fishnet/src/main/FishnetApi.scala @@ -27,7 +27,7 @@ final class FishnetApi( import JsonApi.Request.{ CompleteAnalysis, PartialAnalysis } import BSONHandlers._ - private val workQueue = new lila.hub.DuctSequencer(maxSize = 256, timeout = 5 seconds, name = "fishnetApi") + private val workQueue = new lila.hub.AsyncActorSequencer(maxSize = 256, timeout = 5 seconds, name = "fishnetApi") def keyExists(key: Client.Key) = repo.getEnabledClient(key).map(_.isDefined) diff --git a/modules/hub/src/main/AskPipeline.scala b/modules/hub/src/main/AskPipeline.scala index 776afd6f85..aa1c5c0da6 100644 --- a/modules/hub/src/main/AskPipeline.scala +++ b/modules/hub/src/main/AskPipeline.scala @@ -11,11 +11,11 @@ import scala.concurrent.{ ExecutionContext, Promise } final class AskPipeline[A](compute: () => Fu[A], timeout: FiniteDuration, name: String)(implicit system: akka.actor.ActorSystem, ec: scala.concurrent.ExecutionContext -) extends Trouper { +) extends SyncActor { private var state: State = Idle - protected val process: Trouper.Receive = { + protected val process: SyncActor.Receive = { case Get(promise) => state match { diff --git a/modules/hub/src/main/Duct.scala b/modules/hub/src/main/AsyncActor.scala similarity index 83% rename from modules/hub/src/main/Duct.scala rename to modules/hub/src/main/AsyncActor.scala index 040fd1067b..a7047ff891 100644 --- a/modules/hub/src/main/Duct.scala +++ b/modules/hub/src/main/AsyncActor.scala @@ -9,9 +9,9 @@ import scala.concurrent.Promise * Sequential like an actor, but for async functions, * and using an atomic backend instead of akka actor. */ -abstract class Duct(implicit ec: scala.concurrent.ExecutionContext) extends lila.common.Tellable { +abstract class AsyncActor(implicit ec: scala.concurrent.ExecutionContext) extends lila.common.Tellable { - import Duct._ + import AsyncActor._ // implement async behaviour here protected val process: ReceiveAsync @@ -33,13 +33,13 @@ abstract class Duct(implicit ec: scala.concurrent.ExecutionContext) extends lila private[this] val stateRef: AtomicReference[State] = new AtomicReference(None) private[this] def run(msg: Any): Unit = - process.applyOrElse(msg, Duct.fallback) onComplete postRun + process.applyOrElse(msg, AsyncActor.fallback) onComplete postRun private[this] val postRun = (_: Any) => stateRef.getAndUpdate(postRunUpdate) flatMap (_.headOption) foreach run } -object Duct { +object AsyncActor { type ReceiveAsync = PartialFunction[Any, Fu[Any]] @@ -53,7 +53,7 @@ object Duct { } private val fallback = { msg: Any => - lila.log("Duct").warn(s"unhandled msg: $msg") + lila.log("asyncActor").warn(s"unhandled msg: $msg") funit } } diff --git a/modules/hub/src/main/DuctConcMap.scala b/modules/hub/src/main/AsyncActorConcMap.scala similarity index 67% rename from modules/hub/src/main/DuctConcMap.scala rename to modules/hub/src/main/AsyncActorConcMap.scala index d1091ce921..f5123fd76f 100644 --- a/modules/hub/src/main/DuctConcMap.scala +++ b/modules/hub/src/main/AsyncActorConcMap.scala @@ -6,21 +6,21 @@ import ornicar.scalalib.Zero import scala.concurrent.{ ExecutionContext, Promise } import scala.jdk.CollectionConverters._ -final class DuctConcMap[D <: Duct]( - mkDuct: String => D, +final class AsyncActorConcMap[D <: AsyncActor]( + mkAsyncActor: String => D, initialCapacity: Int ) extends TellMap { - def getOrMake(id: String): D = ducts.computeIfAbsent(id, loadFunction) + def getOrMake(id: String): D = asyncActors.computeIfAbsent(id, loadFunction) - def getIfPresent(id: String): Option[D] = Option(ducts get id) + def getIfPresent(id: String): Option[D] = Option(asyncActors get id) def tell(id: String, msg: Any): Unit = getOrMake(id) ! msg def tellIfPresent(id: String, msg: => Any): Unit = getIfPresent(id) foreach (_ ! msg) def tellAll(msg: Any) = - ducts.forEachValue(16, _ ! msg) + asyncActors.forEachValue(16, _ ! msg) def tellIds(ids: Seq[String], msg: Any): Unit = ids foreach { tell(_, msg) } @@ -34,21 +34,21 @@ final class DuctConcMap[D <: Duct]( def askIfPresentOrZero[A: Zero](id: String)(makeMsg: Promise[A] => Any): Fu[A] = askIfPresent(id)(makeMsg) dmap (~_) - def exists(id: String): Boolean = ducts.get(id) != null + def exists(id: String): Boolean = asyncActors.get(id) != null def foreachKey(f: String => Unit): Unit = - ducts.forEachKey(16, k => f(k)) + asyncActors.forEachKey(16, k => f(k)) def tellAllWithAck(makeMsg: Promise[Unit] => Any)(implicit ec: ExecutionContext): Fu[Int] = - ducts.values.asScala + asyncActors.values.asScala .map(_ ask makeMsg) .sequenceFu .map(_.size) - def size: Int = ducts.size() + def size: Int = asyncActors.size() - def terminate(id: String, lastWill: Duct => Unit): Unit = - ducts + def terminate(id: String, lastWill: AsyncActor => Unit): Unit = + asyncActors .computeIfPresent( id, (_, d) => { @@ -58,10 +58,10 @@ final class DuctConcMap[D <: Duct]( ) .unit - private[this] val ducts = new ConcurrentHashMap[String, D](initialCapacity) + private[this] val asyncActors = new ConcurrentHashMap[String, D](initialCapacity) private val loadFunction = new Function[String, D] { - def apply(k: String) = mkDuct(k) + def apply(k: String) = mkAsyncActor(k) } // used to remove entries diff --git a/modules/hub/src/main/AsyncActorSequencer.scala b/modules/hub/src/main/AsyncActorSequencer.scala new file mode 100644 index 0000000000..0401509899 --- /dev/null +++ b/modules/hub/src/main/AsyncActorSequencer.scala @@ -0,0 +1,67 @@ +package lila.hub + +import com.github.blemale.scaffeine.LoadingCache +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, Promise } + +import lila.base.LilaTimeout + +final class AsyncActorSequencer(maxSize: Int, timeout: FiniteDuration, name: String, logging: Boolean = true)( + implicit + system: akka.actor.ActorSystem, + ec: ExecutionContext +) { + + import AsyncActorSequencer._ + + def apply[A](fu: => Fu[A]): Fu[A] = run(() => fu) + + def run[A](task: Task[A]): Fu[A] = asyncActor.ask[A](TaskWithPromise(task, _)) + + private[this] val asyncActor = + new BoundedAsyncActor(maxSize, name, logging)({ case TaskWithPromise(task, promise) => + promise.completeWith { + task() + .withTimeout(timeout) + .transform( + identity, + { + case LilaTimeout(msg) => + val fullMsg = s"$name AsyncActorSequencer $msg" + if (logging) lila.log("asyncActor").warn(fullMsg) + LilaTimeout(fullMsg) + case e => e + } + ) + }.future + }) +} + +// Distributes tasks to many sequencers +final class AsyncActorSequencers( + maxSize: Int, + expiration: FiniteDuration, + timeout: FiniteDuration, + name: String, + logging: Boolean = true +)(implicit + system: akka.actor.ActorSystem, + ec: ExecutionContext, + mode: play.api.Mode +) { + + def apply[A](key: String)(task: => Fu[A]): Fu[A] = + sequencers.get(key).run(() => task) + + private val sequencers: LoadingCache[String, AsyncActorSequencer] = + lila.common.LilaCache + .scaffeine(mode) + .expireAfterAccess(expiration) + .build(key => new AsyncActorSequencer(maxSize, timeout, s"$name:$key", logging)) +} + +object AsyncActorSequencer { + + private type Task[A] = () => Fu[A] + private case class TaskWithPromise[A](task: Task[A], promise: Promise[A]) +} diff --git a/modules/hub/src/main/BoundedDuct.scala b/modules/hub/src/main/BoundedAsyncActor.scala similarity index 81% rename from modules/hub/src/main/BoundedDuct.scala rename to modules/hub/src/main/BoundedAsyncActor.scala index dba7ca5ea2..5dfa31a988 100644 --- a/modules/hub/src/main/BoundedDuct.scala +++ b/modules/hub/src/main/BoundedAsyncActor.scala @@ -9,11 +9,13 @@ import scala.concurrent.Promise * Sequential like an actor, but for async functions, * and using an atomic backend instead of akka actor. */ -final class BoundedDuct(maxSize: Int, name: String, logging: Boolean = true)(process: Duct.ReceiveAsync)( - implicit ec: scala.concurrent.ExecutionContext +final class BoundedAsyncActor(maxSize: Int, name: String, logging: Boolean = true)( + process: AsyncActor.ReceiveAsync +)(implicit + ec: scala.concurrent.ExecutionContext ) { - import BoundedDuct._ + import BoundedAsyncActor._ def !(msg: Any): Boolean = stateRef.getAndUpdate { state => @@ -30,8 +32,8 @@ final class BoundedDuct(maxSize: Int, name: String, logging: Boolean = true)(pro case Some(q) => val success = q.size < maxSize if (!success) { - lila.mon.duct.overflow(name).increment() - if (logging) lila.log("duct").warn(s"[$name] queue is full ($maxSize)") + lila.mon.asyncActor.overflow(name).increment() + if (logging) lila.log("asyncActor").warn(s"[$name] queue is full ($maxSize)") } success } @@ -39,7 +41,7 @@ final class BoundedDuct(maxSize: Int, name: String, logging: Boolean = true)(pro def ask[A](makeMsg: Promise[A] => Any): Fu[A] = { val promise = Promise[A]() val success = this ! makeMsg(promise) - if (!success) promise failure new EnqueueException(s"The $name duct queue is full ($maxSize)") + if (!success) promise failure new EnqueueException(s"The $name asyncActor queue is full ($maxSize)") promise.future } @@ -59,12 +61,12 @@ final class BoundedDuct(maxSize: Int, name: String, logging: Boolean = true)(pro stateRef.getAndUpdate(postRunUpdate) flatMap (_.headOption) foreach run private[this] lazy val fallback = { msg: Any => - lila.log("duct").warn(s"[$name] unhandled msg: $msg") + lila.log("asyncActor").warn(s"[$name] unhandled msg: $msg") funit } } -object BoundedDuct { +object BoundedAsyncActor { final class EnqueueException(msg: String) extends Exception(msg) diff --git a/modules/hub/src/main/DuctSequencer.scala b/modules/hub/src/main/DuctSequencer.scala deleted file mode 100644 index 1bcbb94db9..0000000000 --- a/modules/hub/src/main/DuctSequencer.scala +++ /dev/null @@ -1,66 +0,0 @@ -package lila.hub - -import com.github.blemale.scaffeine.LoadingCache -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ ExecutionContext, Promise } - -import lila.base.LilaTimeout - -final class DuctSequencer(maxSize: Int, timeout: FiniteDuration, name: String, logging: Boolean = true)( - implicit - system: akka.actor.ActorSystem, - ec: ExecutionContext -) { - - import DuctSequencer._ - - def apply[A](fu: => Fu[A]): Fu[A] = run(() => fu) - - def run[A](task: Task[A]): Fu[A] = duct.ask[A](TaskWithPromise(task, _)) - - private[this] val duct = new BoundedDuct(maxSize, name, logging)({ case TaskWithPromise(task, promise) => - promise.completeWith { - task() - .withTimeout(timeout) - .transform( - identity, - { - case LilaTimeout(msg) => - val fullMsg = s"$name DuctSequencer $msg" - if (logging) lila.log("duct").warn(fullMsg) - LilaTimeout(fullMsg) - case e => e - } - ) - }.future - }) -} - -// Distributes tasks to many sequencers -final class DuctSequencers( - maxSize: Int, - expiration: FiniteDuration, - timeout: FiniteDuration, - name: String, - logging: Boolean = true -)(implicit - system: akka.actor.ActorSystem, - ec: ExecutionContext, - mode: play.api.Mode -) { - - def apply[A](key: String)(task: => Fu[A]): Fu[A] = - sequencers.get(key).run(() => task) - - private val sequencers: LoadingCache[String, DuctSequencer] = - lila.common.LilaCache - .scaffeine(mode) - .expireAfterAccess(expiration) - .build(key => new DuctSequencer(maxSize, timeout, s"$name:$key", logging)) -} - -object DuctSequencer { - - private type Task[A] = () => Fu[A] - private case class TaskWithPromise[A](task: Task[A], promise: Promise[A]) -} diff --git a/modules/hub/src/main/Trouper.scala b/modules/hub/src/main/SyncActor.scala similarity index 90% rename from modules/hub/src/main/Trouper.scala rename to modules/hub/src/main/SyncActor.scala index 7815efcd25..d05f7ec11b 100644 --- a/modules/hub/src/main/Trouper.scala +++ b/modules/hub/src/main/SyncActor.scala @@ -9,11 +9,10 @@ import scala.concurrent.{ ExecutionContext, Future, Promise } * Like an actor, but not an actor. * Uses an Atomic Reference backend for sequentiality. * Has an unbounded (!) Queue of messages. - * Like Duct, but for synchronous message processors. */ -abstract class Trouper(implicit ec: ExecutionContext) extends lila.common.Tellable { +abstract class SyncActor(implicit ec: ExecutionContext) extends lila.common.Tellable { - import Trouper._ + import SyncActor._ // implement async behaviour here protected val process: Receive @@ -58,7 +57,7 @@ abstract class Trouper(implicit ec: ExecutionContext) extends lila.common.Tellab } } -object Trouper { +object SyncActor { type Receive = PartialFunction[Any, Unit] @@ -72,7 +71,7 @@ object Trouper { } def stub(implicit ec: ExecutionContext) = - new Trouper { + new SyncActor { val process: Receive = { case msg => lila.log("trouper").warn(s"stub trouper received: $msg") } diff --git a/modules/hub/src/main/TrouperMap.scala b/modules/hub/src/main/SyncActorMap.scala similarity index 55% rename from modules/hub/src/main/TrouperMap.scala rename to modules/hub/src/main/SyncActorMap.scala index 404cf802a4..b5db43715d 100644 --- a/modules/hub/src/main/TrouperMap.scala +++ b/modules/hub/src/main/SyncActorMap.scala @@ -8,20 +8,20 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.Promise import scala.jdk.CollectionConverters._ -final class TrouperMap[T <: Trouper]( - mkTrouper: String => T, +final class SyncActorMap[T <: SyncActor]( + mkActor: String => T, accessTimeout: FiniteDuration )(implicit mode: Mode) { - def getOrMake(id: String): T = troupers get id + def getOrMake(id: String): T = actors get id - def getIfPresent(id: String): Option[T] = Option(troupers getIfPresent id) + def getIfPresent(id: String): Option[T] = Option(actors getIfPresent id) def tell(id: String, msg: Any): Unit = getOrMake(id) ! msg def tellIfPresent(id: String, msg: => Any): Unit = getIfPresent(id) foreach (_ ! msg) - def tellAll(msg: Any) = troupers.asMap.asScala.foreach(_._2 ! msg) + def tellAll(msg: Any) = actors.asMap.asScala.foreach(_._2 ! msg) def tellIds(ids: Seq[String], msg: Any): Unit = ids foreach { tell(_, msg) } @@ -35,32 +35,32 @@ final class TrouperMap[T <: Trouper]( def askIfPresentOrZero[A: Zero](id: String)(makeMsg: Promise[A] => Any): Fu[A] = askIfPresent(id)(makeMsg) dmap (~_) - def exists(id: String): Boolean = troupers.getIfPresent(id) != null + def exists(id: String): Boolean = actors.getIfPresent(id) != null - def size: Int = troupers.estimatedSize().toInt + def size: Int = actors.estimatedSize().toInt - def kill(id: String): Unit = troupers invalidate id + def kill(id: String): Unit = actors invalidate id - def killAll(): Unit = troupers.invalidateAll() + def killAll(): Unit = actors.invalidateAll() - def touch(id: String): Unit = troupers.getIfPresent(id).unit + def touch(id: String): Unit = actors.getIfPresent(id).unit - def touchOrMake(id: String): Unit = troupers.get(id).unit + def touchOrMake(id: String): Unit = actors.get(id).unit - private[this] val troupers: LoadingCache[String, T] = + private[this] val actors: LoadingCache[String, T] = lila.common.LilaCache .caffeine(mode) .recordStats .expireAfterAccess(accessTimeout.toMillis, TimeUnit.MILLISECONDS) .removalListener(new RemovalListener[String, T] { - def onRemoval(id: String, trouper: T, cause: RemovalCause): Unit = - trouper.stop() + def onRemoval(id: String, actor: T, cause: RemovalCause): Unit = + actor.stop() }) .build[String, T](new CacheLoader[String, T] { - def load(id: String): T = mkTrouper(id) + def load(id: String): T = mkActor(id) }) - def monitor(name: String) = lila.mon.caffeineStats(troupers, name) + def monitor(name: String) = lila.mon.caffeineStats(actors, name) - def keys: Set[String] = troupers.asMap.asScala.keySet.toSet + def keys: Set[String] = actors.asMap.asScala.keySet.toSet } diff --git a/modules/insight/src/main/InsightIndexer.scala b/modules/insight/src/main/InsightIndexer.scala index 73a3becd40..0636038318 100644 --- a/modules/insight/src/main/InsightIndexer.scala +++ b/modules/insight/src/main/InsightIndexer.scala @@ -23,7 +23,7 @@ final private class InsightIndexer( ) { private val workQueue = - new lila.hub.DuctSequencer(maxSize = 128, timeout = 2 minutes, name = "insightIndexer") + new lila.hub.AsyncActorSequencer(maxSize = 128, timeout = 2 minutes, name = "insightIndexer") def all(userId: User.ID): Funit = workQueue { diff --git a/modules/lobby/src/main/AbortListener.scala b/modules/lobby/src/main/AbortListener.scala index b5398a1fc5..375561ffab 100644 --- a/modules/lobby/src/main/AbortListener.scala +++ b/modules/lobby/src/main/AbortListener.scala @@ -5,7 +5,7 @@ import lila.game.{ Pov, Source } final private class AbortListener( userRepo: lila.user.UserRepo, seekApi: SeekApi, - lobbyTrouper: LobbyTrouper + lobbyTrouper: LobbySyncActor )(implicit ec: scala.concurrent.ExecutionContext) { def apply(pov: Pov): Funit = diff --git a/modules/lobby/src/main/BoardApiHookStream.scala b/modules/lobby/src/main/BoardApiHookStream.scala index 91ad59b8de..e2f02fba1f 100644 --- a/modules/lobby/src/main/BoardApiHookStream.scala +++ b/modules/lobby/src/main/BoardApiHookStream.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import lila.common.Bus final class BoardApiHookStream( - trouper: LobbyTrouper + trouper: LobbySyncActor )(implicit ec: scala.concurrent.ExecutionContext, system: ActorSystem) { private case object SetOnline diff --git a/modules/lobby/src/main/Env.scala b/modules/lobby/src/main/Env.scala index 9d1932831f..239e6100fb 100644 --- a/modules/lobby/src/main/Env.scala +++ b/modules/lobby/src/main/Env.scala @@ -38,11 +38,11 @@ final class Env( lazy val boardApiHookStream = wire[BoardApiHookStream] - private lazy val lobbyTrouper = LobbyTrouper.start( + private lazy val lobbySyncActor = LobbySyncActor.start( broomPeriod = 2 seconds, resyncIdsPeriod = 25 seconds ) { () => - wire[LobbyTrouper] + wire[LobbySyncActor] } private lazy val abortListener = wire[AbortListener] diff --git a/modules/lobby/src/main/LobbySocket.scala b/modules/lobby/src/main/LobbySocket.scala index 426e00d443..9a407f9bf1 100644 --- a/modules/lobby/src/main/LobbySocket.scala +++ b/modules/lobby/src/main/LobbySocket.scala @@ -7,7 +7,7 @@ import scala.concurrent.Promise import lila.game.Pov import lila.hub.actorApi.timeline._ -import lila.hub.Trouper +import lila.hub.SyncActor import lila.i18n.defaultLang import lila.pool.{ PoolApi, PoolConfig } import lila.rating.RatingRange @@ -22,7 +22,7 @@ final class LobbySocket( biter: Biter, userRepo: lila.user.UserRepo, remoteSocketApi: lila.socket.RemoteSocket, - lobby: LobbyTrouper, + lobby: LobbySyncActor, relationApi: lila.relation.RelationApi, poolApi: PoolApi, system: akka.actor.ActorSystem @@ -35,14 +35,14 @@ final class LobbySocket( private var lastCounters = LobbyCounters(0, 0) def counters = lastCounters - val trouper: Trouper = new Trouper { + val trouper: SyncActor = new SyncActor { private val members = scala.collection.mutable.AnyRefMap.empty[SriStr, Member] private val idleSris = collection.mutable.Set[SriStr]() private val hookSubscriberSris = collection.mutable.Set[SriStr]() private val removedHookIds = new collection.mutable.StringBuilder(1024) - val process: Trouper.Receive = { + val process: SyncActor.Receive = { case GetMember(sri, promise) => promise success members.get(sri.value) @@ -141,7 +141,7 @@ final class LobbySocket( } // solve circular reference - lobby ! LobbyTrouper.SetSocket(trouper) + lobby ! LobbySyncActor.SetSocket(trouper) private val poolLimitPerSri = new lila.memo.RateLimit[SriStr]( credits = 14, diff --git a/modules/lobby/src/main/LobbyTrouper.scala b/modules/lobby/src/main/LobbySyncActor.scala similarity index 95% rename from modules/lobby/src/main/LobbyTrouper.scala rename to modules/lobby/src/main/LobbySyncActor.scala index 8f774c4bba..e44bf591eb 100644 --- a/modules/lobby/src/main/LobbyTrouper.scala +++ b/modules/lobby/src/main/LobbySyncActor.scala @@ -9,11 +9,11 @@ import scala.concurrent.Promise import lila.common.config.Max import lila.common.{ AtMost, Bus, Every } import lila.game.Game -import lila.hub.Trouper +import lila.hub.SyncActor import lila.socket.Socket.{ Sri, Sris } import lila.user.User -final private class LobbyTrouper( +final private class LobbySyncActor( seekApi: SeekApi, biter: Biter, gameCache: lila.game.Cached, @@ -22,17 +22,17 @@ final private class LobbyTrouper( poolApi: lila.pool.PoolApi, onStart: lila.round.OnStart )(implicit ec: scala.concurrent.ExecutionContext) - extends Trouper { + extends SyncActor { - import LobbyTrouper._ + import LobbySyncActor._ private val hookRepo = new HookRepo private var remoteDisconnectAllAt = DateTime.now - private var socket: Trouper = Trouper.stub + private var socket: SyncActor = SyncActor.stub - val process: Trouper.Receive = { + val process: SyncActor.Receive = { // solve circular reference case SetSocket(trouper) => socket = trouper @@ -194,9 +194,9 @@ final private class LobbyTrouper( } } -private object LobbyTrouper { +private object LobbySyncActor { - case class SetSocket(trouper: Trouper) + case class SetSocket(trouper: SyncActor) private case class Tick(promise: Promise[Unit]) @@ -206,7 +206,7 @@ private object LobbyTrouper { broomPeriod: FiniteDuration, resyncIdsPeriod: FiniteDuration )( - makeTrouper: () => LobbyTrouper + makeTrouper: () => LobbySyncActor )(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) = { val trouper = makeTrouper() Bus.subscribe(trouper, "lobbyTrouper") diff --git a/modules/perfStat/src/main/PerfStatIndexer.scala b/modules/perfStat/src/main/PerfStatIndexer.scala index ab16890fae..01a5b61f44 100644 --- a/modules/perfStat/src/main/PerfStatIndexer.scala +++ b/modules/perfStat/src/main/PerfStatIndexer.scala @@ -16,7 +16,7 @@ final class PerfStatIndexer( ) { private val workQueue = - new lila.hub.DuctSequencer(maxSize = 64, timeout = 10 seconds, name = "perfStatIndexer") + new lila.hub.AsyncActorSequencer(maxSize = 64, timeout = 10 seconds, name = "perfStatIndexer") private[perfStat] def userPerf(user: User, perfType: PerfType): Fu[PerfStat] = workQueue { diff --git a/modules/pool/src/main/GameStarter.scala b/modules/pool/src/main/GameStarter.scala index ebd2e01f0c..259efc1c1f 100644 --- a/modules/pool/src/main/GameStarter.scala +++ b/modules/pool/src/main/GameStarter.scala @@ -18,7 +18,7 @@ final private class GameStarter( import PoolApi._ - private val workQueue = new lila.hub.DuctSequencer(maxSize = 32, timeout = 10 seconds, name = "gameStarter") + private val workQueue = new lila.hub.AsyncActorSequencer(maxSize = 32, timeout = 10 seconds, name = "gameStarter") def apply(pool: PoolConfig, couples: Vector[MatchMaking.Couple]): Funit = couples.nonEmpty ?? { diff --git a/modules/push/src/main/FirebasePush.scala b/modules/push/src/main/FirebasePush.scala index 0da1c3c40e..ad78c0e91a 100644 --- a/modules/push/src/main/FirebasePush.scala +++ b/modules/push/src/main/FirebasePush.scala @@ -22,7 +22,7 @@ final private class FirebasePush( ) { private val workQueue = - new lila.hub.DuctSequencer(maxSize = 512, timeout = 10 seconds, name = "firebasePush") + new lila.hub.AsyncActorSequencer(maxSize = 512, timeout = 10 seconds, name = "firebasePush") def apply(userId: User.ID, data: => PushApi.Data): Funit = credentialsOpt ?? { creds => diff --git a/modules/puzzle/src/main/PuzzleApi.scala b/modules/puzzle/src/main/PuzzleApi.scala index be3d6d149d..6b827d7ca2 100644 --- a/modules/puzzle/src/main/PuzzleApi.scala +++ b/modules/puzzle/src/main/PuzzleApi.scala @@ -60,7 +60,7 @@ final class PuzzleApi( object vote { private val sequencer = - new lila.hub.DuctSequencers( + new lila.hub.AsyncActorSequencers( maxSize = 16, expiration = 5 minutes, timeout = 3 seconds, diff --git a/modules/puzzle/src/main/PuzzleFinisher.scala b/modules/puzzle/src/main/PuzzleFinisher.scala index 5a628089c3..81c4196d03 100644 --- a/modules/puzzle/src/main/PuzzleFinisher.scala +++ b/modules/puzzle/src/main/PuzzleFinisher.scala @@ -22,7 +22,7 @@ final private[puzzle] class PuzzleFinisher( import BsonHandlers._ private val sequencer = - new lila.hub.DuctSequencers( + new lila.hub.AsyncActorSequencers( maxSize = 64, expiration = 5 minutes, timeout = 5 seconds, diff --git a/modules/racer/src/main/RacerApi.scala b/modules/racer/src/main/RacerApi.scala index 3a76d3c635..cb4a153891 100644 --- a/modules/racer/src/main/RacerApi.scala +++ b/modules/racer/src/main/RacerApi.scala @@ -47,7 +47,7 @@ final class RacerApi(colls: RacerColls, selector: StormSelector, userRepo: UserR } private val rematchQueue = - new lila.hub.DuctSequencer( + new lila.hub.AsyncActorSequencer( maxSize = 32, timeout = 20 seconds, name = "racer.rematch" diff --git a/modules/racer/src/main/RacerLobby.scala b/modules/racer/src/main/RacerLobby.scala index 3c1c8fdf09..786ce136fa 100644 --- a/modules/racer/src/main/RacerLobby.scala +++ b/modules/racer/src/main/RacerLobby.scala @@ -17,7 +17,7 @@ final class RacerLobby(api: RacerApi)(implicit ec: ExecutionContext, system: akk } private val workQueue = - new lila.hub.DuctSequencer( + new lila.hub.AsyncActorSequencer( maxSize = 128, timeout = 20 seconds, name = "racer.lobby" diff --git a/modules/report/src/main/ReportApi.scala b/modules/report/src/main/ReportApi.scala index f2972b658c..d447e476a2 100644 --- a/modules/report/src/main/ReportApi.scala +++ b/modules/report/src/main/ReportApi.scala @@ -536,7 +536,7 @@ final class ReportApi( object inquiries { private val workQueue = - new lila.hub.DuctSequencer( + new lila.hub.AsyncActorSequencer( maxSize = 32, timeout = 20 seconds, name = "report.inquiries" diff --git a/modules/room/src/main/RoomSocket.scala b/modules/room/src/main/RoomSocket.scala index 405b382551..342be99279 100644 --- a/modules/room/src/main/RoomSocket.scala +++ b/modules/room/src/main/RoomSocket.scala @@ -2,7 +2,7 @@ package lila.room import lila.chat.{ BusChan, Chat, ChatApi, ChatTimeout, UserLine } import lila.hub.actorApi.shutup.PublicSource -import lila.hub.{ Trouper, TrouperMap } +import lila.hub.{ SyncActor, SyncActorMap } import lila.log.Logger import lila.socket.RemoteSocket.{ Protocol => P, _ } import lila.socket.Socket.{ makeMessage, GetVersion, SocketVersion } @@ -22,11 +22,11 @@ object RoomSocket { final class RoomState(roomId: RoomId, send: Send)(implicit ec: ExecutionContext - ) extends Trouper { + ) extends SyncActor { private var version = SocketVersion(0) - val process: Trouper.Receive = { + val process: SyncActor.Receive = { case GetVersion(promise) => promise success version case SetVersion(v) => version = v case nv: NotifyVersion[_] => @@ -48,8 +48,8 @@ object RoomSocket { ec: ExecutionContext, mode: play.api.Mode ) = - new TrouperMap( - mkTrouper = roomId => + new SyncActorMap( + mkActor = roomId => new RoomState( RoomId(roomId), send @@ -58,7 +58,7 @@ object RoomSocket { ) def roomHandler( - rooms: TrouperMap[RoomState], + rooms: SyncActorMap[RoomState], chat: ChatApi, logger: Logger, publicSource: RoomId => PublicSource.type => Option[PublicSource], @@ -93,7 +93,7 @@ object RoomSocket { } }: Handler) orElse minRoomHandler(rooms, logger) - def minRoomHandler(rooms: TrouperMap[RoomState], logger: Logger): Handler = { + def minRoomHandler(rooms: SyncActorMap[RoomState], logger: Logger): Handler = { case Protocol.In.KeepAlives(roomIds) => roomIds foreach { roomId => rooms touchOrMake roomId.value @@ -109,7 +109,7 @@ object RoomSocket { private val chatMsgs = Set("message", "chat_timeout", "chat_reinstate") - def subscribeChat(rooms: TrouperMap[RoomState], busChan: BusChan.Select) = { + def subscribeChat(rooms: SyncActorMap[RoomState], busChan: BusChan.Select) = { import lila.chat.actorApi._ lila.common.Bus.subscribeFun(busChan(BusChan).chan, BusChan.Global.chan) { case ChatLine(id, line: UserLine) => diff --git a/modules/round/src/main/Env.scala b/modules/round/src/main/Env.scala index 72beb99c0f..cdf55c9633 100644 --- a/modules/round/src/main/Env.scala +++ b/modules/round/src/main/Env.scala @@ -84,7 +84,7 @@ final class Env( private lazy val proxyDependencies = new GameProxy.Dependencies(gameRepo, scheduler) - private lazy val roundDependencies = wire[RoundDuct.Dependencies] + private lazy val roundDependencies = wire[RoundAsyncActor.Dependencies] lazy val roundSocket: RoundSocket = wire[RoundSocket] @@ -159,7 +159,7 @@ final class Env( lazy val getSocketStatus = (game: Game) => roundSocket.rounds.ask[SocketStatus](game.id)(GetSocketStatus) private def isUserPresent(game: Game, userId: lila.user.User.ID): Fu[Boolean] = - roundSocket.rounds.askIfPresentOrZero[Boolean](game.id)(RoundDuct.HasUserId(userId, _)) + roundSocket.rounds.askIfPresentOrZero[Boolean](game.id)(RoundAsyncActor.HasUserId(userId, _)) lazy val jsonView = wire[JsonView] diff --git a/modules/round/src/main/GameProxy.scala b/modules/round/src/main/GameProxy.scala index de52de933c..9032b07352 100644 --- a/modules/round/src/main/GameProxy.scala +++ b/modules/round/src/main/GameProxy.scala @@ -107,7 +107,7 @@ private object GameProxy { val scheduler: Scheduler ) - // must be way under the round duct termination delay (60s) + // must be way under the round asyncActor termination delay (60s) private val scheduleDelay = 30.seconds private val emptyCancellable = new Cancellable { diff --git a/modules/round/src/main/Player.scala b/modules/round/src/main/Player.scala index eb9f328667..cf20b38a76 100644 --- a/modules/round/src/main/Player.scala +++ b/modules/round/src/main/Player.scala @@ -21,7 +21,7 @@ final private class Player( private case object Flagged extends MoveResult private case class MoveApplied(progress: Progress, move: MoveOrDrop) extends MoveResult - private[round] def human(play: HumanPlay, round: RoundDuct)( + private[round] def human(play: HumanPlay, round: RoundAsyncActor)( pov: Pov )(implicit proxy: GameProxy): Fu[Events] = play match { @@ -47,7 +47,7 @@ final private class Player( } } - private[round] def bot(uci: Uci, round: RoundDuct)(pov: Pov)(implicit proxy: GameProxy): Fu[Events] = + private[round] def bot(uci: Uci, round: RoundAsyncActor)(pov: Pov)(implicit proxy: GameProxy): Fu[Events] = pov match { case Pov(game, _) if game.turns > Game.maxPlies => round ! TooManyPlies @@ -67,7 +67,7 @@ final private class Player( } private def postHumanOrBotPlay( - round: RoundDuct, + round: RoundAsyncActor, pov: Pov, progress: Progress, moveOrDrop: MoveOrDrop @@ -110,7 +110,7 @@ final private class Player( ) ) - private[round] def requestFishnet(game: Game, round: RoundDuct): Funit = + private[round] def requestFishnet(game: Game, round: RoundAsyncActor): Funit = game.playableByAi ?? { if (game.turns <= fishnetPlayer.maxPlies) fishnetPlayer(game) else fuccess(round ! actorApi.round.ResignAi) diff --git a/modules/round/src/main/RoundDuct.scala b/modules/round/src/main/RoundAsyncActor.scala similarity index 98% rename from modules/round/src/main/RoundDuct.scala rename to modules/round/src/main/RoundAsyncActor.scala index 351ccd6430..bf9f29eb5b 100644 --- a/modules/round/src/main/RoundDuct.scala +++ b/modules/round/src/main/RoundAsyncActor.scala @@ -21,23 +21,23 @@ import lila.hub.actorApi.round.{ RematchYes, Resign } -import lila.hub.Duct +import lila.hub.AsyncActor import lila.room.RoomSocket.{ Protocol => RP, _ } import lila.socket.Socket.{ makeMessage, GetVersion, SocketVersion } import lila.socket.UserLagCache import lila.user.User -final private[round] class RoundDuct( - dependencies: RoundDuct.Dependencies, +final private[round] class RoundAsyncActor( + dependencies: RoundAsyncActor.Dependencies, gameId: Game.ID, socketSend: String => Unit )(implicit ec: scala.concurrent.ExecutionContext, proxy: GameProxy -) extends Duct { +) extends AsyncActor { import RoundSocket.Protocol - import RoundDuct._ + import RoundAsyncActor._ import dependencies._ private var takebackSituation: Option[TakebackSituation] = None @@ -109,7 +109,7 @@ final private[round] class RoundDuct( def getGame: Fu[Option[Game]] = proxy.game def updateGame(f: Game => Game): Funit = proxy update f - val process: Duct.ReceiveAsync = { + val process: AsyncActor.ReceiveAsync = { case SetGameInfo(game, (whiteGoneWeight, blackGoneWeight)) => fuccess { @@ -541,7 +541,7 @@ final private[round] class RoundDuct( def roomId = RoomId(gameId) } -object RoundDuct { +object RoundAsyncActor { case class HasUserId(userId: User.ID, promise: Promise[Boolean]) case class SetGameInfo(game: lila.game.Game, goneWeights: (Float, Float)) diff --git a/modules/round/src/main/RoundSocket.scala b/modules/round/src/main/RoundSocket.scala index 8d7648f2c4..518927b6ab 100644 --- a/modules/round/src/main/RoundSocket.scala +++ b/modules/round/src/main/RoundSocket.scala @@ -17,7 +17,7 @@ import lila.hub.actorApi.map.{ Exists, Tell, TellAll, TellIfExists, TellMany } import lila.hub.actorApi.round.{ Abort, Berserk, RematchNo, RematchYes, Resign, TourStanding } import lila.hub.actorApi.socket.remote.TellSriIn import lila.hub.actorApi.tv.TvSelect -import lila.hub.DuctConcMap +import lila.hub.AsyncActorConcMap import lila.room.RoomSocket.{ Protocol => RP, _ } import lila.socket.RemoteSocket.{ Protocol => P, _ } import lila.socket.Socket.{ makeMessage, SocketVersion } @@ -25,7 +25,7 @@ import lila.user.User final class RoundSocket( remoteSocketApi: lila.socket.RemoteSocket, - roundDependencies: RoundDuct.Dependencies, + roundDependencies: RoundAsyncActor.Dependencies, proxyDependencies: GameProxy.Dependencies, scheduleExpiration: ScheduleExpiration, tournamentActor: lila.hub.actors.TournamentApi, @@ -43,8 +43,8 @@ final class RoundSocket( Lilakka.shutdown(shutdown, _.PhaseServiceUnbind, "Stop round socket") { () => stopping = true - rounds.tellAllWithAck(RoundDuct.LilaStop.apply) map { nb => - Lilakka.logger.info(s"$nb round ducts have stopped") + rounds.tellAllWithAck(RoundAsyncActor.LilaStop.apply) map { nb => + Lilakka.logger.info(s"$nb round asyncActors have stopped") } } @@ -69,24 +69,24 @@ final class RoundSocket( _ updateGame f } - val rounds = new DuctConcMap[RoundDuct]( - mkDuct = id => { + val rounds = new AsyncActorConcMap[RoundAsyncActor]( + mkAsyncActor = id => { val proxy = new GameProxy(id, proxyDependencies) - val duct = new RoundDuct( + val asyncActor = new RoundAsyncActor( dependencies = roundDependencies, gameId = id, socketSend = sendForGameId(id) )(ec, proxy) terminationDelay schedule Game.Id(id) - duct.getGame dforeach { + asyncActor.getGame dforeach { _ foreach { game => scheduleExpiration(game) goneWeightsFor(game) dforeach { w => - duct ! RoundDuct.SetGameInfo(game, w) + asyncActor ! RoundAsyncActor.SetGameInfo(game, w) } } } - duct + asyncActor }, initialCapacity = 65536 ) @@ -150,16 +150,16 @@ final class RoundSocket( case P.In.Ping(id) => send(P.Out.pong(id)) case P.In.WsBoot => logger.warn("Remote socket boot") - // schedule termination for all game ducts + // schedule termination for all game asyncActors // until players actually reconnect rounds foreachKey { id => terminationDelay schedule Game.Id(id) } - rounds.tellAll(RoundDuct.WsBoot) + rounds.tellAll(RoundAsyncActor.WsBoot) } private def finishRound(gameId: Game.Id): Unit = - rounds.terminate(gameId.value, _ ! RoundDuct.Stop) + rounds.terminate(gameId.value, _ ! RoundAsyncActor.Stop) private lazy val send: Sender = remoteSocketApi.makeSender("r-out", parallelism = 8) @@ -204,10 +204,10 @@ final class RoundSocket( } system.scheduler.scheduleWithFixedDelay(25 seconds, tickInterval) { () => - rounds.tellAll(RoundDuct.Tick) + rounds.tellAll(RoundAsyncActor.Tick) } system.scheduler.scheduleWithFixedDelay(60 seconds, 60 seconds) { () => - lila.mon.round.ductCount.update(rounds.size).unit + lila.mon.round.asyncActorCount.update(rounds.size).unit } private val terminationDelay = new TerminationDelay(system.scheduler, 1 minute, finishRound) diff --git a/modules/round/src/main/Takebacker.scala b/modules/round/src/main/Takebacker.scala index 38dd3c93b9..a45c82834d 100644 --- a/modules/round/src/main/Takebacker.scala +++ b/modules/round/src/main/Takebacker.scala @@ -5,7 +5,7 @@ import lila.common.Bus import lila.game.{ Event, Game, GameRepo, Pov, Progress, Rewind, UciMemo } import lila.pref.{ Pref, PrefApi } import lila.i18n.{ I18nKeys => trans, defaultLang } -import RoundDuct.TakebackSituation +import RoundAsyncActor.TakebackSituation final private class Takebacker( messenger: Messenger, diff --git a/modules/simul/src/main/SimulApi.scala b/modules/simul/src/main/SimulApi.scala index 2fa030805d..e4e8d7f0c6 100644 --- a/modules/simul/src/main/SimulApi.scala +++ b/modules/simul/src/main/SimulApi.scala @@ -30,7 +30,7 @@ final class SimulApi( ) { private val workQueue = - new lila.hub.DuctSequencers( + new lila.hub.AsyncActorSequencers( maxSize = 128, expiration = 10 minutes, timeout = 10 seconds, diff --git a/modules/study/src/main/StudySequencer.scala b/modules/study/src/main/StudySequencer.scala index 4ce507f7e2..66c14e96a0 100644 --- a/modules/study/src/main/StudySequencer.scala +++ b/modules/study/src/main/StudySequencer.scala @@ -3,7 +3,7 @@ package lila.study import ornicar.scalalib.Zero import scala.concurrent.duration._ -import lila.hub.DuctSequencers +import lila.hub.AsyncActorSequencers final private class StudySequencer( studyRepo: StudyRepo, @@ -15,7 +15,7 @@ final private class StudySequencer( ) { private val workQueue = - new DuctSequencers(maxSize = 64, expiration = 1 minute, timeout = 10 seconds, name = "study") + new AsyncActorSequencers(maxSize = 64, expiration = 1 minute, timeout = 10 seconds, name = "study") def sequenceStudy[A: Zero](studyId: Study.Id)(f: Study => Fu[A]): Fu[A] = workQueue(studyId.value) { diff --git a/modules/study/src/main/StudyTopic.scala b/modules/study/src/main/StudyTopic.scala index 363b280c98..67bce88dab 100644 --- a/modules/study/src/main/StudyTopic.scala +++ b/modules/study/src/main/StudyTopic.scala @@ -140,7 +140,7 @@ final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTop private def docTopic(doc: Bdoc): Option[StudyTopic] = doc.getAsOpt[StudyTopic]("_id") - private val recomputeWorkQueue = new lila.hub.DuctSequencer( + private val recomputeWorkQueue = new lila.hub.AsyncActorSequencer( maxSize = 1, timeout = 61 seconds, name = "studyTopicAggregation", @@ -149,7 +149,7 @@ final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTop def recompute(): Unit = recomputeWorkQueue(Future.makeItLast(60 seconds)(recomputeNow)).recover { - case _: lila.hub.BoundedDuct.EnqueueException => () + case _: lila.hub.BoundedAsyncActor.EnqueueException => () case e: Exception => logger.warn("Can't recompute study topics!", e) }.unit diff --git a/modules/swiss/src/main/SwissApi.scala b/modules/swiss/src/main/SwissApi.scala index 4cfcdfb694..c01d8c3c53 100644 --- a/modules/swiss/src/main/SwissApi.scala +++ b/modules/swiss/src/main/SwissApi.scala @@ -39,7 +39,7 @@ final class SwissApi( ) { private val sequencer = - new lila.hub.DuctSequencers( + new lila.hub.AsyncActorSequencers( maxSize = 1024, // queue many game finished events expiration = 20 minutes, timeout = 10 seconds, diff --git a/modules/tournament/src/main/TournamentApi.scala b/modules/tournament/src/main/TournamentApi.scala index c7af7b116b..d02b2ce577 100644 --- a/modules/tournament/src/main/TournamentApi.scala +++ b/modules/tournament/src/main/TournamentApi.scala @@ -48,7 +48,7 @@ final class TournamentApi( ) { private val workQueue = - new lila.hub.DuctSequencers( + new lila.hub.AsyncActorSequencers( maxSize = 256, expiration = 1 minute, timeout = 10 seconds, diff --git a/modules/tv/src/main/ChannelTrouper.scala b/modules/tv/src/main/ChannelSyncActor.scala similarity index 91% rename from modules/tv/src/main/ChannelTrouper.scala rename to modules/tv/src/main/ChannelSyncActor.scala index 48a2cca9be..5264da5dc0 100644 --- a/modules/tv/src/main/ChannelTrouper.scala +++ b/modules/tv/src/main/ChannelSyncActor.scala @@ -6,18 +6,18 @@ import scala.concurrent.Promise import lila.common.LightUser import lila.game.Game -import lila.hub.Trouper +import lila.hub.SyncActor -final private[tv] class ChannelTrouper( +final private[tv] class ChannelSyncActor( channel: Tv.Channel, - onSelect: TvTrouper.Selected => Unit, + onSelect: TvSyncActor.Selected => Unit, proxyGame: Game.ID => Fu[Option[Game]], rematchOf: Game.ID => Option[Game.ID], lightUserSync: LightUser.GetterSync )(implicit ec: scala.concurrent.ExecutionContext) - extends Trouper { + extends SyncActor { - import ChannelTrouper._ + import ChannelSyncActor._ // games featured on this channel // first entry is the current game @@ -30,7 +30,7 @@ final private[tv] class ChannelTrouper( private val candidateIds = new lila.memo.ExpireSetMemo(3 minutes) - protected val process: Trouper.Receive = { + protected val process: SyncActor.Receive = { case GetGameId(promise) => promise success oneId @@ -39,10 +39,10 @@ final private[tv] class ChannelTrouper( case GetGameIds(max, promise) => promise success manyIds.take(max) case SetGame(game) => - onSelect(TvTrouper.Selected(channel, game)) + onSelect(TvSyncActor.Selected(channel, game)) history = game.id :: history.take(2) - case TvTrouper.Select => + case TvSyncActor.Select => candidateIds.keys .map(proxyGame) .sequenceFu @@ -110,7 +110,7 @@ final private[tv] class ChannelTrouper( .flatMap(Tv.titleScores.get) } -object ChannelTrouper { +object ChannelSyncActor { case class GetGameId(promise: Promise[Option[Game.ID]]) case class GetGameIds(max: Int, promise: Promise[List[Game.ID]]) diff --git a/modules/tv/src/main/Env.scala b/modules/tv/src/main/Env.scala index 8b86518518..3d902df1ce 100644 --- a/modules/tv/src/main/Env.scala +++ b/modules/tv/src/main/Env.scala @@ -15,11 +15,11 @@ final class Env( rematches: lila.game.Rematches )(implicit ec: scala.concurrent.ExecutionContext) { - private val tvTrouper = wire[TvTrouper] + private val tvSyncActor = wire[TvSyncActor] lazy val tv = wire[Tv] system.scheduler.scheduleWithFixedDelay(12 seconds, 3 seconds) { () => - tvTrouper ! TvTrouper.Select + tvSyncActor ! TvSyncActor.Select } } diff --git a/modules/tv/src/main/Tv.scala b/modules/tv/src/main/Tv.scala index 55c386e306..cd62ca137f 100644 --- a/modules/tv/src/main/Tv.scala +++ b/modules/tv/src/main/Tv.scala @@ -2,24 +2,24 @@ package lila.tv import lila.common.LightUser import lila.game.{ Game, GameRepo, Pov } -import lila.hub.Trouper +import lila.hub.SyncActor final class Tv( gameRepo: GameRepo, - trouper: Trouper, + trouper: SyncActor, gameProxyRepo: lila.round.GameProxyRepo )(implicit ec: scala.concurrent.ExecutionContext) { import Tv._ - import ChannelTrouper._ + import ChannelSyncActor._ private def roundProxyGame = gameProxyRepo.game _ def getGame(channel: Tv.Channel): Fu[Option[Game]] = - trouper.ask[Option[Game.ID]](TvTrouper.GetGameId(channel, _)) flatMap { _ ?? roundProxyGame } + trouper.ask[Option[Game.ID]](TvSyncActor.GetGameId(channel, _)) flatMap { _ ?? roundProxyGame } def getGameAndHistory(channel: Tv.Channel): Fu[Option[(Game, List[Pov])]] = - trouper.ask[GameIdAndHistory](TvTrouper.GetGameIdAndHistory(channel, _)) flatMap { + trouper.ask[GameIdAndHistory](TvSyncActor.GetGameIdAndHistory(channel, _)) flatMap { case GameIdAndHistory(gameId, historyIds) => for { game <- gameId ?? roundProxyGame @@ -40,14 +40,14 @@ final class Tv( } def getGameIds(channel: Tv.Channel, max: Int): Fu[List[Game.ID]] = - trouper.ask[List[Game.ID]](TvTrouper.GetGameIds(channel, max, _)) + trouper.ask[List[Game.ID]](TvSyncActor.GetGameIds(channel, max, _)) def getBestGame = getGame(Tv.Channel.Best) orElse gameRepo.random def getBestAndHistory = getGameAndHistory(Tv.Channel.Best) def getChampions: Fu[Champions] = - trouper.ask[Champions](TvTrouper.GetChampions.apply) + trouper.ask[Champions](TvSyncActor.GetChampions.apply) } object Tv { diff --git a/modules/tv/src/main/TvTrouper.scala b/modules/tv/src/main/TvSyncActor.scala similarity index 81% rename from modules/tv/src/main/TvTrouper.scala rename to modules/tv/src/main/TvSyncActor.scala index aadde5280c..64fe1e63ae 100644 --- a/modules/tv/src/main/TvTrouper.scala +++ b/modules/tv/src/main/TvSyncActor.scala @@ -7,23 +7,23 @@ import scala.concurrent.Promise import lila.common.{ Bus, LightUser } import lila.game.{ Game, Pov } -import lila.hub.Trouper +import lila.hub.SyncActor -final private[tv] class TvTrouper( +final private[tv] class TvSyncActor( renderer: lila.hub.actors.Renderer, lightUserSync: LightUser.GetterSync, recentTvGames: lila.round.RecentTvGames, gameProxyRepo: lila.round.GameProxyRepo, rematches: lila.game.Rematches )(implicit ec: scala.concurrent.ExecutionContext) - extends Trouper { + extends SyncActor { - import TvTrouper._ + import TvSyncActor._ Bus.subscribe(this, "startGame") - private val channelTroupers: Map[Tv.Channel, ChannelTrouper] = Tv.Channel.all.map { c => - c -> new ChannelTrouper(c, onSelect = this.!, gameProxyRepo.game, rematches.of, lightUserSync) + private val channelTroupers: Map[Tv.Channel, ChannelSyncActor] = Tv.Channel.all.map { c => + c -> new ChannelSyncActor(c, onSelect = this.!, gameProxyRepo.game, rematches.of, lightUserSync) }.toMap private var channelChampions = Map[Tv.Channel, Tv.Champion]() @@ -31,16 +31,16 @@ final private[tv] class TvTrouper( private def forward[A](channel: Tv.Channel, msg: Any) = channelTroupers get channel foreach { _ ! msg } - protected val process: Trouper.Receive = { + protected val process: SyncActor.Receive = { case GetGameId(channel, promise) => - forward(channel, ChannelTrouper.GetGameId(promise)) + forward(channel, ChannelSyncActor.GetGameId(promise)) case GetGameIdAndHistory(channel, promise) => - forward(channel, ChannelTrouper.GetGameIdAndHistory(promise)) + forward(channel, ChannelSyncActor.GetGameIdAndHistory(promise)) case GetGameIds(channel, max, promise) => - forward(channel, ChannelTrouper.GetGameIds(max, promise)) + forward(channel, ChannelSyncActor.GetGameIds(max, promise)) case GetChampions(promise) => promise success Tv.Champions(channelChampions) @@ -52,7 +52,7 @@ final private[tv] class TvTrouper( } foreach (_ addCandidate g) } - case s @ TvTrouper.Select => channelTroupers.foreach(_._2 ! s) + case s @ TvSyncActor.Select => channelTroupers.foreach(_._2 ! s) case Selected(channel, game) => import lila.socket.Socket.makeMessage @@ -99,12 +99,12 @@ final private[tv] class TvTrouper( } } -private[tv] object TvTrouper { +private[tv] object TvSyncActor { case class GetGameId(channel: Tv.Channel, promise: Promise[Option[Game.ID]]) case class GetGameIds(channel: Tv.Channel, max: Int, promise: Promise[List[Game.ID]]) - case class GetGameIdAndHistory(channel: Tv.Channel, promise: Promise[ChannelTrouper.GameIdAndHistory]) + case class GetGameIdAndHistory(channel: Tv.Channel, promise: Promise[ChannelSyncActor.GameIdAndHistory]) case object Select case class Selected(channel: Tv.Channel, game: Game)