use DuctSequencer instead of WorkQueue(parallelism = 1), everywhere

pull/6610/head
Thibault Duplessis 2020-05-10 11:03:20 -06:00
parent f2d810243d
commit 323eb1231c
18 changed files with 64 additions and 73 deletions

View File

@ -28,9 +28,9 @@ object Environment
// #TODO holy shit fix me
// requires injecting all the templates!!
private var envVar: Option[Env] = None
def setEnv(e: Env) = { envVar = Some(e) }
def destroy() = { envVar = None }
def env: Env = envVar.get
def setEnv(e: Env) = { envVar = Some(e) }
def destroy() = { envVar = None }
def env: Env = envVar.get
type FormWithCaptcha = (play.api.data.Form[_], lila.common.Captcha)

View File

@ -3,7 +3,7 @@ package lila.challenge
import org.joda.time.DateTime
import scala.concurrent.duration._
import lila.common.{ Bus, WorkQueue }
import lila.common.Bus
import lila.common.config.Max
import lila.game.{ Game, Pov }
import lila.hub.actorApi.socket.SendTo
@ -19,7 +19,7 @@ final class ChallengeApi(
gameCache: lila.game.Cached,
maxPlaying: Max,
cacheApi: lila.memo.CacheApi
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
import Challenge._
@ -66,7 +66,7 @@ final class ChallengeApi(
def decline(c: Challenge) = (repo decline c) >>- uncacheAndNotify(c)
private val acceptQueue = new WorkQueue(buffer = 64, timeout = 5 seconds, "challengeAccept")
private val acceptQueue = new lila.hub.DuctSequencer(maxSize = 64, timeout = 5 seconds, "challengeAccept")
def accept(
c: Challenge,
@ -150,6 +150,6 @@ final class ChallengeApi(
)
// work around circular dependency
private var socket: Option[ChallengeSocket] = None
private var socket: Option[ChallengeSocket] = None
private[challenge] def registerSocket(s: ChallengeSocket) = { socket = s.some }
}

View File

@ -15,8 +15,11 @@ import java.util.concurrent.TimeoutException
*
* If the buffer is full, the new task is dropped,
* and `run` returns a failed future.
*
* This is known to work poorly with parallelism=1
* because the queue is used by multiple threads
*/
final class WorkQueue(buffer: Int, timeout: FiniteDuration, name: String, parallelism: Int = 1)(implicit
final class WorkQueue(buffer: Int, timeout: FiniteDuration, name: String, parallelism: Int)(implicit
ec: ExecutionContext,
mat: Materializer
) {
@ -56,24 +59,6 @@ final class WorkQueue(buffer: Int, timeout: FiniteDuration, name: String, parall
.run
}
// Distributes tasks to many sequencers
final class WorkQueues(buffer: Int, expiration: FiniteDuration, timeout: FiniteDuration, name: String)(
implicit
ec: ExecutionContext,
mat: Materializer,
mode: play.api.Mode
) {
def apply[A](key: String)(task: => Fu[A]): Fu[A] =
queues.get(key).run(() => task)
private val queues: LoadingCache[String, WorkQueue] =
LilaCache
.scaffeine(mode)
.expireAfterAccess(expiration)
.build(key => new WorkQueue(buffer, timeout, s"$name:$key"))
}
object WorkQueue {
final class EnqueueException(msg: String) extends Exception(msg)

View File

@ -11,8 +11,8 @@ case class ApiVersion(value: Int) extends AnyVal with IntValue with Ordered[ApiV
case class AssetVersion(value: String) extends AnyVal with StringValue
object AssetVersion {
var current = random
def change() = { current = random }
var current = random
def change() = { current = random }
private def random = AssetVersion(ornicar.scalalib.Random secureString 6)
}

View File

@ -117,7 +117,7 @@ final private class ExplorerIndexer(
case Bullet if rating >= 2000 => 1 / 4f
case Bullet if rating >= 1800 => 1 / 7f
case Bullet => 1 / 20f
case _ if rating >= 1600 => 1 // variant games
case _ if rating >= 1600 => 1 // variant games
case _ => 1 / 2f // noob variant games
}
}

View File

@ -6,7 +6,6 @@ import scala.concurrent.duration._
import lila.analyse.AnalysisRepo
import lila.game.{ Game, UciMemo }
import lila.common.WorkQueue
final class Analyser(
repo: FishnetRepo,
@ -15,11 +14,11 @@ final class Analyser(
uciMemo: UciMemo,
evalCache: FishnetEvalCache,
limiter: Limiter
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
val maxPlies = 200
private val workQueue = new WorkQueue(buffer = 256, timeout = 5 seconds, "fishnetAnalyser")
private val workQueue = new lila.hub.DuctSequencer(maxSize = 256, timeout = 5 seconds, "fishnetAnalyser")
def apply(game: Game, sender: Work.Sender): Fu[Boolean] =
(game.metadata.analysed ?? analysisRepo.exists(game.id)) flatMap {

View File

@ -7,7 +7,6 @@ import scala.util.{ Failure, Success, Try }
import Client.Skill
import lila.common.IpAddress
import lila.common.WorkQueue
import lila.db.dsl._
final class FishnetApi(
@ -19,13 +18,13 @@ final class FishnetApi(
socketExists: String => Fu[Boolean],
clientVersion: Client.ClientVersion,
config: FishnetApi.Config
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
import FishnetApi._
import JsonApi.Request.{ CompleteAnalysis, PartialAnalysis }
import BSONHandlers._
private val workQueue = new WorkQueue(buffer = 128, timeout = 5 seconds, name = "fishnetApi")
private val workQueue = new lila.hub.DuctSequencer(maxSize = 128, timeout = 5 seconds, name = "fishnetApi")
def keyExists(key: Client.Key) = repo.getEnabledClient(key).map(_.isDefined)

View File

@ -5,14 +5,12 @@ import java.util.function.UnaryOperator
import scala.collection.immutable.Queue
import scala.concurrent.Promise
import lila.base.LilaException
/*
* Sequential like an actor, but for async functions,
* and using an atomic backend instead of akka actor.
*/
final class BoundedDuct(maxSize: Int, name: String)(process: Duct.ReceiveAsync)(implicit
ec: scala.concurrent.ExecutionContext
final class BoundedDuct(maxSize: Int, name: String, logging: Boolean = true)(process: Duct.ReceiveAsync)(
implicit ec: scala.concurrent.ExecutionContext
) {
import BoundedDuct._
@ -31,14 +29,14 @@ final class BoundedDuct(maxSize: Int, name: String)(process: Duct.ReceiveAsync)(
true
case Some(q) =>
val success = q.size < maxSize
if (!success) lila.log("duct").warn(s"[$name] queue is full ($maxSize)")
if (!success && logging) lila.log("duct").warn(s"[$name] queue is full ($maxSize)")
success
}
def ask[A](makeMsg: Promise[A] => Any): Fu[A] = {
val promise = Promise[A]
val success = this ! makeMsg(promise)
if (!success) promise failure LilaException(s"The $name duct queue is full ($maxSize)")
if (!success) promise failure new EnqueueException(s"The $name duct queue is full ($maxSize)")
promise.future
}
@ -65,13 +63,15 @@ final class BoundedDuct(maxSize: Int, name: String)(process: Duct.ReceiveAsync)(
object BoundedDuct {
case class SizedQueue(queue: Queue[Any], size: Int) {
final class EnqueueException(msg: String) extends Exception(msg)
private case class SizedQueue(queue: Queue[Any], size: Int) {
def enqueue(a: Any) = SizedQueue(queue enqueue a, size + 1)
def isEmpty = size == 0
def tailOption = !isEmpty option SizedQueue(queue.tail, size - 1)
def headOption = queue.headOption
}
val emptyQueue = SizedQueue(Queue.empty, 0)
private val emptyQueue = SizedQueue(Queue.empty, 0)
private type State = Option[SizedQueue]

View File

@ -4,7 +4,8 @@ import com.github.blemale.scaffeine.LoadingCache
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Promise }
final class DuctSequencer(maxSize: Int, timeout: FiniteDuration, name: String)(implicit
final class DuctSequencer(maxSize: Int, timeout: FiniteDuration, name: String, logging: Boolean = true)(
implicit
system: akka.actor.ActorSystem,
ec: ExecutionContext
) {
@ -15,7 +16,7 @@ final class DuctSequencer(maxSize: Int, timeout: FiniteDuration, name: String)(i
def run[A](task: Task[A]): Fu[A] = duct.ask[A](TaskWithPromise(task, _))
private[this] val duct = new BoundedDuct(maxSize, name)({
private[this] val duct = new BoundedDuct(maxSize, name, logging)({
case TaskWithPromise(task, promise) =>
promise.completeWith {
task().withTimeout(timeout)

View File

@ -9,16 +9,17 @@ import scala.concurrent.duration._
import lila.db.dsl._
import lila.game.BSONHandlers.gameBSONHandler
import lila.game.{ Game, GameRepo, Query }
import lila.common.{ LilaStream, WorkQueue }
import lila.common.LilaStream
import lila.user.User
final private class Indexer(
povToEntry: PovToEntry,
gameRepo: GameRepo,
storage: Storage
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
private val workQueue = new WorkQueue(buffer = 64, timeout = 1 minute, name = "insightIndexer")
private val workQueue =
new lila.hub.DuctSequencer(maxSize = 64, timeout = 1 minute, name = "insightIndexer")
def all(user: User): Funit =
workQueue {

View File

@ -6,15 +6,14 @@ import reactivemongo.api.ReadPreference
import lila.game.{ Game, GameRepo, Pov, Query }
import lila.rating.PerfType
import lila.user.User
import lila.common.WorkQueue
final class PerfStatIndexer(
gameRepo: GameRepo,
storage: PerfStatStorage
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
private val workQueue =
new WorkQueue(buffer = 64, timeout = 10 seconds, name = "perfStatIndexer")
new lila.hub.DuctSequencer(maxSize = 64, timeout = 10 seconds, name = "perfStatIndexer")
private[perfStat] def userPerf(user: User, perfType: PerfType): Fu[PerfStat] =
workQueue {

View File

@ -3,7 +3,6 @@ package lila.pool
import scala.concurrent.duration._
import lila.game.{ Game, GameRepo, IdGenerator, Player }
import lila.common.WorkQueue
import lila.rating.Perf
import lila.user.{ User, UserRepo }
@ -12,11 +11,11 @@ final private class GameStarter(
gameRepo: GameRepo,
idGenerator: IdGenerator,
onStart: Game.Id => Unit
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
import PoolApi._
private val workQueue = new WorkQueue(buffer = 16, timeout = 10 seconds, name = "gameStarter")
private val workQueue = new lila.hub.DuctSequencer(maxSize = 16, timeout = 10 seconds, name = "gameStarter")
def apply(pool: PoolConfig, couples: Vector[MatchMaking.Couple]): Funit =
couples.nonEmpty ?? {

View File

@ -7,7 +7,7 @@ import play.api.libs.ws.WSClient
import scala.concurrent.{ blocking, Future }
import scala.concurrent.duration._
import lila.common.{ Chronometer, WorkQueue }
import lila.common.Chronometer
import lila.user.User
final private class FirebasePush(
@ -17,7 +17,8 @@ final private class FirebasePush(
config: FirebasePush.Config
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
private val workQueue = new WorkQueue(buffer = 512, timeout = 10 seconds, name = "firebasePush")
private val workQueue =
new lila.hub.DuctSequencer(maxSize = 512, timeout = 10 seconds, name = "firebasePush")
def apply(userId: User.ID, data: => PushApi.Data): Funit =
credentialsOpt ?? { creds =>

View File

@ -6,7 +6,7 @@ import play.api.libs.json.Json
import scala.concurrent.duration._
import chess.variant.Variant
import lila.common.{ Bus, Debouncer, WorkQueues }
import lila.common.{ Bus, Debouncer }
import lila.game.{ Game, GameRepo, PerfPicker }
import lila.hub.actorApi.lobby.ReloadSimuls
import lila.hub.actorApi.timeline.{ Propagate, SimulCreate, SimulJoin }
@ -16,7 +16,6 @@ import lila.user.{ User, UserRepo }
import makeTimeout.short
final class SimulApi(
system: ActorSystem,
userRepo: UserRepo,
gameRepo: GameRepo,
onGameStart: lila.round.OnStart,
@ -25,10 +24,15 @@ final class SimulApi(
timeline: lila.hub.actors.Timeline,
repo: SimulRepo,
cacheApi: lila.memo.CacheApi
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer, mode: play.api.Mode) {
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem, mode: play.api.Mode) {
private val workQueue =
new WorkQueues(buffer = 128, expiration = 10 minutes, timeout = 5 seconds, name = "simulApi")
new lila.hub.DuctSequencers(
maxSize = 128,
expiration = 10 minutes,
timeout = 10 seconds,
name = "simulApi"
)
def currentHostIds: Fu[Set[String]] = currentHostIdsCache.get {}

View File

@ -842,7 +842,7 @@ final class StudyApi(
if (study canContribute userId) f else default.zero
// work around circular dependency
private var socket: Option[StudySocket] = None
private var socket: Option[StudySocket] = None
private[study] def registerSocket(s: StudySocket) = { socket = s.some }
private def sendTo(studyId: Study.Id)(f: StudySocket => Study.Id => Unit): Unit =
socket foreach { s =>

View File

@ -5,7 +5,7 @@ import reactivemongo.api.bson._
import scala.concurrent.duration._
import play.api.libs.json._
import lila.common.{ Future, WorkQueue }
import lila.common.Future
import lila.db.dsl._
import lila.user.User
@ -58,8 +58,7 @@ final private class StudyUserTopicRepo(val coll: Coll)
final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTopicRepo, studyRepo: StudyRepo)(
implicit
ec: scala.concurrent.ExecutionContext,
system: akka.actor.ActorSystem,
mat: akka.stream.Materializer
system: akka.actor.ActorSystem
) {
import BSONHandlers.{ StudyTopicBSONHandler, StudyTopicsBSONHandler }
@ -136,17 +135,17 @@ final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTop
private def docTopic(doc: Bdoc): Option[StudyTopic] =
doc.getAsOpt[StudyTopic]("_id")
private val recomputeWorkQueue = new WorkQueue(
buffer = 1,
private val recomputeWorkQueue = new lila.hub.DuctSequencer(
maxSize = 1,
timeout = 61 seconds,
name = "studyTopicAggregation",
parallelism = 1
logging = false
)
def recompute(): Unit =
recomputeWorkQueue(Future.makeItLast(60 seconds)(recomputeNow)).recover {
case _: WorkQueue.EnqueueException => ()
case e: Exception => logger.warn("Can't recompute study topics!", e)
case _: lila.hub.BoundedDuct.EnqueueException => ()
case e: Exception => logger.warn("Can't recompute study topics!", e)
}
private def recomputeNow: Funit =

View File

@ -43,7 +43,7 @@ final class Env(
case false =>
entryApi.channelUserIdRecentExists(channel, userId) map {
case true => Some(false) // subed
case false => None // not applicable
case false => None // not applicable
}
}

View File

@ -11,7 +11,7 @@ import scala.util.chaining._
import lila.common.config.{ MaxPerPage, MaxPerSecond }
import lila.common.paginator.Paginator
import lila.common.{ Bus, Debouncer, LightUser, WorkQueues }
import lila.common.{ Bus, Debouncer, LightUser }
import lila.game.{ Game, GameRepo, LightPov, Pov }
import lila.hub.actorApi.lobby.ReloadTournaments
import lila.hub.LightTeam
@ -45,12 +45,16 @@ final class TournamentApi(
)(implicit
ec: scala.concurrent.ExecutionContext,
system: ActorSystem,
mat: akka.stream.Materializer,
mode: play.api.Mode
) {
private val workQueue =
new WorkQueues(buffer = 256, expiration = 1 minute, timeout = 10 seconds, name = "tournament")
new lila.hub.DuctSequencers(
maxSize = 256,
expiration = 1 minute,
timeout = 10 seconds,
name = "tournament"
)
def get(id: Tournament.ID) = tournamentRepo byId id