lila/modules/relay/src/main/RelayApi.scala

328 lines
11 KiB
Scala

package lila.relay
import akka.stream.scaladsl.Source
import org.joda.time.DateTime
import ornicar.scalalib.Zero
import play.api.libs.json._
import reactivemongo.akkastream.cursorProducer
import reactivemongo.api.bson._
import reactivemongo.api.ReadPreference
import scala.concurrent.duration._
import scala.util.chaining._
import lila.common.config.MaxPerSecond
import lila.db.dsl._
import lila.study.{ Settings, Study, StudyApi, StudyMaker, StudyMultiBoard, StudyRepo }
import lila.user.User
final class RelayApi(
roundRepo: RelayRoundRepo,
tourRepo: RelayTourRepo,
studyApi: StudyApi,
studyRepo: StudyRepo,
multiboard: StudyMultiBoard,
jsonView: JsonView,
formatApi: RelayFormatApi
)(implicit ec: scala.concurrent.ExecutionContext, mat: akka.stream.Materializer) {
import BSONHandlers._
import lila.study.BSONHandlers.{ StudyBSONHandler, StudyIdBSONHandler }
def byId(id: RelayRound.Id) = roundRepo.coll.byId[RelayRound](id.value)
def byIdWithTour(id: RelayRound.Id): Fu[Option[RelayRound.WithTour]] =
roundRepo.coll
.aggregateOne() { framework =>
import framework._
Match($id(id)) -> List(
PipelineOperator(tourRepo lookup "tourId"),
UnwindField("tour")
)
}
.map(_ flatMap readRoundWithTour)
def byIdAndContributor(id: RelayRound.Id, me: User) =
byIdWithStudy(id) map {
_ collect {
case RelayRound.WithTourAndStudy(relay, tour, study) if study.canContribute(me.id) =>
relay withTour tour
}
}
def byIdWithStudy(id: RelayRound.Id): Fu[Option[RelayRound.WithTourAndStudy]] =
byIdWithTour(id) flatMap {
_ ?? { case RelayRound.WithTour(relay, tour) =>
studyApi.byId(relay.studyId) dmap2 {
RelayRound.WithTourAndStudy(relay, tour, _)
}
}
}
def byTourOrdered(tour: RelayTour): Fu[List[RelayRound.WithTour]] =
roundRepo.byTourOrdered(tour).dmap(_.map(_ withTour tour))
def withRounds(tour: RelayTour) = roundRepo.byTourOrdered(tour).dmap(tour.withRounds)
def denormalizeTourActive(tourId: RelayTour.Id): Funit =
roundRepo.coll.exists(roundRepo.selectors.tour(tourId) ++ $doc("finished" -> false)) flatMap {
tourRepo.setActive(tourId, _)
}
def activeTourNextRound(tour: RelayTour): Fu[Option[RelayRound]] = tour.active ??
roundRepo.coll
.find($doc("tourId" -> tour.id, "finished" -> false))
.sort(roundRepo.sort.chrono)
.one[RelayRound]
def tourLastRound(tour: RelayTour): Fu[Option[RelayRound]] =
roundRepo.coll
.find($doc("tourId" -> tour.id))
.sort($doc("startedAt" -> -1, "startsAt" -> -1))
.one[RelayRound]
def officialActive: Fu[List[RelayTour.ActiveWithNextRound]] =
tourRepo.coll
.aggregateList(20) { framework =>
import framework._
Match(tourRepo.selectors.official ++ tourRepo.selectors.active) -> List(
Sort(Descending("tier")),
PipelineOperator(
$doc(
"$lookup" -> $doc(
"from" -> roundRepo.coll.name,
"as" -> "round",
"let" -> $doc("id" -> "$_id"),
"pipeline" -> $arr(
$doc(
"$match" -> $doc(
"$expr" -> $doc(
"$and" -> $arr(
$doc("$eq" -> $arr("$tourId", "$$id")),
$doc("$eq" -> $arr("$finished", false))
)
)
)
),
$doc("$addFields" -> $doc("sync.log" -> $arr())),
$doc("$sort" -> roundRepo.sort.chrono),
$doc("$limit" -> 1)
)
)
)
),
UnwindField("round")
)
}
.map { docs =>
for {
doc <- docs
tour <- doc.asOpt[RelayTour]
round <- doc.getAsOpt[RelayRound]("round")
} yield RelayTour.ActiveWithNextRound(tour, round)
}
def tourById(id: RelayTour.Id) = tourRepo.coll.byId[RelayTour](id.value)
private[relay] def toSync: Fu[List[RelayRound.WithTour]] =
fetchWithTours(
$doc(
"sync.until" $exists true,
"sync.nextAt" $lt DateTime.now
),
20
)
def fetchWithTours(query: Bdoc, maxDocs: Int, readPreference: ReadPreference = ReadPreference.primary) =
roundRepo.coll
.aggregateList(maxDocs, readPreference) { framework =>
import framework._
Match(query) -> List(
PipelineOperator(tourRepo lookup "tourId"),
UnwindField("tour")
)
}
.map(_ flatMap readRoundWithTour)
def tourCreate(data: RelayTourForm.Data, user: User): Fu[RelayTour] = {
val tour = data.make(user)
tourRepo.coll.insert.one(tour) inject tour
}
def tourUpdate(tour: RelayTour, data: RelayTourForm.Data, user: User): Funit =
tourRepo.coll.update.one($id(tour.id), data.update(tour, user)).void
def create(data: RelayRoundForm.Data, user: User, tour: RelayTour): Fu[RelayRound] =
roundRepo.lastByTour(tour) flatMap {
_ ?? { last => studyRepo.byId(last.studyId) }
} flatMap { lastStudy =>
import lila.study.{ StudyMember, StudyMembers }
val relay = data.make(user, tour)
roundRepo.coll.insert.one(relay) >>
studyApi.create(
StudyMaker.ImportGame(
id = relay.studyId.some,
name = Study.Name(relay.name).some,
settings = lastStudy
.fold(
Settings.init
.copy(
chat = Settings.UserSelection.Everyone,
sticky = false
)
)(_.settings)
.some,
from = Study.From.Relay(none).some
),
user,
_.copy(members =
lastStudy.fold(StudyMembers.empty)(_.members) + StudyMember(
id = user.id,
role = StudyMember.Role.Write
)
)
) >>
tourRepo.setActive(tour.id, true) >>
studyApi.addTopics(relay.studyId, List("Broadcast")) inject relay
}
def requestPlay(id: RelayRound.Id, v: Boolean): Funit =
WithRelay(id) { relay =>
relay.sync.upstream.flatMap(_.asUrl).map(_.withRound) foreach formatApi.refresh
update(relay) { r =>
if (v) r.withSync(_.play) else r.withSync(_.pause)
} void
}
def update(from: RelayRound)(f: RelayRound => RelayRound): Fu[RelayRound] = {
val round = f(from) pipe { r =>
if (r.sync.upstream != from.sync.upstream) r.withSync(_.clearLog) else r
}
studyApi.rename(round.studyId, Study.Name(round.name)) >> {
if (round == from) fuccess(round)
else
roundRepo.coll.update.one($id(round.id), round).void >> {
(round.sync.playing != from.sync.playing) ?? sendToContributors(
round.id,
"relaySync",
jsonView sync round
)
} >> {
(round.finished != from.finished) ?? denormalizeTourActive(round.tourId)
} >>- {
round.sync.log.events.lastOption.ifTrue(round.sync.log != from.sync.log).foreach { event =>
sendToContributors(round.id, "relayLog", JsonView.syncLogEventWrites writes event)
}
} inject round
}
}
def reset(relay: RelayRound, by: User): Funit =
studyApi.deleteAllChapters(relay.studyId, by) >>-
multiboard.invalidate(relay.studyId) >>
requestPlay(relay.id, v = true)
def deleteRound(roundId: RelayRound.Id): Fu[Option[RelayTour]] =
byIdWithTour(roundId) flatMap {
_ ?? { rt =>
roundRepo.coll.delete.one($id(rt.round.id)) >>
denormalizeTourActive(rt.tour.id) inject rt.tour.some
}
}
def getOngoing(id: RelayRound.Id): Fu[Option[RelayRound.WithTour]] =
roundRepo.coll.one[RelayRound]($doc("_id" -> id, "finished" -> false)) flatMap {
_ ?? { relay =>
tourById(relay.tourId) map2 relay.withTour
}
}
def canUpdate(user: User, tour: RelayTour): Fu[Boolean] =
fuccess(tour.ownerId == user.id) >>|
roundRepo.coll.distinctEasy[Study.Id, List]("_id", roundRepo.selectors tour tour.id).flatMap { ids =>
studyRepo.membersByIds(ids) map {
_.exists(_ contributorIds user.id)
}
}
def officialTourStream(perSecond: MaxPerSecond, nb: Int): Source[RelayTour.WithRounds, _] =
tourRepo.coll
.aggregateWith[Bdoc](readPreference = ReadPreference.secondaryPreferred) { framework =>
import framework._
List(
Match(tourRepo.selectors.official),
Sort(Descending("syncedAt")),
PipelineOperator(
$lookup.pipeline(
from = roundRepo.coll,
as = "rounds",
local = "_id",
foreign = "tourId",
pipeline = List(
$doc("$sort" -> $doc("startedAt" -> 1, "startsAt" -> 1, "name" -> 1))
)
)
)
)
}
.documentSource(nb)
.mapConcat { doc =>
doc
.asOpt[RelayTour]
.flatMap { tour =>
doc.getAsOpt[List[RelayRound]]("rounds") map tour.withRounds
}
.toList
}
.throttle(perSecond.value, 1 second)
private[relay] def autoStart: Funit =
roundRepo.coll.list[RelayRound](
$doc(
"startsAt" $lt DateTime.now.plusMinutes(30) // start 30 minutes early to fetch boards
$gt DateTime.now.minusDays(1), // bit late now
"startedAt" $exists false,
"sync.until" $exists false
)
) flatMap {
_.map { relay =>
logger.info(s"Automatically start $relay")
requestPlay(relay.id, v = true)
}.sequenceFu.void
}
private[relay] def autoFinishNotSyncing: Funit =
roundRepo.coll.list[RelayRound](
$doc(
"sync.until" $exists false,
"finished" -> false,
"startedAt" $lt DateTime.now.minusHours(3),
$or(
"startsAt" $exists false,
"startsAt" $lt DateTime.now
)
)
) flatMap {
_.map { relay =>
logger.info(s"Automatically finish $relay")
update(relay)(_.finish)
}.sequenceFu.void
}
private[relay] def WithRelay[A: Zero](id: RelayRound.Id)(f: RelayRound => Fu[A]): Fu[A] =
byId(id) flatMap { _ ?? f }
private[relay] def onStudyRemove(studyId: String) =
roundRepo.coll.delete.one($id(RelayRound.Id(studyId))).void
private def sendToContributors(id: RelayRound.Id, t: String, msg: JsObject): Funit =
studyApi members Study.Id(id.value) map {
_.map(_.contributorIds).withFilter(_.nonEmpty) foreach { userIds =>
import lila.hub.actorApi.socket.SendTos
import JsonView.roundIdWrites
import lila.socket.Socket.makeMessage
val payload = makeMessage(t, msg ++ Json.obj("id" -> id))
lila.common.Bus.publish(SendTos(userIds, payload), "socketUsers")
}
}
}