improve EarlyMultiThrottler interface
parent
98e8b6bacd
commit
bcd0e8da32
|
@ -2,18 +2,27 @@ package lila.hub
|
|||
|
||||
import akka.actor._
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
import lila.log.Logger
|
||||
|
||||
/** Runs the work then waits cooldown
|
||||
* only runs once at a time per id.
|
||||
* Guarantees that work is ran as early as possible.
|
||||
* Also saves work and runs it after cooldown.
|
||||
*/
|
||||
final class EarlyMultiThrottler(
|
||||
logger: lila.log.Logger
|
||||
)(implicit ec: scala.concurrent.ExecutionContext)
|
||||
extends Actor {
|
||||
final class EarlyMultiThrottler(logger: Logger)(implicit ec: ExecutionContext, system: ActorSystem) {
|
||||
|
||||
import EarlyMultiThrottler._
|
||||
private val actor = system.actorOf(Props(new EarlyMultiThrottlerActor(logger)))
|
||||
|
||||
def apply(id: String, cooldown: FiniteDuration)(run: => Funit) =
|
||||
actor ! EarlyMultiThrottlerActor.Work(id, run = () => run, cooldown)
|
||||
}
|
||||
|
||||
// actor based implementation
|
||||
final private class EarlyMultiThrottlerActor(logger: Logger)(implicit ec: ExecutionContext) extends Actor {
|
||||
|
||||
import EarlyMultiThrottlerActor._
|
||||
|
||||
var running = Set.empty[String]
|
||||
var planned = Map.empty[String, Work]
|
||||
|
@ -45,13 +54,11 @@ final class EarlyMultiThrottler(
|
|||
lila.common.Future.makeItLast(work.cooldown) { work.run() }
|
||||
}
|
||||
|
||||
object EarlyMultiThrottler {
|
||||
|
||||
private object EarlyMultiThrottlerActor {
|
||||
case class Work(
|
||||
id: String,
|
||||
run: () => Funit,
|
||||
cooldown: FiniteDuration // how long to wait after running, before next run
|
||||
)
|
||||
|
||||
private case class Done(id: String)
|
||||
}
|
||||
|
|
|
@ -4,25 +4,22 @@ import akka.actor._
|
|||
import scala.concurrent.duration._
|
||||
|
||||
import lila.study.MultiPgn
|
||||
import lila.hub.EarlyMultiThrottler
|
||||
|
||||
final class RelayPush(sync: RelaySync, api: RelayApi)(implicit
|
||||
system: ActorSystem,
|
||||
ec: scala.concurrent.ExecutionContext
|
||||
) {
|
||||
|
||||
private val throttler = system.actorOf(Props(new EarlyMultiThrottler(logger = logger)))
|
||||
private val throttler = new lila.hub.EarlyMultiThrottler(logger)
|
||||
|
||||
def apply(rt: RelayRound.WithTour, pgn: String): Fu[Option[String]] =
|
||||
if (rt.round.sync.hasUpstream)
|
||||
fuccess("The relay has an upstream URL, and cannot be pushed to.".some)
|
||||
else
|
||||
fuccess {
|
||||
throttler ! EarlyMultiThrottler.Work(
|
||||
id = rt.round.id.value,
|
||||
run = () => pushNow(rt, pgn),
|
||||
cooldown = if (rt.tour.official) 3.seconds else 7.seconds
|
||||
)
|
||||
throttler(rt.round.id.value, if (rt.tour.official) 3.seconds else 7.seconds) {
|
||||
pushNow(rt, pgn)
|
||||
}
|
||||
none
|
||||
}
|
||||
|
||||
|
|
|
@ -758,8 +758,6 @@ final class TournamentApi(
|
|||
|
||||
private object updateTournamentStanding {
|
||||
|
||||
import lila.hub.EarlyMultiThrottler
|
||||
|
||||
// last published top hashCode
|
||||
private val lastPublished = lila.memo.CacheApi.scaffeineNoScheduler
|
||||
.initialCapacity(16)
|
||||
|
@ -778,15 +776,10 @@ final class TournamentApi(
|
|||
}
|
||||
}
|
||||
|
||||
private val throttler = system.actorOf(Props(new EarlyMultiThrottler(logger = logger)))
|
||||
private val throttler = new lila.hub.EarlyMultiThrottler(logger)
|
||||
|
||||
def apply(tour: Tournament): Unit =
|
||||
if (!tour.isTeamBattle)
|
||||
throttler ! EarlyMultiThrottler.Work(
|
||||
id = tour.id,
|
||||
run = () => publishNow(tour.id),
|
||||
cooldown = 15.seconds
|
||||
)
|
||||
if (!tour.isTeamBattle) throttler(tour.id, 15.seconds) { publishNow(tour.id) }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue