broadcast round stream API event on tag update

pull/9890/head^2
Thibault Duplessis 2021-09-28 08:15:19 +02:00
parent fa0f8540cd
commit d659e5e3ba
2 changed files with 18 additions and 16 deletions

View File

@ -40,11 +40,9 @@ final class RelayPgnStream(
.queue[Set[Chapter.Id]](8, akka.stream.OverflowStrategy.dropHead)
.mapMaterializedValue { queue =>
val chan = SyncResult busChannel rt.relay.id
val sub = Bus.subscribeFun(chan) { case SyncResult.Ok(moves, _) =>
val sub = Bus.subscribeFun(chan) { case SyncResult.Ok(chapters, _) =>
queue
.offer(moves.collect {
case (id, nb) if nb > 0 => id
}.toSet)
.offer(chapters.view.filter(c => c.tagUpdate || c.newMoves > 0).map(_.id).toSet)
.unit
}
queue.watchCompletion().foreach { _ =>

View File

@ -23,18 +23,18 @@ final private class RelaySync(
case None =>
lila.common.Future.linear(games) { game =>
findCorrespondingChapter(game, chapters, games.size) match {
case Some(chapter) => updateChapter(rt.tour, study, chapter, game).dmap(chapter.id -> _)
case Some(chapter) => updateChapter(rt.tour, study, chapter, game)
case None =>
createChapter(study, game) flatMap { chapter =>
chapters.find(_.isEmptyInitial).ifTrue(chapter.order == 2).?? { initial =>
studyApi.deleteChapter(study.id, initial.id) {
actorApi.Who(study.ownerId, sri)
}
} inject (chapter.id -> chapter.root.mainline.size)
} inject SyncResult.ChapterResult(chapter.id, true, chapter.root.mainline.size)
}
}
} flatMap { moves =>
val result = SyncResult.Ok(moves.toMap, games)
} flatMap { chapterUpdates =>
val result = SyncResult.Ok(chapterUpdates.toList, games)
lila.common.Bus.publish(result, SyncResult busChannel rt.round.id)
tourRepo.setSyncedNow(rt.tour) inject result
}
@ -61,9 +61,11 @@ final private class RelaySync(
study: Study,
chapter: Chapter,
game: RelayGame
): Fu[NbMoves] =
updateChapterTags(tour, study, chapter, game) >>
updateChapterTree(study, chapter, game)
): Fu[SyncResult.ChapterResult] =
updateChapterTags(tour, study, chapter, game) zip
updateChapterTree(study, chapter, game) map { case (tagUpdate, nbMoves) =>
SyncResult.ChapterResult(chapter.id, tagUpdate, nbMoves)
}
private def updateChapterTree(study: Study, chapter: Chapter, game: RelayGame): Fu[NbMoves] = {
val who = actorApi.Who(chapter.ownerId, sri)
@ -121,9 +123,9 @@ final private class RelaySync(
study: Study,
chapter: Chapter,
game: RelayGame
): Funit = {
): Fu[Boolean] = {
val gameTags = game.tags.value.foldLeft(Tags(Nil)) { case (newTags, tag) =>
if (!chapter.tags.value.exists(tag ==)) newTags + tag
if (!chapter.tags.value.has(tag)) newTags + tag
else newTags
}
val newEndTag = game.end
@ -144,7 +146,7 @@ final private class RelaySync(
)(actorApi.Who(chapter.ownerId, sri)) >> {
val newEnd = chapter.tags.resultColor.isEmpty && tags.resultColor.isDefined
newEnd ?? onChapterEnd(tour, study, chapter)
}
} inject true
}
}
@ -214,8 +216,8 @@ sealed trait SyncResult {
val reportKey: String
}
object SyncResult {
case class Ok(moves: Map[Chapter.Id, Int], games: RelayGames) extends SyncResult {
def nbMoves = moves.values.sum
case class Ok(chapters: List[ChapterResult], games: RelayGames) extends SyncResult {
def nbMoves = chapters.foldLeft(0)(_ + _.newMoves)
val reportKey = "ok"
}
case object Timeout extends Exception with SyncResult {
@ -226,5 +228,7 @@ object SyncResult {
val reportKey = "error"
}
case class ChapterResult(id: Chapter.Id, tagUpdate: Boolean, newMoves: Int)
def busChannel(roundId: RelayRound.Id) = s"relaySyncResult:$roundId"
}