lila/modules/team/src/main/TeamMemberStream.scala

44 lines
1.2 KiB
Scala
Raw Permalink Normal View History

2018-05-07 16:14:38 -06:00
package lila.team
2019-12-02 21:31:35 -07:00
import akka.stream.scaladsl._
2019-12-08 10:35:26 -07:00
import reactivemongo.akkastream.cursorProducer
2018-05-07 16:14:38 -06:00
import reactivemongo.api.ReadPreference
import scala.concurrent.duration._
2019-12-02 21:31:35 -07:00
import lila.common.config.MaxPerSecond
2018-05-07 16:14:38 -06:00
import lila.db.dsl._
import lila.user.{ User, UserRepo }
2019-12-02 21:31:35 -07:00
final class TeamMemberStream(
memberRepo: MemberRepo,
userRepo: UserRepo
2020-06-24 03:37:18 -06:00
)(implicit
ec: scala.concurrent.ExecutionContext,
mat: akka.stream.Materializer
) {
2018-05-07 16:14:38 -06:00
2019-12-02 21:31:35 -07:00
def apply(team: Team, perSecond: MaxPerSecond): Source[User, _] =
2020-04-24 19:49:50 -06:00
idsBatches(team, perSecond)
.mapAsync(1)(userRepo.usersFromSecondary)
.mapConcat(identity)
2020-07-10 09:06:46 -06:00
def subscribedIds(team: Team, perSecond: MaxPerSecond): Source[User.ID, _] =
idsBatches(team, perSecond, $doc("unsub" $ne true))
2020-04-24 19:49:50 -06:00
.mapConcat(identity)
2020-07-10 09:06:46 -06:00
private def idsBatches(
team: Team,
perSecond: MaxPerSecond,
selector: Bdoc = $empty
): Source[Seq[User.ID], _] =
memberRepo.coll
.find($doc("team" -> team.id) ++ selector, $doc("user" -> true).some)
2018-05-07 16:14:38 -06:00
.sort($sort desc "date")
2019-12-02 21:31:35 -07:00
.batchSize(perSecond.value)
.cursor[Bdoc](ReadPreference.secondaryPreferred)
.documentSource()
.grouped(perSecond.value)
2019-12-04 23:52:53 -07:00
.map(_.flatMap(_.getAsOpt[User.ID]("user")))
2019-12-08 19:54:14 -07:00
.throttle(1, 1 second)
2018-05-07 16:14:38 -06:00
}