rewrite tournament organizer actors for resilience

pull/1750/head
Thibault Duplessis 2016-03-21 14:07:22 +07:00
parent 8fbebfbe76
commit c5435edee3
9 changed files with 168 additions and 124 deletions

View File

@ -441,8 +441,7 @@ tournament {
name = tournament-socket
timeout = 2 minutes
}
organizer.name = tournament-organizer
reminder.name = tournament-reminder
api_actor.name = tournament-api
sequencer {
timeout = 10 minutes
}
@ -606,7 +605,7 @@ hub {
messenger = ${message.actor.name}
router = ${app.router.name}
fishnet = ${fishnet.actor.name}
tournament.organizer = ${tournament.organizer.name}
tournament.api = ${tournament.api_actor.name}
timeline {
user = ${timeline.user.actor.name}
}

View File

@ -15,7 +15,7 @@ final class Env(config: Config, system: ActorSystem) {
val messenger = select("actor.messenger")
val router = select("actor.router")
val fishnet = select("actor.fishnet")
val tournamentOrganizer = select("actor.tournament.organizer")
val tournamentApi = select("actor.tournament.api")
val simul = select("actor.simul")
val timeline = select("actor.timeline.user")
val bookmark = select("actor.bookmark")

View File

@ -89,7 +89,7 @@ private[round] final class SocketHandler(
sd d int "sd"
} send(HoldAlert(playerId, mean, sd, member.ip))
case ("berserk", _) => member.userId foreach { userId =>
hub.actor.tournamentOrganizer ! Berserk(gameId, userId)
hub.actor.tournamentApi ! Berserk(gameId, userId)
}
}
}

View File

@ -0,0 +1,24 @@
package lila.tournament
import scala.concurrent.duration._
import akka.actor._
import lila.game.actorApi.FinishGame
private[tournament] final class ApiActor(api: TournamentApi) extends Actor {
override def preStart {
context.system.lilaBus.subscribe(self, 'finishGame, 'adjustCheater, 'adjustBooster)
}
def receive = {
case FinishGame(game, _, _) => api finishGame game
case lila.hub.actorApi.mod.MarkCheater(userId) => api ejectLame userId
case lila.hub.actorApi.mod.MarkBooster(userId) => api ejectLame userId
case lila.hub.actorApi.round.Berserk(gameId, userId) => api.berserk(gameId, userId)
}
}

View File

@ -0,0 +1,56 @@
package lila.tournament
import akka.actor._
import scala.concurrent.duration._
import actorApi._
private[tournament] final class CreatedOrganizer(
api: TournamentApi,
isOnline: String => Boolean) extends Actor {
override def preStart {
pairingLogger.info("Start CreatedOrganizer")
context setReceiveTimeout 15.seconds
scheduleNext
}
case object AllCreatedTournaments
def scheduleNext =
context.system.scheduler.scheduleOnce(2 seconds, self, AllCreatedTournaments)
def receive = {
case ReceiveTimeout =>
val msg = "tournament.CreatedOrganizer timed out!"
pairingLogger.error(msg)
throw new RuntimeException(msg)
case AllCreatedTournaments =>
val myself = self
TournamentRepo.allCreated(30).map { tours =>
tours foreach { tour =>
tour.schedule match {
case None => PlayerRepo count tour.id foreach {
case 0 => api wipe tour
case nb if tour.hasWaitedEnough =>
if (nb >= Tournament.minPlayers) api start tour
else api wipe tour
case _ =>
}
case Some(schedule) if tour.hasWaitedEnough => api start tour
case _ => ejectLeavers(tour)
}
}
val nbTours = tours.size
pairingLogger.debug(s"AllCreatedTournaments - tours: $nbTours")
lila.mon.tournament.created(nbTours)
} andThenAnyway scheduleNext
}
private def ejectLeavers(tour: Tournament) =
PlayerRepo userIds tour.id foreach {
_ filterNot isOnline foreach { api.withdraw(tour.id, _) }
}
}

View File

@ -39,8 +39,7 @@ final class Env(
val UidTimeout = config duration "uid.timeout"
val SocketTimeout = config duration "socket.timeout"
val SocketName = config getString "socket.name"
val OrganizerName = config getString "organizer.name"
val ReminderName = config getString "reminder.name"
val ApiActorName = config getString "api_actor.name"
val SequencerTimeout = config duration "sequencer.timeout"
val NetDomain = config getString "net.domain"
}
@ -114,14 +113,23 @@ final class Env(
logger = logger)
}))
system.actorOf(Props(new Organizer(
system.actorOf(Props(new ApiActor(api = api)), name = ApiActorName)
system.actorOf(Props(new CreatedOrganizer(
api = api,
reminder = system.actorOf(Props(new Reminder(
renderer = hub.actor.renderer
)), name = ReminderName),
isOnline = isOnline
)))
private val reminder = system.actorOf(Props(new Reminder(
renderer = hub.actor.renderer
)))
system.actorOf(Props(new StartedOrganizer(
api = api,
reminder = reminder,
isOnline = isOnline,
socketHub = socketHub
)), name = OrganizerName)
)))
system.actorOf(Props(new Scheduler(api)))

