make broadcasts faster and more resilient
This commit is contained in:
parent
f9042e9ee6
commit
2b25883128
|
@ -49,7 +49,6 @@ object Relay extends LilaController {
|
|||
env.forms.edit(relay).bindFromRequest.fold(
|
||||
err => BadRequest(html.relay.edit(relay, err)).fuccess,
|
||||
data => env.api.update(relay) { data.update(_, me) } map { r =>
|
||||
env clearFormat r.sync.upstream.url
|
||||
Redirect(showRoute(r))
|
||||
}
|
||||
)
|
||||
|
|
|
@ -24,7 +24,7 @@ help = Html("Feature on /broadcast - for admins only").some) { field =>
|
|||
@base.form.flatpickr(field)
|
||||
}
|
||||
@if(isGranted(_.Relay)) {
|
||||
@base.form.group(form("throttle"), Html("Throttle in seconds"), help = Html("Optional, to manually throttle requests. Max 60s.").some, half = true) { field =>
|
||||
@base.form.group(form("throttle"), Html("Throttle in seconds"), help = Html("Optional, to manually throttle requests. Min 2s, max 60s.").some, half = true) { field =>
|
||||
@base.form.input(field, typ = "number")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ final class Env(
|
|||
repo = repo,
|
||||
studyApi = studyEnv.api,
|
||||
withStudy = withStudy,
|
||||
clearFormatCache = formatApi.refresh,
|
||||
system = system
|
||||
)
|
||||
|
||||
|
@ -47,8 +48,6 @@ final class Env(
|
|||
|
||||
private lazy val formatApi = new RelayFormatApi(asyncCache)
|
||||
|
||||
def clearFormat(url: String) = formatApi refresh io.lemonlabs.uri.Url.parse(url)
|
||||
|
||||
system.actorOf(Props(new RelayFetch(
|
||||
sync = sync,
|
||||
api = api,
|
||||
|
|
|
@ -1,21 +1,22 @@
|
|||
package lila.relay
|
||||
|
||||
import akka.actor._
|
||||
import io.lemonlabs.uri.Url
|
||||
import org.joda.time.DateTime
|
||||
import ornicar.scalalib.Zero
|
||||
import play.api.libs.json._
|
||||
import reactivemongo.bson._
|
||||
import io.lemonlabs.uri.Url
|
||||
|
||||
import lila.db.dsl._
|
||||
import lila.security.Granter
|
||||
import lila.study.{ StudyApi, Study, StudyMaker, Settings }
|
||||
import lila.user.User
|
||||
import lila.security.Granter
|
||||
|
||||
final class RelayApi(
|
||||
repo: RelayRepo,
|
||||
studyApi: StudyApi,
|
||||
withStudy: RelayWithStudy,
|
||||
clearFormatCache: Url => Unit,
|
||||
system: ActorSystem
|
||||
) {
|
||||
|
||||
|
@ -65,6 +66,7 @@ final class RelayApi(
|
|||
}
|
||||
|
||||
def requestPlay(id: Relay.Id, v: Boolean): Funit = WithRelay(id) { relay =>
|
||||
clearFormatCache(Url parse relay.sync.upstream.url)
|
||||
update(relay) { r =>
|
||||
if (v) r.withSync(_.play) else r.withSync(_.pause)
|
||||
} void
|
||||
|
|
|
@ -22,8 +22,6 @@ private final class RelayFetch(
|
|||
chapterRepo: lila.study.ChapterRepo
|
||||
) extends Actor {
|
||||
|
||||
val frequency = 1.seconds
|
||||
|
||||
override def preStart: Unit = {
|
||||
logger.info("Start RelaySync")
|
||||
context setReceiveTimeout 20.seconds
|
||||
|
@ -33,7 +31,7 @@ private final class RelayFetch(
|
|||
case object Tick
|
||||
|
||||
def scheduleNext =
|
||||
context.system.scheduler.scheduleOnce(frequency, self, Tick)
|
||||
context.system.scheduler.scheduleOnce(600 millis, self, Tick)
|
||||
|
||||
def receive = {
|
||||
|
||||
|
@ -100,11 +98,11 @@ private final class RelayFetch(
|
|||
(if (r.sync.log.alwaysFails) fuccess(60) else (r.sync.delay match {
|
||||
case Some(delay) => fuccess(delay)
|
||||
case None => api.getNbViewers(r) map { nb =>
|
||||
(18 - nb) atLeast 8
|
||||
(18 - nb) atLeast 7
|
||||
}
|
||||
})) map { seconds =>
|
||||
r.withSync(_.copy(nextAt = DateTime.now plusSeconds {
|
||||
seconds atLeast { if (r.sync.log.isOk) 3 else 12 }
|
||||
seconds atLeast { if (r.sync.log.alwaysFails) 10 else 2 }
|
||||
} some))
|
||||
}
|
||||
|
||||
|
@ -128,27 +126,25 @@ private final class RelayFetch(
|
|||
private def doFetch(upstream: Upstream, max: Int): Fu[RelayGames] = {
|
||||
import RelayFetch.DgtJson._
|
||||
formatApi.get(upstream.url) flatMap {
|
||||
_.fold[Fu[MultiPgn]](fufail("Cannot find any DGT compatible files")) {
|
||||
case RelayFormat.SingleFile(doc) => doc.format match {
|
||||
// all games in a single PGN file
|
||||
case RelayFormat.DocFormat.Pgn => httpGet(doc.url) map { MultiPgn.split(_, max) }
|
||||
// maybe a single JSON game? Why not
|
||||
case RelayFormat.DocFormat.Json => httpGetJson[GameJson](doc.url)(gameReads) map { game =>
|
||||
MultiPgn(List(game.toPgn()))
|
||||
}
|
||||
case RelayFormat.SingleFile(doc) => doc.format match {
|
||||
// all games in a single PGN file
|
||||
case RelayFormat.DocFormat.Pgn => httpGet(doc.url) map { MultiPgn.split(_, max) }
|
||||
// maybe a single JSON game? Why not
|
||||
case RelayFormat.DocFormat.Json => httpGetJson[GameJson](doc.url)(gameReads) map { game =>
|
||||
MultiPgn(List(game.toPgn()))
|
||||
}
|
||||
case RelayFormat.ManyFiles(indexUrl, makeGameDoc) => httpGetJson[RoundJson](indexUrl) flatMap { round =>
|
||||
round.pairings.zipWithIndex.map {
|
||||
case (pairing, i) =>
|
||||
val number = i + 1
|
||||
val gameDoc = makeGameDoc(number)
|
||||
(gameDoc.format match {
|
||||
case RelayFormat.DocFormat.Pgn => httpGet(gameDoc.url)
|
||||
case RelayFormat.DocFormat.Json => httpGetJson[GameJson](gameDoc.url) map { _.toPgn(pairing.tags) }
|
||||
}) map (number -> _)
|
||||
}.sequenceFu.map { results =>
|
||||
MultiPgn(results.sortBy(_._1).map(_._2).toList)
|
||||
}
|
||||
}
|
||||
case RelayFormat.ManyFiles(indexUrl, makeGameDoc) => httpGetJson[RoundJson](indexUrl) flatMap { round =>
|
||||
round.pairings.zipWithIndex.map {
|
||||
case (pairing, i) =>
|
||||
val number = i + 1
|
||||
val gameDoc = makeGameDoc(number)
|
||||
(gameDoc.format match {
|
||||
case RelayFormat.DocFormat.Pgn => httpGet(gameDoc.url)
|
||||
case RelayFormat.DocFormat.Json => httpGetJson[GameJson](gameDoc.url) map { _.toPgn(pairing.tags) }
|
||||
}) map (number -> _)
|
||||
}.sequenceFu.map { results =>
|
||||
MultiPgn(results.sortBy(_._1).map(_._2).toList)
|
||||
}
|
||||
}
|
||||
} flatMap RelayFetch.multiPgnToGames.apply
|
||||
|
|
|
@ -18,7 +18,7 @@ object RelayForm {
|
|||
"official" -> boolean,
|
||||
"syncUrl" -> nonEmptyText,
|
||||
"startsAt" -> optional(utcDate),
|
||||
"throttle" -> optional(number(min = 0, max = 60))
|
||||
"throttle" -> optional(number(min = 2, max = 60))
|
||||
)(Data.apply)(Data.unapply))
|
||||
|
||||
def create = form
|
||||
|
|
|
@ -15,11 +15,11 @@ private final class RelayFormatApi(
|
|||
|
||||
import RelayFormat._
|
||||
|
||||
def get(url: String): Fu[Option[RelayFormat]] = cache get Url.parse(url)
|
||||
def get(url: String): Fu[RelayFormat] = cache get Url.parse(url)
|
||||
|
||||
def refresh(url: Url): Unit = cache refresh url
|
||||
|
||||
private def guessFormat(url: Url): Fu[Option[RelayFormat]] = {
|
||||
private def guessFormat(url: Url): Fu[RelayFormat] = {
|
||||
|
||||
def guessSingleFile: Fu[Option[RelayFormat]] =
|
||||
lila.common.Future.find(List(
|
||||
|
@ -43,12 +43,14 @@ private final class RelayFormatApi(
|
|||
}
|
||||
}
|
||||
|
||||
guessManyFiles orElse guessSingleFile
|
||||
guessManyFiles orElse guessSingleFile flatten "Cannot find any DGT compatible files"
|
||||
} addEffect { format =>
|
||||
logger.info(s"guessed format of $url: ${format.fold("???")(_.toString)}")
|
||||
logger.info(s"guessed format of $url: $format")
|
||||
} addFailureEffect { err =>
|
||||
logger.info(s"can't guess format of $url: $err")
|
||||
}
|
||||
|
||||
private val cache = asyncCache.multi[Url, Option[RelayFormat]](
|
||||
private val cache = asyncCache.multi[Url, RelayFormat](
|
||||
name = "relayFormat",
|
||||
f = guessFormat,
|
||||
expireAfter = _.ExpireAfterWrite(10 minutes)
|
||||
|
|
Loading…
Reference in a new issue