From 9aef32ee8843bda94bdb26b6faf20a3c42010a73 Mon Sep 17 00:00:00 2001 From: Thibault Duplessis Date: Thu, 23 Sep 2021 10:18:10 +0200 Subject: [PATCH] broadcast streaming API --- app/controllers/RelayRound.scala | 22 ++++++++++++++-- app/controllers/RelayTour.scala | 2 +- conf/routes | 1 + modules/relay/src/main/RelayFetch.scala | 9 ++++--- modules/relay/src/main/RelayPgnStream.scala | 28 +++++++++++++++++++-- modules/relay/src/main/RelayPush.scala | 2 +- modules/relay/src/main/RelaySync.scala | 15 +++++++---- modules/study/src/main/ChapterRepo.scala | 9 +++++++ 8 files changed, 73 insertions(+), 15 deletions(-) diff --git a/app/controllers/RelayRound.scala b/app/controllers/RelayRound.scala index 337499a609..bb2bee4131 100644 --- a/app/controllers/RelayRound.scala +++ b/app/controllers/RelayRound.scala @@ -11,10 +11,12 @@ import lila.app._ import lila.relay.{ RelayRound => RoundModel, RelayTour => TourModel, RelayRoundForm } import lila.user.{ User => UserModel } import views._ +import lila.common.HTTPRequest final class RelayRound( env: Env, - studyC: => Study + studyC: => Study, + apiC: => Api ) extends LilaController(env) { def form(tourId: String) = @@ -161,6 +163,22 @@ final class RelayRound( } ) + def stream(id: String) = AnonOrScoped() { req => me => + env.relay.api.byIdWithStudy(id) flatMap { + _ ?? { rt => + studyC.CanView(rt.study, me) { + apiC + .GlobalConcurrencyLimitPerIP(HTTPRequest ipAddress req)( + env.relay.pgnStream.streamRoundGames(rt) + ) { source => + noProxyBuffer(Ok chunked source.keepAlive(60.seconds, () => " ") as pgnContentType) + } + .fuccess + }(Unauthorized.fuccess, Forbidden.fuccess) + } + } + } + def chapter(ts: String, rs: String, id: String, chapterId: String) = Open { implicit ctx => WithRoundAndTour(ts, rs, id) { rt => @@ -250,7 +268,7 @@ final class RelayRound( else if (me.hasTitle || me.isVerified) 5 else 10 CreateLimitPerUser(me.id, cost = cost) { - CreateLimitPerIP(lila.common.HTTPRequest ipAddress req, cost = cost) { + CreateLimitPerIP(HTTPRequest ipAddress req, cost = cost) { create }(fail.fuccess) }(fail.fuccess) diff --git a/app/controllers/RelayTour.scala b/app/controllers/RelayTour.scala index 7bdf5b0248..885edf6de3 100644 --- a/app/controllers/RelayTour.scala +++ b/app/controllers/RelayTour.scala @@ -131,7 +131,7 @@ final class RelayTour(env: Env, apiC: => Api) extends LilaController(env) { env.relay.api tourById TourModel.Id(id) map { _ ?? { tour => apiC.GlobalConcurrencyLimitPerIP(HTTPRequest ipAddress req)( - env.relay.pgnStream(tour) + env.relay.pgnStream.exportFullTour(tour) ) { source => asAttachmentStream(s"${env.relay.pgnStream filename tour}.pgn")( Ok chunked source as pgnContentType diff --git a/conf/routes b/conf/routes index d79d2bc66f..b6bf49f5a6 100644 --- a/conf/routes +++ b/conf/routes @@ -213,6 +213,7 @@ GET /broadcast/round/$roundId<\w{8}>/edit controllers.RelayRound.edit(roundI POST /broadcast/round/$roundId<\w{8}>/edit controllers.RelayRound.update(roundId: String) POST /broadcast/round/$roundId<\w{8}>/reset controllers.RelayRound.reset(roundId: String) POST /broadcast/round/$roundId<\w{8}>/push controllers.RelayRound.push(roundId: String) +GET /api/stream/broadcast/round/$roundId<\w{8}>.pgn controllers.RelayRound.stream(roundId: String) # Learn GET /learn controllers.Learn.index diff --git a/modules/relay/src/main/RelayFetch.scala b/modules/relay/src/main/RelayFetch.scala index e35c25a6c4..85d0c29135 100644 --- a/modules/relay/src/main/RelayFetch.scala +++ b/modules/relay/src/main/RelayFetch.scala @@ -82,7 +82,7 @@ final private class RelayFetch( .mon(_.relay.syncTime(rt.tour.official, rt.round.slug)) .map { res => res -> rt.round - .withSync(_ addLog SyncLog.event(res.moves, none)) + .withSync(_ addLog SyncLog.event(res.nbMoves, none)) .copy(finished = games.forall(_.end.isDefined)) } } @@ -102,9 +102,10 @@ final private class RelayFetch( def afterSync(result: SyncResult, rt: RelayRound.WithTour): RelayRound = result match { - case SyncResult.Ok(0, _) => continueRelay(rt) - case SyncResult.Ok(nbMoves, _) => - lila.mon.relay.moves(rt.tour.official, rt.round.slug).increment(nbMoves) + case result: SyncResult.Ok if result.nbMoves == 0 => continueRelay(rt) + case result: SyncResult.Ok => + continueRelay(rt) + lila.mon.relay.moves(rt.tour.official, rt.round.slug).increment(result.nbMoves) continueRelay(rt.round.ensureStarted.resume withTour rt.tour) case _ => continueRelay(rt) } diff --git a/modules/relay/src/main/RelayPgnStream.scala b/modules/relay/src/main/RelayPgnStream.scala index 47675e35d4..f007720f18 100644 --- a/modules/relay/src/main/RelayPgnStream.scala +++ b/modules/relay/src/main/RelayPgnStream.scala @@ -5,15 +5,17 @@ import org.joda.time.format.DateTimeFormat import scala.concurrent.duration._ import scala.concurrent.ExecutionContext -import lila.study.{ PgnDump, Study, StudyRepo } +import lila.study.{ Chapter, ChapterRepo, PgnDump, Study, StudyRepo } +import lila.common.Bus final class RelayPgnStream( roundRepo: RelayRoundRepo, studyRepo: StudyRepo, + studyChapterRepo: ChapterRepo, studyPgnDump: PgnDump )(implicit ec: ExecutionContext) { - def apply(tour: RelayTour): Source[String, _] = + def exportFullTour(tour: RelayTour): Source[String, _] = Source futureSource { roundRepo.idsByTourOrdered(tour) flatMap { ids => studyRepo.byOrderedIds(ids.map(_.value).map(Study.Id)) map { studies => @@ -30,4 +32,26 @@ final class RelayPgnStream( val date = dateFormat.print(tour.syncedAt | tour.createdAt) fileR.replaceAllIn(s"lichess_broadcast_${tour.slug}_${tour.id}_$date", "") } + + def streamRoundGames(rt: RelayRound.WithTourAndStudy): Source[String, _] = { + if (rt.relay.hasStarted) studyPgnDump(rt.study, flags) + else Source.empty[String] + } concat Source + .queue[Set[Chapter.Id]](8, akka.stream.OverflowStrategy.dropHead) + .mapMaterializedValue { queue => + val chan = SyncResult busChannel rt.relay.id + val sub = Bus.subscribeFun(chan) { case SyncResult.Ok(moves, _) => + queue + .offer(moves.collect { + case (id, nb) if nb > 0 => id + }.toSet) + .unit + } + queue.watchCompletion().foreach { _ => + Bus.unsubscribe(sub, chan) + } + } + .flatMapConcat(studyChapterRepo.byIdsSource) + .throttle(16, 1 second) + .mapAsync(1)(studyPgnDump.ofChapter(rt.study, flags)) } diff --git a/modules/relay/src/main/RelayPush.scala b/modules/relay/src/main/RelayPush.scala index cb86b529b6..6eeb258878 100644 --- a/modules/relay/src/main/RelayPush.scala +++ b/modules/relay/src/main/RelayPush.scala @@ -32,7 +32,7 @@ final class RelayPush(sync: RelaySync, api: RelayApi)(implicit .flatMap { games => sync(rt, games) .map { res => - SyncLog.event(res.moves, none) + SyncLog.event(res.nbMoves, none) } .recover { case e: Exception => SyncLog.event(0, e.some) diff --git a/modules/relay/src/main/RelaySync.scala b/modules/relay/src/main/RelaySync.scala index b2a84965c3..4dd15b4323 100644 --- a/modules/relay/src/main/RelaySync.scala +++ b/modules/relay/src/main/RelaySync.scala @@ -23,18 +23,20 @@ final private class RelaySync( case None => lila.common.Future.linear(games) { game => findCorrespondingChapter(game, chapters, games.size) match { - case Some(chapter) => updateChapter(rt.tour, study, chapter, game) + case Some(chapter) => updateChapter(rt.tour, study, chapter, game).dmap(chapter.id -> _) case None => createChapter(study, game) flatMap { chapter => chapters.find(_.isEmptyInitial).ifTrue(chapter.order == 2).?? { initial => studyApi.deleteChapter(study.id, initial.id) { actorApi.Who(study.ownerId, sri) } - } inject chapter.root.mainline.size + } inject (chapter.id -> chapter.root.mainline.size) } } - } map { _.sum } flatMap { moves => - tourRepo.setSyncedNow(rt.tour) inject SyncResult.Ok(moves, games) + } flatMap { moves => + val result = SyncResult.Ok(moves.toMap, games) + lila.common.Bus.publish(result, SyncResult busChannel rt.round.id) + tourRepo.setSyncedNow(rt.tour) inject result } } } @@ -212,7 +214,8 @@ sealed trait SyncResult { val reportKey: String } object SyncResult { - case class Ok(moves: Int, games: RelayGames) extends SyncResult { + case class Ok(moves: Map[Chapter.Id, Int], games: RelayGames) extends SyncResult { + def nbMoves = moves.values.sum val reportKey = "ok" } case object Timeout extends Exception with SyncResult { @@ -222,4 +225,6 @@ object SyncResult { case class Error(msg: String) extends SyncResult { val reportKey = "error" } + + def busChannel(roundId: RelayRound.Id) = s"relaySyncResult:$roundId" } diff --git a/modules/study/src/main/ChapterRepo.scala b/modules/study/src/main/ChapterRepo.scala index a148b780dc..f755eb8fb0 100644 --- a/modules/study/src/main/ChapterRepo.scala +++ b/modules/study/src/main/ChapterRepo.scala @@ -62,6 +62,15 @@ final class ChapterRepo(val coll: AsyncColl)(implicit } } + def byIdsSource(ids: Iterable[Chapter.Id]): Source[Chapter, _] = + Source futureSource { + coll map { + _.find($inIds(ids)) + .cursor[Chapter](readPreference = readPref) + .documentSource() + } + } + // loads all study chapters in memory! def orderedByStudy(studyId: Study.Id): Fu[List[Chapter]] = coll {