View File

@ -1,109 +0,0 @@
package lila.tournament
import akka.actor.{ Scheduler => ActorScheduler, _ }
import akka.pattern.{ ask, pipe }
import scala.concurrent.duration._
import actorApi._
import lila.common.LilaException
import lila.game.actorApi.FinishGame
import lila.hub.actorApi.map.Ask
import lila.hub.actorApi.WithUserIds
import makeTimeout.short
private[tournament] final class Organizer(
api: TournamentApi,
reminder: ActorRef,
isOnline: String => Boolean,
socketHub: ActorRef) extends Actor {
override def preStart {
context.system.lilaBus.subscribe(self, 'finishGame, 'adjustCheater, 'adjustBooster)
context.system.scheduler.scheduleOnce(5 seconds, self, AllCreatedTournaments)
context.system.scheduler.scheduleOnce(6 seconds, self, StartedTournaments)
}
def receive = {
case AllCreatedTournaments => TimeoutReschedule(
name = "AllCreatedTournaments",
timeout = 10 seconds,
reschedule = _.scheduleOnce(2 seconds, self, AllCreatedTournaments)) {
TournamentRepo.allCreated(30).map { tours =>
tours foreach { tour =>
tour.schedule match {
case None => PlayerRepo count tour.id foreach {
case 0 => api wipe tour
case nb if tour.hasWaitedEnough =>
if (nb >= Tournament.minPlayers) api start tour
else api wipe tour
case _ =>
}
case Some(schedule) if tour.hasWaitedEnough => api start tour
case _ => ejectLeavers(tour)
}
}
lila.mon.tournament.created(tours.size)
}
}
case StartedTournaments => TimeoutReschedule(
name = "StartedTournaments",
timeout = 15 seconds,
reschedule = _.scheduleOnce(3 seconds, self, StartedTournaments)) {
val startAt = nowMillis
TournamentRepo.started.flatMap { started =>
lila.common.Future.traverseSequentially(started) { tour =>
PlayerRepo activeUserIds tour.id flatMap { activeUserIds =>
val nb = activeUserIds.size
val result: Funit =
if (tour.secondsToFinish == 0) fuccess(api finish tour)
else if (!tour.scheduled && nb < 2) fuccess(api finish tour)
else if (!tour.isAlmostFinished) startPairing(tour, activeUserIds, startAt)
else funit
result >>- {
reminder ! RemindTournament(tour, activeUserIds)
} inject nb
}
}.addEffect { playerCounts =>
val nbPlayers = playerCounts.sum
pairingLogger.debug(s"paired - players: $nbPlayers")
lila.mon.tournament.player(nbPlayers)
lila.mon.tournament.started(started.size)
if (nbPlayers > 1) Thread sleep 20000
}
}
}
case FinishGame(game, _, _) => api finishGame game
case lila.hub.actorApi.mod.MarkCheater(userId) => api ejectLame userId
case lila.hub.actorApi.mod.MarkBooster(userId) => api ejectLame userId
case lila.hub.actorApi.round.Berserk(gameId, userId) => api.berserk(gameId, userId)
}
private def TimeoutReschedule[A](
name: String,
timeout: FiniteDuration,
reschedule: ActorScheduler => Unit)(f: Fu[A]) = f
.withTimeout(timeout, LilaException(s"Organizer.$name timed out after $timeout"))(context.system)
.addFailureEffect { e => pairingLogger.error(e.getMessage, e) }
.andThenAnyway(reschedule(context.system.scheduler))
private def ejectLeavers(tour: Tournament) =
PlayerRepo userIds tour.id foreach {
_ filterNot isOnline foreach { api.withdraw(tour.id, _) }
}
private def startPairing(tour: Tournament, activeUserIds: List[String], startAt: Long): Funit =
getWaitingUsers(tour) zip PairingRepo.playingUserIds(tour) map {
case (waitingUsers, playingUserIds) =>
val users = waitingUsers intersect activeUserIds diff playingUserIds
api.makePairings(tour, users, startAt)
}
private def getWaitingUsers(tour: Tournament): Fu[WaitingUsers] =
socketHub ? Ask(tour.id, GetWaitingUsers) mapTo manifest[WaitingUsers]
}

View File

@ -0,0 +1,69 @@
package lila.tournament
import akka.actor._
import akka.pattern.{ ask, pipe }
import scala.concurrent.duration._
import actorApi._
import lila.hub.actorApi.map.Ask
import makeTimeout.short
private[tournament] final class StartedOrganizer(
api: TournamentApi,
reminder: ActorRef,
isOnline: String => Boolean,
socketHub: ActorRef) extends Actor {
override def preStart {
pairingLogger.info("Start StartedOrganizer")
context setReceiveTimeout 15.seconds
scheduleNext
}
case object AllStartedTournaments
def scheduleNext =
context.system.scheduler.scheduleOnce(3 seconds, self, AllStartedTournaments)
def receive = {
case ReceiveTimeout =>
val msg = "tournament.StartedOrganizer timed out!"
pairingLogger.error(msg)
throw new RuntimeException(msg)
case AllStartedTournaments =>
val myself = self
val startAt = nowMillis
TournamentRepo.started.flatMap { started =>
lila.common.Future.traverseSequentially(started) { tour =>
PlayerRepo activeUserIds tour.id flatMap { activeUserIds =>
val nb = activeUserIds.size
val result: Funit =
if (tour.secondsToFinish == 0) fuccess(api finish tour)
else if (!tour.scheduled && nb < 2) fuccess(api finish tour)
else if (!tour.isAlmostFinished) startPairing(tour, activeUserIds, startAt)
else funit
result >>- {
reminder ! RemindTournament(tour, activeUserIds)
} inject nb
}
}.addEffect { playerCounts =>
val nbPlayers = playerCounts.sum
pairingLogger.debug(s"AllStartedTournaments - players: $nbPlayers")
lila.mon.tournament.player(nbPlayers)
lila.mon.tournament.started(started.size)
}
} andThenAnyway scheduleNext
}
private def startPairing(tour: Tournament, activeUserIds: List[String], startAt: Long): Funit =
getWaitingUsers(tour) zip PairingRepo.playingUserIds(tour) map {
case (waitingUsers, playingUserIds) =>
val users = waitingUsers intersect activeUserIds diff playingUserIds
api.makePairings(tour, users, startAt)
}
private def getWaitingUsers(tour: Tournament): Fu[WaitingUsers] =
socketHub ? Ask(tour.id, GetWaitingUsers) mapTo manifest[WaitingUsers]
}

View File

@ -27,9 +27,6 @@ private[tournament] case object Reload
private[tournament] case class StartGame(game: Game)
private[tournament] case class Connected(enumerator: JsEnumerator, member: Member)
// organizer
private[tournament] case object AllCreatedTournaments
private[tournament] case object StartedTournaments
case class RemindTournament(tour: Tournament, activeUserIds: List[String])
case class TournamentTable(tours: List[Tournament])