name work queues
parent
b9f3d316f0
commit
0cc6ec7866
|
@ -15,7 +15,7 @@ import scala.util.chaining._
|
|||
* If the buffer is full, the new task is dropped,
|
||||
* and `run` returns a failed future.
|
||||
*/
|
||||
final class WorkQueue(buffer: Int)(implicit ec: ExecutionContext, mat: Materializer) {
|
||||
final class WorkQueue(buffer: Int, name: String)(implicit ec: ExecutionContext, mat: Materializer) {
|
||||
|
||||
type Task[A] = () => Fu[A]
|
||||
private type TaskWithPromise[A] = (Task[A], Promise[A])
|
||||
|
@ -26,7 +26,7 @@ final class WorkQueue(buffer: Int)(implicit ec: ExecutionContext, mat: Materiali
|
|||
val promise = Promise[A]
|
||||
queue.offer(task -> promise) flatMap {
|
||||
case QueueOfferResult.Enqueued => promise.future
|
||||
case result => Future failed new Exception(s"Can't enqueue: $result")
|
||||
case result => Future failed new Exception(s"Can't enqueue in $name: $result")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -36,14 +36,14 @@ final class WorkQueue(buffer: Int)(implicit ec: ExecutionContext, mat: Materiali
|
|||
case (task, promise) => task() tap promise.completeWith
|
||||
}
|
||||
.recover {
|
||||
case _: Exception => () // keep processing tasks
|
||||
case e: Exception => () // keep processing tasks
|
||||
}
|
||||
.toMat(Sink.ignore)(Keep.left)
|
||||
.run
|
||||
}
|
||||
|
||||
// Distributes tasks to many sequencers
|
||||
final class WorkQueues(buffer: Int, expiration: FiniteDuration)(
|
||||
final class WorkQueues(buffer: Int, expiration: FiniteDuration, name: String)(
|
||||
implicit ec: ExecutionContext,
|
||||
mat: Materializer
|
||||
) {
|
||||
|
@ -53,5 +53,5 @@ final class WorkQueues(buffer: Int, expiration: FiniteDuration)(
|
|||
|
||||
private val queues: LoadingCache[String, WorkQueue] = Scaffeine()
|
||||
.expireAfterAccess(expiration)
|
||||
.build(_ => new WorkQueue(buffer))
|
||||
.build(key => new WorkQueue(buffer, s"$name:$key"))
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ final class Analyser(
|
|||
|
||||
val maxPlies = 200
|
||||
|
||||
private val workQueue = new WorkQueue(256)
|
||||
private val workQueue = new WorkQueue(256, "fishnetAnalyser")
|
||||
|
||||
def apply(game: Game, sender: Work.Sender): Fu[Boolean] =
|
||||
(game.metadata.analysed ?? analysisRepo.exists(game.id)) flatMap {
|
||||
|
|
|
@ -24,7 +24,7 @@ final class FishnetApi(
|
|||
import JsonApi.Request.{ CompleteAnalysis, PartialAnalysis }
|
||||
import BSONHandlers._
|
||||
|
||||
private val workQueue = new WorkQueue(64)
|
||||
private val workQueue = new WorkQueue(64, "fishnetApi")
|
||||
|
||||
def keyExists(key: Client.Key) = repo.getEnabledClient(key).map(_.isDefined)
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ final private class Indexer(
|
|||
storage: Storage
|
||||
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
|
||||
|
||||
private val workQueue = new WorkQueue(64)
|
||||
private val workQueue = new WorkQueue(64, "insightIndexer")
|
||||
|
||||
def all(user: User): Funit = workQueue {
|
||||
storage.fetchLast(user.id) flatMap {
|
||||
|
|
|
@ -10,7 +10,7 @@ final class PerfStatIndexer(
|
|||
storage: PerfStatStorage
|
||||
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
|
||||
|
||||
private val workQueue = new WorkQueue(64)
|
||||
private val workQueue = new WorkQueue(64, "perfStatIndexer")
|
||||
|
||||
def userPerf(user: User, perfType: PerfType): Funit = workQueue {
|
||||
gameRepo
|
||||
|
|
|
@ -14,7 +14,7 @@ final private class GameStarter(
|
|||
|
||||
import PoolApi._
|
||||
|
||||
private val workQueue = new WorkQueue(16)
|
||||
private val workQueue = new WorkQueue(16, "gameStarter")
|
||||
|
||||
def apply(pool: PoolConfig, couples: Vector[MatchMaking.Couple]): Funit = couples.nonEmpty ?? {
|
||||
workQueue {
|
||||
|
|
|
@ -17,7 +17,7 @@ final private class FirebasePush(
|
|||
config: OneSignalPush.Config
|
||||
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
|
||||
|
||||
private val workQueue = new WorkQueue(512)
|
||||
private val workQueue = new WorkQueue(512, "firebasePush")
|
||||
|
||||
def apply(userId: User.ID)(data: => PushApi.Data): Funit =
|
||||
credentialsOpt ?? { creds =>
|
||||
|
|
|
@ -26,7 +26,7 @@ final class SimulApi(
|
|||
asyncCache: lila.memo.AsyncCache.Builder
|
||||
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
|
||||
|
||||
private val workQueue = new WorkQueues(128, 10 minutes)
|
||||
private val workQueue = new WorkQueues(128, 10 minutes, "simulApi")
|
||||
|
||||
def currentHostIds: Fu[Set[String]] = currentHostIdsCache.get
|
||||
|
||||
|
|
|
@ -9,7 +9,7 @@ final private class StudySequencer(
|
|||
chapterRepo: ChapterRepo
|
||||
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
|
||||
|
||||
private val workQueue = new WorkQueues(256, 10 minutes)
|
||||
private val workQueue = new WorkQueues(256, 10 minutes, "study")
|
||||
|
||||
def sequenceStudy(studyId: Study.Id)(f: Study => Funit): Funit =
|
||||
workQueue(studyId.value) {
|
||||
|
|
|
@ -43,7 +43,7 @@ final class TournamentApi(
|
|||
proxyRepo: lila.round.GameProxyRepo
|
||||
)(implicit ec: scala.concurrent.ExecutionContext, system: ActorSystem, mat: akka.stream.Materializer) {
|
||||
|
||||
private val workQueue = new WorkQueues(256, 1 minute)
|
||||
private val workQueue = new WorkQueues(256, 1 minute, "tournament")
|
||||
|
||||
def createTournament(
|
||||
setup: TournamentSetup,
|
||||
|
|
Loading…
Reference in New Issue