add timeout to work queues to fix studies and tournaments maybe

pull/5918/head
Thibault Duplessis 2020-01-14 20:30:51 -06:00
parent 189d52581b
commit 61cc149e14
13 changed files with 41 additions and 19 deletions

View File

@ -6,6 +6,7 @@ import com.github.blemale.scaffeine.LoadingCache
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.util.chaining._
import java.util.concurrent.TimeoutException
/* Sequences async tasks, so that:
* queue.run(() => task1); queue.run(() => task2)
@ -15,7 +16,7 @@ import scala.util.chaining._
* If the buffer is full, the new task is dropped,
* and `run` returns a failed future.
*/
final class WorkQueue(buffer: Int, name: String, parallelism: Int = 1)(
final class WorkQueue(buffer: Int, timeout: FiniteDuration, name: String, parallelism: Int = 1)(
implicit ec: ExecutionContext,
mat: Materializer
) {
@ -41,17 +42,23 @@ final class WorkQueue(buffer: Int, name: String, parallelism: Int = 1)(
.queue[TaskWithPromise[_]](buffer, OverflowStrategy.dropNew)
.mapAsyncUnordered(parallelism) {
case (task, promise) =>
task() tap promise.completeWith recover {
case e: Exception =>
lila.log(s"WorkQueue:$name").info("task failed", e)
}
task()
.withTimeout(timeout, new TimeoutException)(ec, mat.system)
.tap(promise.completeWith)
.recover {
case e: TimeoutException =>
lila.mon.workQueue.timeout(name).increment()
lila.log(s"WorkQueue:$name").warn(s"task timed out after $timeout", e)
case e: Exception =>
lila.log(s"WorkQueue:$name").info("task failed", e)
}
}
.toMat(Sink.ignore)(Keep.left)
.run
}
// Distributes tasks to many sequencers
final class WorkQueues(buffer: Int, expiration: FiniteDuration, name: String)(
final class WorkQueues(buffer: Int, expiration: FiniteDuration, timeout: FiniteDuration, name: String)(
implicit ec: ExecutionContext,
mat: Materializer,
mode: play.api.Mode
@ -64,5 +71,5 @@ final class WorkQueues(buffer: Int, expiration: FiniteDuration, name: String)(
LilaCache
.scaffeine(mode)
.expireAfterAccess(expiration)
.build(key => new WorkQueue(buffer, s"$name:$key"))
.build(key => new WorkQueue(buffer, timeout, s"$name:$key"))
}

View File

@ -552,6 +552,7 @@ object mon {
"result" -> result
)
)
def timeout(name: String) = counter("workQueue.timeout").withTag("name", name)
}
def chronoSync[A] = lila.common.Chronometer.syncMon[A] _

View File

@ -2,6 +2,7 @@ package lila.fishnet
import org.joda.time.DateTime
import chess.format.Forsyth
import scala.concurrent.duration._
import lila.analyse.AnalysisRepo
import lila.game.{ Game, UciMemo }
@ -18,7 +19,7 @@ final class Analyser(
val maxPlies = 200
private val workQueue = new WorkQueue(256, "fishnetAnalyser")
private val workQueue = new WorkQueue(buffer = 256, timeout = 5 seconds, "fishnetAnalyser")
def apply(game: Game, sender: Work.Sender): Fu[Boolean] =
(game.metadata.analysed ?? analysisRepo.exists(game.id)) flatMap {

View File

@ -3,6 +3,7 @@ package lila.fishnet
import org.joda.time.DateTime
import reactivemongo.api.bson._
import scala.util.{ Failure, Success, Try }
import scala.concurrent.duration._
import Client.Skill
import lila.common.IpAddress
@ -24,7 +25,7 @@ final class FishnetApi(
import JsonApi.Request.{ CompleteAnalysis, PartialAnalysis }
import BSONHandlers._
private val workQueue = new WorkQueue(128, "fishnetApi")
private val workQueue = new WorkQueue(buffer = 128, timeout = 5 seconds, name = "fishnetApi")
def keyExists(key: Client.Key) = repo.getEnabledClient(key).map(_.isDefined)

View File

@ -20,7 +20,7 @@ final class PlayTimeApi(
import Game.{ BSONFields => F }
private val workQueue = new WorkQueue(512, "playTime", parallelism = 4)
private val workQueue = new WorkQueue(buffer = 512, timeout = 1 minute, name = "playTime", parallelism = 4)
def apply(user: User): Fu[Option[User.PlayTime]] =
fuccess(user.playTime) orElse

View File

@ -18,7 +18,7 @@ final private class Indexer(
storage: Storage
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
private val workQueue = new WorkQueue(64, "insightIndexer")
private val workQueue = new WorkQueue(buffer = 64, timeout = 1 minute, name = "insightIndexer")
def all(user: User): Funit = workQueue {
storage.fetchLast(user.id) flatMap {

View File

@ -1,5 +1,7 @@
package lila.perfStat
import scala.concurrent.duration._
import lila.game.{ Game, GameRepo, Pov, Query }
import lila.rating.PerfType
import lila.user.User
@ -10,7 +12,7 @@ final class PerfStatIndexer(
storage: PerfStatStorage
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
private val workQueue = new WorkQueue(64, "perfStatIndexer")
private val workQueue = new WorkQueue(buffer = 64, timeout = 1 minute, "perfStatIndexer")
private[perfStat] def userPerf(user: User, perfType: PerfType): Fu[PerfStat] = workQueue {
storage.find(user.id, perfType) getOrElse gameRepo

View File

@ -1,5 +1,7 @@
package lila.pool
import scala.concurrent.duration._
import lila.game.{ Game, GameRepo, IdGenerator, Player }
import lila.common.WorkQueue
import lila.rating.Perf
@ -14,7 +16,7 @@ final private class GameStarter(
import PoolApi._
private val workQueue = new WorkQueue(16, "gameStarter")
private val workQueue = new WorkQueue(buffer = 16, timeout = 5 seconds, name = "gameStarter")
def apply(pool: PoolConfig, couples: Vector[MatchMaking.Couple]): Funit = couples.nonEmpty ?? {
workQueue {

View File

@ -17,7 +17,7 @@ final private class FirebasePush(
config: OneSignalPush.Config
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
private val workQueue = new WorkQueue(512, "firebasePush")
private val workQueue = new WorkQueue(buffer = 512, timeout = 10 seconds, name = "firebasePush")
def apply(userId: User.ID)(data: => PushApi.Data): Funit =
credentialsOpt ?? { creds =>
@ -33,7 +33,7 @@ final private class FirebasePush(
creds.getAccessToken()
}
}
} withTimeout 10.seconds
}
}.chronometer.mon(_.push.googleTokenTime).result flatMap { token =>
// TODO http batch request is possible using a multipart/mixed content
// unfortuntely it doesn't seem easily doable with play WS

View File

@ -27,7 +27,8 @@ final class SimulApi(
cacheApi: lila.memo.CacheApi
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer, mode: play.api.Mode) {
private val workQueue = new WorkQueues(128, 10 minutes, "simulApi")
private val workQueue =
new WorkQueues(buffer = 128, expiration = 10 minutes, timeout = 5 seconds, name = "simulApi")
def currentHostIds: Fu[Set[String]] = currentHostIdsCache.get({})

View File

@ -7,9 +7,14 @@ import lila.common.WorkQueues
final private class StudySequencer(
studyRepo: StudyRepo,
chapterRepo: ChapterRepo
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer, mode: play.api.Mode) {
)(
implicit ec: scala.concurrent.ExecutionContext,
mat: akka.stream.Materializer,
mode: play.api.Mode
) {
private val workQueue = new WorkQueues(256, 5 minutes, "study")
private val workQueue =
new WorkQueues(buffer = 256, expiration = 5 minutes, timeout = 10 seconds, name = "study")
def sequenceStudy(studyId: Study.Id)(f: Study => Funit): Funit =
workQueue(studyId.value) {

View File

@ -48,7 +48,8 @@ final class TournamentApi(
mode: play.api.Mode
) {
private val workQueue = new WorkQueues(256, 1 minute, "tournament")
private val workQueue =
new WorkQueues(buffer = 256, expiration = 1 minute, timeout = 10 seconds, name = "tournament")
def createTournament(
setup: TournamentSetup,

View File

@ -22,6 +22,7 @@ final class TournamentStandingApi(
private val workQueue = new WorkQueue(
buffer = 512,
timeout = 5 seconds,
name = "tournamentStandingApi",
parallelism = 6
)