coordinated shutdown WIP

pull/5836/head
Thibault Duplessis 2019-12-27 10:18:45 -06:00
parent 9c830723e3
commit 17b34cfa8a
17 changed files with 104 additions and 171 deletions

View File

@ -8,7 +8,7 @@ import play.api.{ Configuration, Environment, Mode }
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import lila.common.Bus
import lila.common.{ Bus, Lilakka }
import lila.common.config._
import lila.user.User
@ -153,8 +153,6 @@ final class Env(
}
system.actorOf(Props(new actor.Renderer), name = config.get[String]("app.renderer.name"))
scheduler.scheduleOnce(5 seconds) { slack.api.publishRestart }
}
final class EnvBoot(
@ -252,7 +250,7 @@ final class EnvBoot(
// free memory for reload workflow
if (env.isDev)
shutdown.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "Freeing dev memory") { () =>
Lilakka.shutdown(shutdown, _.PhaseBeforeActorSystemTerminate, "Freeing dev memory") { () =>
templating.Environment.destroy()
lila.common.Bus.destroy()
lila.mon.destroy()

View File

@ -1,7 +1,6 @@
package lila.api
import lila.common.Bus
import lila.hub.actorApi.Deploy
final private[api] class Cli(
userRepo: lila.user.UserRepo,
@ -31,9 +30,7 @@ final private[api] class Cli(
}
def process = {
case "uptime" :: Nil => fuccess(s"${lila.common.Uptime.seconds} seconds")
case "deploy" :: "pre" :: Nil => remindDeploy(lila.hub.actorApi.DeployPre)
case "deploy" :: "post" :: Nil => remindDeploy(lila.hub.actorApi.DeployPost)
case "uptime" :: Nil => fuccess(s"${lila.common.Uptime.seconds} seconds")
case "change" :: ("asset" | "assets") :: "version" :: Nil =>
import lila.common.AssetVersion
AssetVersion.change
@ -62,11 +59,6 @@ final private[api] class Cli(
}
}
private def remindDeploy(event: Deploy): Fu[String] = {
Bus.publish(event, "deploy")
fuccess("Deploy in progress")
}
private def run(args: List[String]): Fu[String] = {
(processors lift args) | fufail("Unknown command: " + args.mkString(" "))
} recover {

View File

@ -47,9 +47,9 @@ object Chronometer {
this
}
def pp: Fu[A] = lap dmap (_.pp)
def pp(msg: String): Fu[A] = lap dmap (_ pp msg)
def ppIfGt(msg: String, duration: FiniteDuration): Fu[A] = lap dmap (_.ppIfGt(msg, duration))
def pp: Fu[A] = lap.dmap(_.pp)
def pp(msg: String): Fu[A] = lap.dmap(_ pp msg)
def ppIfGt(msg: String, duration: FiniteDuration): Fu[A] = lap.dmap(_.ppIfGt(msg, duration))
def result = lap.dmap(_.result)
}

View File

@ -0,0 +1,14 @@
package lila.common
import akka.actor._
object Lilakka {
def shutdown(cs: CoordinatedShutdown, phase: CoordinatedShutdown.type => String, name: String)(
f: () => Funit
): Unit =
cs.addTask(phase(CoordinatedShutdown), name) { () =>
lila.log("shutdown").info(name)
Chronometer(f()) pp name inject akka.Done
}
}

View File

@ -5,6 +5,8 @@ import com.typesafe.config.Config
import play.api.{ ConfigLoader, Configuration }
import reactivemongo.api._
import lila.common.Lilakka
final class Env(
appConfig: Configuration,
shutdown: CoordinatedShutdown
@ -24,7 +26,7 @@ final class Env(
driver = driver
)
shutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "Closing mongodb driver") { () =>
Lilakka.shutdown(shutdown, _.PhaseServiceStop, "Closing mongodb driver") { () =>
driver.close() inject akka.Done
}
}

View File

@ -7,7 +7,7 @@ import scala.concurrent.Future
import lila.hub.actorApi.map.Tell
import lila.hub.actorApi.round.{ FishnetPlay, FishnetStart }
import lila.common.Bus
import lila.common.{ Bus, Lilakka }
import akka.actor.CoordinatedShutdown
final class FishnetRedis(
@ -38,12 +38,8 @@ final class FishnetRedis(
}
})
shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "Stopping the fishnet redis pool") { () =>
Future {
client.shutdown()
logger.info("Stopped the fishnet redis pool.")
akka.Done
}
Lilakka.shutdown(shutdown, _.PhaseServiceUnbind, "Stopping the fishnet redis pool") { () =>
Future { client.shutdown() }
}
private def writeWork(work: Work.Move): String =

View File

@ -7,7 +7,7 @@ import scala.concurrent.Promise
/*
* Sequential like an actor, but for async functions,
* and using an STM backend instead of akka actor.
* and using an atomic backend instead of akka actor.
*/
abstract class Duct(implicit ec: scala.concurrent.ExecutionContext) extends lila.common.Tellable {

View File

@ -1,9 +1,9 @@
package lila.hub
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ BiFunction, Consumer, Function }
import java.util.function.Function
import ornicar.scalalib.Zero
import scala.concurrent.Promise
import scala.concurrent.{ ExecutionContext, Promise }
final class DuctConcMap[D <: Duct](
mkDuct: String => D,
@ -19,9 +19,7 @@ final class DuctConcMap[D <: Duct](
def tellIfPresent(id: String, msg: Any): Unit = getIfPresent(id) foreach (_ ! msg)
def tellAll(msg: Any) =
ducts.forEachValue(16, new Consumer[D] {
def accept(duct: D) = duct ! msg
})
ducts.forEachValue(16, _ ! msg)
def tellIds(ids: Seq[String], msg: Any): Unit = ids foreach { tell(_, msg) }
@ -37,26 +35,21 @@ final class DuctConcMap[D <: Duct](
def exists(id: String): Boolean = ducts.get(id) != null
def foreachKey(f: String => Unit): Unit =
ducts.forEachKey(16, new Consumer[String] {
def accept(key: String) = f(key)
})
ducts.forEachKey(16, k => f(k))
def tellAllWithAck(makeMsg: Promise[Unit] => Any): Fu[Int] =
ducts.reduce[Fu[Int]](
16,
(_, d) => d ask makeMsg inject 1,
(acc, fu) => acc.flatMap(nb => fu.dmap(_ => nb + 1))(ExecutionContext.parasitic)
)
def size: Int = ducts.size()
def count(f: D => Boolean): Int = {
var nb = 0
ducts.forEachValue(16, new Consumer[D] {
def accept(duct: D) = if (f(duct)) nb += 1
})
nb
}
def terminate(id: String, lastWill: Duct => Unit): Unit =
ducts.computeIfPresent(id, new BiFunction[String, D, D] {
def apply(k: String, duct: D) = {
lastWill(duct)
nullD
}
ducts.computeIfPresent(id, (_, d) => {
lastWill(d)
nullD
})
def touchOrMake(id: String): Unit = ducts get id

View File

@ -6,10 +6,6 @@ import org.joda.time.DateTime
import play.api.libs.json._
import scala.concurrent.Promise
sealed abstract class Deploy(val key: String)
case object DeployPre extends Deploy("deployPre")
case object DeployPost extends Deploy("deployPost")
// announce something to all clients
case class Announce(msg: String, date: DateTime, json: JsObject)

View File

@ -1,35 +0,0 @@
package lila.round
import akka.actor.{ Cancellable, Scheduler }
import scala.concurrent.duration._
import lila.hub.actorApi.Deploy
final private class DeployPersistence(scheduler: Scheduler)(implicit ec: scala.concurrent.ExecutionContext) {
private var ongoing: Option[Cancellable] = None
def isEnabled() = ongoing.isDefined
def enable(): Unit = {
cancel()
logger.warn("Enabling round persistence")
ongoing = scheduler
.scheduleOnce(7.minutes) {
logger.warn("Expiring round persistence")
ongoing = none
}
.some
}
def cancel(): Unit =
ongoing foreach { o =>
logger.warn("Cancelling round persistence")
o.cancel()
ongoing = none
}
lila.common.Bus.subscribeFun("deploy") {
case _: Deploy => enable()
}
}

View File

@ -52,15 +52,14 @@ final class Env(
remoteSocketApi: lila.socket.RemoteSocket,
isBotSync: lila.common.LightUser.IsBotSync,
slackApi: lila.slack.SlackApi,
ratingFactors: () => lila.rating.RatingFactors
ratingFactors: () => lila.rating.RatingFactors,
shutdown: akka.actor.CoordinatedShutdown
)(implicit ec: scala.concurrent.ExecutionContext, system: ActorSystem, scheduler: akka.actor.Scheduler) {
implicit private val moretimeLoader = durationLoader(MoretimeDuration.apply)
implicit private val animationLoader = durationLoader(AnimationDuration.apply)
private val config = appConfig.get[RoundConfig]("round")(AutoConfig.loader)
private val deployPersistence: DeployPersistence = wire[DeployPersistence]
private val defaultGoneWeight = fuccess(1f)
private def goneWeight(userId: User.ID): Fu[Float] = playban.getRageSit(userId).dmap(_.goneWeight)
private val goneWeightsFor = (game: Game) =>
@ -83,7 +82,7 @@ final class Env(
})
private lazy val proxyDependencies =
new GameProxy.Dependencies(gameRepo, deployPersistence.isEnabled _, scheduler)
new GameProxy.Dependencies(gameRepo, scheduler)
private lazy val roundDependencies = wire[RoundDuct.Dependencies]
lazy val roundSocket: RoundSocket = wire[RoundSocket]

View File

@ -25,6 +25,12 @@ final private class GameProxy(
else fuccess(scheduleFlushProgress)
}
private[round] def saveAndFlush(progress: Progress): Funit = {
set(progress.game)
dirtyProgress = dirtyProgress.fold(progress.dropEvents)(_ withGame progress.game).some
flushProgress
}
private def set(game: Game): Unit = {
cache = fuccess(game.some)
}
@ -59,7 +65,7 @@ final private class GameProxy(
private var scheduledFlush: Cancellable = emptyCancellable
private def shouldFlushProgress(p: Progress) =
alwaysPersist() || p.statusChanged || p.game.isSimul || (
p.statusChanged || p.game.isSimul || (
p.game.hasCorrespondenceClock && !p.game.hasAi && p.game.rated
)
@ -84,7 +90,6 @@ private object GameProxy {
class Dependencies(
val gameRepo: GameRepo,
val alwaysPersist: () => Boolean,
val scheduler: Scheduler
)

View File

@ -5,6 +5,7 @@ import ornicar.scalalib.Zero
import play.api.libs.json._
import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.util.chaining._
import actorApi._, round._
import chess.{ Black, Color, Speed, White }
@ -13,7 +14,6 @@ import lila.common.Bus
import lila.game.actorApi.UserStartGame
import lila.game.Game.{ FullId, PlayerId }
import lila.game.{ Game, GameRepo, Pov, Event, Player => GamePlayer }
import lila.hub.actorApi.DeployPost
import lila.hub.actorApi.round.{
Abort,
BotPlay,
@ -404,7 +404,14 @@ final private[round] class RoundDuct(
}
}
case DeployPost =>
case LilaStop(promise) =>
proxy.withGame { g =>
g.playable ?? {
proxy saveAndFlush moretimer.give(g, Color.all, MoretimeDuration(20 seconds))
}
} tap promise.completeWith
case WsBoot =>
handle { game =>
game.playable ?? {
messenger.system(game, (_.untranslated("Lichess has been updated! Sorry for the inconvenience.")))
@ -551,6 +558,8 @@ object RoundDuct {
case class SetGameInfo(game: lila.game.Game, goneWeights: (Float, Float))
case object Tick
case object Stop
case object WsBoot
case class LilaStop(promise: Promise[Unit])
case class ChatIds(priv: Either[Chat.Id, Chat.Setup], pub: Chat.Id) {
def allIds = Seq(priv.fold(identity, _.id), pub)

View File

@ -1,18 +1,18 @@
package lila.round
import akka.actor.{ ActorSystem, Cancellable, Scheduler }
import akka.actor.{ ActorSystem, Cancellable, CoordinatedShutdown, Scheduler }
import play.api.libs.json._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import actorApi._
import actorApi.round._
import chess.format.Uci
import chess.{ Black, Centis, Color, MoveMetrics, Speed, White }
import lila.chat.Chat
import lila.common.{ Bus, IpAddress }
import lila.common.{ Bus, IpAddress, Lilakka }
import lila.game.Game.{ FullId, PlayerId }
import lila.game.{ Event, Game }
import lila.hub.actorApi.DeployPost
import lila.hub.actorApi.map.{ Exists, Tell, TellIfExists }
import lila.hub.actorApi.round.{ Abort, Berserk, RematchNo, RematchYes, Resign, TourStanding }
import lila.hub.actorApi.socket.remote.TellSriIn
@ -30,11 +30,21 @@ final class RoundSocket(
scheduleExpiration: ScheduleExpiration,
tournamentActor: lila.hub.actors.TournamentApi,
messenger: Messenger,
goneWeightsFor: Game => Fu[(Float, Float)]
)(implicit ec: scala.concurrent.ExecutionContext, system: ActorSystem) {
goneWeightsFor: Game => Fu[(Float, Float)],
shutdown: CoordinatedShutdown
)(implicit ec: ExecutionContext, system: ActorSystem) {
import RoundSocket._
private var stopping = false
Lilakka.shutdown(shutdown, _.PhaseServiceUnbind, "Stop round socket") { () =>
stopping = true
rounds.tellAllWithAck(RoundDuct.LilaStop.apply) dmap { nb =>
logger.info(s"$nb round ducts have stopped")
}
}
def getGame(gameId: Game.ID): Fu[Option[Game]] = rounds.getOrMake(gameId).getGame addEffect { g =>
if (!g.isDefined) finishRound(Game.Id(gameId))
}
@ -69,7 +79,9 @@ final class RoundSocket(
def tellRound(gameId: Game.Id, msg: Any): Unit = rounds.tell(gameId.value, msg)
private lazy val roundHandler: Handler = {
case Protocol.In.PlayerDo(id, tpe) =>
case Protocol.In.PlayerMove(fullId, uci, blur, lag) if !stopping =>
tellRound(fullId.gameId, HumanPlay(fullId.playerId, uci, blur, lag, none))
case Protocol.In.PlayerDo(id, tpe) if !stopping =>
tpe match {
case "moretime" => tellRound(id.gameId, Moretime(id.playerId))
case "rematch-yes" => tellRound(id.gameId, RematchYes(id.playerId.value))
@ -92,8 +104,6 @@ final class RoundSocket(
messenger.watcher(Chat.Id(gameId.value), userId, msg)
case RP.In.ChatTimeout(roomId, modId, suspect, reason) =>
messenger.timeout(Chat.Id(s"$roomId/w"), modId, suspect, reason)
case Protocol.In.PlayerMove(fullId, uci, blur, lag) =>
tellRound(fullId.gameId, HumanPlay(fullId.playerId, uci, blur, lag, none))
case Protocol.In.Berserk(gameId, userId) => tournamentActor ! Berserk(gameId.value, userId)
case Protocol.In.PlayerOnlines(onlines) =>
onlines foreach {
@ -122,7 +132,7 @@ final class RoundSocket(
rounds foreachKey { id =>
terminationDelay schedule Game.Id(id)
}
rounds.tellAll(DeployPost)
rounds.tellAll(RoundDuct.WsBoot)
}
private def finishRound(gameId: Game.Id): Unit =

View File

@ -1,21 +1,22 @@
package lila.slack
import com.softwaremill.macwire._
import play.api.Configuration
import play.api.{ Configuration, Mode }
import play.api.libs.ws.WSClient
import lila.common.Lilakka
import lila.common.config._
import lila.hub.actorApi.plan.ChargeEvent
import lila.hub.actorApi.slack.Event
import lila.hub.actorApi.user.Note
import lila.hub.actorApi.{ DeployPost, DeployPre }
@Module
final class Env(
appConfig: Configuration,
getLightUser: lila.common.LightUser.Getter,
mode: play.api.Mode,
ws: WSClient
ws: WSClient,
shutdown: akka.actor.CoordinatedShutdown,
mode: Mode
)(implicit ec: scala.concurrent.ExecutionContext) {
private val incomingUrl = appConfig.get[Secret]("slack.incoming.url")
@ -24,10 +25,13 @@ final class Env(
lazy val api: SlackApi = wire[SlackApi]
lila.common.Bus.subscribeFun("deploy", "slack", "plan", "userNote") {
if (mode == Mode.Prod) {
api.publishInfo("Lichess has started!")
Lilakka.shutdown(shutdown, _.PhaseBeforeServiceUnbind, "Tell slack")(api.stop _)
}
lila.common.Bus.subscribeFun("slack", "plan", "userNote") {
case d: ChargeEvent => api charge d
case DeployPre => api.deployPre
case DeployPost => api.deployPost
case Note(from, to, text, true) if from != "Irwin" => api.userModNote(from, to, text)
case e: Event => api publishEvent e
}

View File

@ -180,19 +180,6 @@ final class SlackApi(
)
)
def publishRestart =
if (isProd) publishInfo("Lichess has restarted!")
else
client(
SlackMessage(
username = stage.name,
icon = stage.icon,
text = "stage has restarted.",
channel = rooms.devNoise
)
)
private val isProd = mode == Mode.Prod
private def link(url: String, name: String) = s"<$url|$name>"
private def lichessLink(path: String, name: String) = s"<https://lichess.org$path|$name>"
private def userLink(name: String): String = lichessLink(s"/@/$name?mod", name)
@ -229,45 +216,15 @@ final class SlackApi(
)
)
def deployPre: Funit =
if (isProd)
client(
SlackMessage(
username = "deployment",
icon = "rocket",
text = "Lichess will be updated in a minute! Fasten your seatbelts.",
channel = rooms.general
)
)
else
client(
SlackMessage(
username = stage.name,
icon = stage.icon,
text = "stage will be updated in a minute.",
channel = rooms.general
)
)
def deployPost: Funit =
if (isProd)
client(
SlackMessage(
username = "deployment",
icon = "rocket",
text = "Lichess is being updated! Brace for impact.",
channel = rooms.general
)
)
else
client(
SlackMessage(
username = "stage.lichess.org",
icon = "volcano",
text = "stage has been updated!",
channel = rooms.devNoise
)
def stop(): Funit =
client(
SlackMessage(
username = "deployment",
icon = "rocket",
text = "Lichess is being updated! Brace for impact.",
channel = rooms.general
)
)
def signup(user: User, email: EmailAddress, ip: IpAddress, fp: Option[String], susp: Boolean) =
client(

View File

@ -10,7 +10,7 @@ import play.api.libs.json._
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration._
import lila.common.Bus
import lila.common.{ Bus, Lilakka }
import lila.hub.actorApi.Announce
import lila.hub.actorApi.relation.ReloadOnlineFriends
import lila.hub.actorApi.round.Mlat
@ -69,7 +69,6 @@ final class RemoteSocket(
Bus.subscribeFun(
"socketUsers",
"deploy",
"announce",
"mlat",
"sendToFlag",
@ -122,23 +121,17 @@ final class RemoteSocket(
subPromise.future
}
shutdown.addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "Telling lila-ws we're stopping") { () =>
Lilakka.shutdown(shutdown, _.PhaseBeforeServiceUnbind, "Telling lila-ws we're stopping") { () =>
request[Unit](
id => send(Protocol.Out.stop(id)),
res => logger.info(s"lila-ws says: $res")
).withTimeout(1 second)
.addFailureEffect(e => logger.error("lila-ws stop", e))
.nevermind
.inject(akka.Done)
}
shutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "Stopping the socket redis pool") { () =>
logger.info("Stopping the socket redis pool...")
Future {
redisClient.shutdown()
logger.info("Stopped the socket redis pool.")
akka.Done
}
Lilakka.shutdown(shutdown, _.PhaseServiceUnbind, "Stopping the socket redis pool") { () =>
Future { redisClient.shutdown() }
}
}