broadcast streaming API

pull/9874/head
Thibault Duplessis 2021-09-23 10:18:10 +02:00
parent d701b25b01
commit 9aef32ee88
8 changed files with 73 additions and 15 deletions

View File

@ -11,10 +11,12 @@ import lila.app._
import lila.relay.{ RelayRound => RoundModel, RelayTour => TourModel, RelayRoundForm }
import lila.user.{ User => UserModel }
import views._
import lila.common.HTTPRequest
final class RelayRound(
env: Env,
studyC: => Study
studyC: => Study,
apiC: => Api
) extends LilaController(env) {
def form(tourId: String) =
@ -161,6 +163,22 @@ final class RelayRound(
}
)
def stream(id: String) = AnonOrScoped() { req => me =>
env.relay.api.byIdWithStudy(id) flatMap {
_ ?? { rt =>
studyC.CanView(rt.study, me) {
apiC
.GlobalConcurrencyLimitPerIP(HTTPRequest ipAddress req)(
env.relay.pgnStream.streamRoundGames(rt)
) { source =>
noProxyBuffer(Ok chunked source.keepAlive(60.seconds, () => " ") as pgnContentType)
}
.fuccess
}(Unauthorized.fuccess, Forbidden.fuccess)
}
}
}
def chapter(ts: String, rs: String, id: String, chapterId: String) =
Open { implicit ctx =>
WithRoundAndTour(ts, rs, id) { rt =>
@ -250,7 +268,7 @@ final class RelayRound(
else if (me.hasTitle || me.isVerified) 5
else 10
CreateLimitPerUser(me.id, cost = cost) {
CreateLimitPerIP(lila.common.HTTPRequest ipAddress req, cost = cost) {
CreateLimitPerIP(HTTPRequest ipAddress req, cost = cost) {
create
}(fail.fuccess)
}(fail.fuccess)

View File

@ -131,7 +131,7 @@ final class RelayTour(env: Env, apiC: => Api) extends LilaController(env) {
env.relay.api tourById TourModel.Id(id) map {
_ ?? { tour =>
apiC.GlobalConcurrencyLimitPerIP(HTTPRequest ipAddress req)(
env.relay.pgnStream(tour)
env.relay.pgnStream.exportFullTour(tour)
) { source =>
asAttachmentStream(s"${env.relay.pgnStream filename tour}.pgn")(
Ok chunked source as pgnContentType

View File

@ -213,6 +213,7 @@ GET /broadcast/round/$roundId<\w{8}>/edit controllers.RelayRound.edit(roundI
POST /broadcast/round/$roundId<\w{8}>/edit controllers.RelayRound.update(roundId: String)
POST /broadcast/round/$roundId<\w{8}>/reset controllers.RelayRound.reset(roundId: String)
POST /broadcast/round/$roundId<\w{8}>/push controllers.RelayRound.push(roundId: String)
GET /api/stream/broadcast/round/$roundId<\w{8}>.pgn controllers.RelayRound.stream(roundId: String)
# Learn
GET /learn controllers.Learn.index

View File

@ -82,7 +82,7 @@ final private class RelayFetch(
.mon(_.relay.syncTime(rt.tour.official, rt.round.slug))
.map { res =>
res -> rt.round
.withSync(_ addLog SyncLog.event(res.moves, none))
.withSync(_ addLog SyncLog.event(res.nbMoves, none))
.copy(finished = games.forall(_.end.isDefined))
}
}
@ -102,9 +102,10 @@ final private class RelayFetch(
def afterSync(result: SyncResult, rt: RelayRound.WithTour): RelayRound =
result match {
case SyncResult.Ok(0, _) => continueRelay(rt)
case SyncResult.Ok(nbMoves, _) =>
lila.mon.relay.moves(rt.tour.official, rt.round.slug).increment(nbMoves)
case result: SyncResult.Ok if result.nbMoves == 0 => continueRelay(rt)
case result: SyncResult.Ok =>
continueRelay(rt)
lila.mon.relay.moves(rt.tour.official, rt.round.slug).increment(result.nbMoves)
continueRelay(rt.round.ensureStarted.resume withTour rt.tour)
case _ => continueRelay(rt)
}

View File

@ -5,15 +5,17 @@ import org.joda.time.format.DateTimeFormat
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import lila.study.{ PgnDump, Study, StudyRepo }
import lila.study.{ Chapter, ChapterRepo, PgnDump, Study, StudyRepo }
import lila.common.Bus
final class RelayPgnStream(
roundRepo: RelayRoundRepo,
studyRepo: StudyRepo,
studyChapterRepo: ChapterRepo,
studyPgnDump: PgnDump
)(implicit ec: ExecutionContext) {
def apply(tour: RelayTour): Source[String, _] =
def exportFullTour(tour: RelayTour): Source[String, _] =
Source futureSource {
roundRepo.idsByTourOrdered(tour) flatMap { ids =>
studyRepo.byOrderedIds(ids.map(_.value).map(Study.Id)) map { studies =>
@ -30,4 +32,26 @@ final class RelayPgnStream(
val date = dateFormat.print(tour.syncedAt | tour.createdAt)
fileR.replaceAllIn(s"lichess_broadcast_${tour.slug}_${tour.id}_$date", "")
}
def streamRoundGames(rt: RelayRound.WithTourAndStudy): Source[String, _] = {
if (rt.relay.hasStarted) studyPgnDump(rt.study, flags)
else Source.empty[String]
} concat Source
.queue[Set[Chapter.Id]](8, akka.stream.OverflowStrategy.dropHead)
.mapMaterializedValue { queue =>
val chan = SyncResult busChannel rt.relay.id
val sub = Bus.subscribeFun(chan) { case SyncResult.Ok(moves, _) =>
queue
.offer(moves.collect {
case (id, nb) if nb > 0 => id
}.toSet)
.unit
}
queue.watchCompletion().foreach { _ =>
Bus.unsubscribe(sub, chan)
}
}
.flatMapConcat(studyChapterRepo.byIdsSource)
.throttle(16, 1 second)
.mapAsync(1)(studyPgnDump.ofChapter(rt.study, flags))
}

View File

@ -32,7 +32,7 @@ final class RelayPush(sync: RelaySync, api: RelayApi)(implicit
.flatMap { games =>
sync(rt, games)
.map { res =>
SyncLog.event(res.moves, none)
SyncLog.event(res.nbMoves, none)
}
.recover { case e: Exception =>
SyncLog.event(0, e.some)

View File

@ -23,18 +23,20 @@ final private class RelaySync(
case None =>
lila.common.Future.linear(games) { game =>
findCorrespondingChapter(game, chapters, games.size) match {
case Some(chapter) => updateChapter(rt.tour, study, chapter, game)
case Some(chapter) => updateChapter(rt.tour, study, chapter, game).dmap(chapter.id -> _)
case None =>
createChapter(study, game) flatMap { chapter =>
chapters.find(_.isEmptyInitial).ifTrue(chapter.order == 2).?? { initial =>
studyApi.deleteChapter(study.id, initial.id) {
actorApi.Who(study.ownerId, sri)
}
} inject chapter.root.mainline.size
} inject (chapter.id -> chapter.root.mainline.size)
}
}
} map { _.sum } flatMap { moves =>
tourRepo.setSyncedNow(rt.tour) inject SyncResult.Ok(moves, games)
} flatMap { moves =>
val result = SyncResult.Ok(moves.toMap, games)
lila.common.Bus.publish(result, SyncResult busChannel rt.round.id)
tourRepo.setSyncedNow(rt.tour) inject result
}
}
}
@ -212,7 +214,8 @@ sealed trait SyncResult {
val reportKey: String
}
object SyncResult {
case class Ok(moves: Int, games: RelayGames) extends SyncResult {
case class Ok(moves: Map[Chapter.Id, Int], games: RelayGames) extends SyncResult {
def nbMoves = moves.values.sum
val reportKey = "ok"
}
case object Timeout extends Exception with SyncResult {
@ -222,4 +225,6 @@ object SyncResult {
case class Error(msg: String) extends SyncResult {
val reportKey = "error"
}
def busChannel(roundId: RelayRound.Id) = s"relaySyncResult:$roundId"
}

View File

@ -62,6 +62,15 @@ final class ChapterRepo(val coll: AsyncColl)(implicit
}
}
def byIdsSource(ids: Iterable[Chapter.Id]): Source[Chapter, _] =
Source futureSource {
coll map {
_.find($inIds(ids))
.cursor[Chapter](readPreference = readPref)
.documentSource()
}
}
// loads all study chapters in memory!
def orderedByStudy(studyId: Study.Id): Fu[List[Chapter]] =
coll {