migrate team search module

rm0193-mapreduce
Thibault Duplessis 2019-12-03 10:52:11 -06:00
parent 42148b0d37
commit d668a0a8d3
6 changed files with 86 additions and 79 deletions

View File

@ -2,13 +2,13 @@ package lila.setup
import chess.Mode
private trait HumanConfig extends Config {
private[setup] trait HumanConfig extends Config {
// casual or rated
val mode: Mode
}
private trait BaseHumanConfig extends BaseConfig {
private[setup] trait BaseHumanConfig extends BaseConfig {
val modes = Mode.all map (_.id)
val modeChoices = Mode.all map { e => e.id.toString -> e.toString }

View File

@ -42,7 +42,7 @@ final class TeamApi(
open = s.isOpen,
createdBy = me
)
teamRepo.coll.insert(team) >>
teamRepo.coll.insert.one(team) >>
memberRepo.add(team.id, me.id) >>- {
cached invalidateTeamIds me.id
indexer ! InsertTeam(team)
@ -59,7 +59,7 @@ final class TeamApi(
description = e.description,
open = e.isOpen
) |> { team =>
teamRepo.coll.update($id(team.id), team).void >>
teamRepo.coll.update.one($id(team.id), team).void >>
!team.isCreator(me.id) ?? {
modLog.teamEdit(me.id, team.createdBy, team.name)
} >>-
@ -120,12 +120,12 @@ final class TeamApi(
requestable(team, user) flatMap {
_ ?? {
val request = Request.make(team = team.id, user = user.id, message = setup.message)
requestRepo.coll.insert(request).void >>- (cached.nbRequests invalidate team.createdBy)
requestRepo.coll.insert.one(request).void >>- (cached.nbRequests invalidate team.createdBy)
}
}
def processRequest(team: Team, request: Request, accept: Boolean): Funit = for {
_ <- requestRepo.coll.remove(request)
_ <- requestRepo.coll.delete.one(request)
_ = cached.nbRequests invalidate team.createdBy
userOption <- userRepo byId request.user
_ <- userOption.filter(_ => accept).??(user =>
@ -191,7 +191,7 @@ final class TeamApi(
// delete for ever, with members but not forums
def delete(team: Team): Funit =
teamRepo.coll.remove($id(team.id)) >>
teamRepo.coll.delete.one($id(team.id)) >>
memberRepo.removeByteam(team.id) >>-
(indexer ! RemoveTeam(team.id))
@ -210,7 +210,7 @@ final class TeamApi(
teamRepo.coll.distinctEasy[Team.ID, Set]("_id", $doc("_id" $in ids))
def autocomplete(term: String, max: Int): Fu[List[Team]] =
teamRepo.coll.find($doc(
teamRepo.coll.ext.find($doc(
"name".$startsWith(java.util.regex.Pattern.quote(term), "i"),
"enabled" -> true
)).sort($sort desc "nbMembers").list[Team](max, ReadPreference.secondaryPreferred)
@ -218,7 +218,7 @@ final class TeamApi(
def nbRequests(teamId: Team.ID) = cached.nbRequests get teamId
def recomputeNbMembers =
teamRepo.coll.find($empty).cursor[Team](ReadPreference.secondaryPreferred).foldWhileM({}) { (_, team) =>
teamRepo.coll.ext.find($empty).cursor[Team](ReadPreference.secondaryPreferred).foldWhileM({}) { (_, team) =>
for {
nb <- memberRepo.countByTeam(team.id)
_ <- teamRepo.coll.updateField($id(team.id), "nbMembers", nb)

View File

@ -1,13 +1,14 @@
package lila.team
import org.joda.time.{ DateTime, Period }
import reactivemongo.akkastream.{ AkkaStreamCursor, cursorProducer }
import reactivemongo.api._
import reactivemongo.api.bson._
import lila.db.dsl._
import lila.user.User
private final class TeamRepo(val coll: Coll) {
final class TeamRepo(val coll: Coll) {
import BSONHandlers._
@ -35,14 +36,14 @@ private final class TeamRepo(val coll: Coll) {
coll.primitiveOne[String]($id(teamId), "createdBy")
def incMembers(teamId: String, by: Int): Funit =
coll.update($id(teamId), $inc("nbMembers" -> by)).void
coll.update.one($id(teamId), $inc("nbMembers" -> by)).void
def enable(team: Team) = coll.updateField($id(team.id), "enabled", true)
def disable(team: Team) = coll.updateField($id(team.id), "enabled", false)
def addRequest(teamId: String, request: Request): Funit =
coll.update(
coll.update.one(
$id(teamId) ++ $doc("requests.user" $ne request.user),
$push("requests", request.user)
).void
@ -50,6 +51,10 @@ private final class TeamRepo(val coll: Coll) {
def changeOwner(teamId: String, newOwner: User.ID) =
coll.updateField($id(teamId), "createdBy", newOwner)
def cursor =
coll.ext.find($doc("enabled" -> true))
.cursor[Team](ReadPreference.secondaryPreferred)
val enabledQuery = $doc("enabled" -> true)
val sortPopular = $sort desc "nbMembers"

View File

@ -1,23 +1,36 @@
package lila.teamSearch
import akka.actor._
import com.typesafe.config.Config
import com.softwaremill.macwire._
import io.methvin.play.autoconfig._
import play.api.Configuration
import lila.common.config._
import lila.search._
@Module
private class RoundConfig(
@ConfigName("index.name") val indexName: String,
@ConfigName("paginator.max_per_page") val maxPerPage: MaxPerPage,
@ConfigName("actor.name") val actorName: String
)
final class Env(
config: Config,
appConfig: Configuration,
makeClient: Index => ESClient,
system: ActorSystem
teamRepo: lila.team.TeamRepo
)(implicit
system: ActorSystem,
mat: akka.stream.Materializer
) {
private val IndexName = config getString "index"
private val PaginatorMaxPerPage = config getInt "paginator.max_per_page"
private val ActorName = config getString "actor.name"
private val config = appConfig.get[RoundConfig]("round")(AutoConfig.loader)
private val client = makeClient(Index(IndexName))
private lazy val client = makeClient(Index(config.indexName))
val api = new TeamSearchApi(client)
private lazy val paginatorBuilder = wire[lila.search.PaginatorBuilder[lila.team.Team, Query]]
lazy val api: TeamSearchApi = wire[TeamSearchApi]
def apply(text: String, page: Int) = paginatorBuilder(Query(text), page)
@ -27,25 +40,11 @@ final class Env(
}
}
private lazy val paginatorBuilder = new lila.search.PaginatorBuilder[lila.team.Team, Query](
searchApi = api,
maxPerPage = lila.common.MaxPerPage(PaginatorMaxPerPage)
)
system.actorOf(Props(new Actor {
import lila.team.actorApi._
def receive = {
case InsertTeam(team) => api store team
case RemoveTeam(id) => client deleteById Id(id)
}
}), name = ActorName)
}
object Env {
lazy val current = "teamSearch" boot new Env(
config = lila.common.PlayApp loadConfig "teamSearch",
makeClient = lila.search.Env.current.makeClient,
system = lila.common.PlayApp.system
)
}), name = config.actorName)
}

View File

@ -1,15 +1,25 @@
package lila.teamSearch
import akka.stream.scaladsl._
import play.api.libs.json._
import reactivemongo.akkastream.{ AkkaStreamCursor, cursorProducer }
import reactivemongo.api._
import lila.db.dsl._
import lila.search._
import lila.team.{ Team, TeamRepo }
import play.api.libs.json._
final class TeamSearchApi(client: ESClient) extends SearchReadApi[Team, Query] {
final class TeamSearchApi(
client: ESClient,
teamRepo: TeamRepo
)(implicit
system: akka.actor.ActorSystem,
mat: akka.stream.Materializer
) extends SearchReadApi[Team, Query] {
def search(query: Query, from: From, size: Size) =
client.search(query, from, size) flatMap { res =>
TeamRepo byOrderedIds res.ids
teamRepo byOrderedIds res.ids
}
def count(query: Query) = client.count(query) map (_.count)
@ -25,29 +35,22 @@ final class TeamSearchApi(client: ESClient) extends SearchReadApi[Team, Query] {
def reset = client match {
case c: ESClientHttp => c.putMapping >> {
import play.api.libs.iteratee._
import reactivemongo.api.ReadPreference
import reactivemongo.play.iteratees.cursorProducer
import lila.db.dsl._
logger.info(s"Index to ${c.index.name}")
val batchSize = 200
val maxEntries = Int.MaxValue
TeamRepo.cursor(
selector = $doc("enabled" -> true),
readPreference = ReadPreference.secondaryPreferred
)
.enumerator(maxEntries) &>
Enumeratee.grouped(Iteratee takeUpTo batchSize) |>>>
Iteratee.foldM[Seq[Team], Int](0) {
case (nb, teams) =>
c.storeBulk(teams.toList map (t => Id(t.id) -> toDoc(t))) inject {
logger.info(s"Indexed $nb teams")
nb + teams.size
}
teamRepo.cursor
.documentSource()
.map(t => Id(t.id) -> toDoc(t))
.grouped(200)
.mapAsyncUnordered(1) { teams =>
c.storeBulk(teams) inject teams.size
}
.fold(0)((acc, nb) => acc + nb)
.wireTap { nb =>
if (nb % 5 == 0) logger.info(s"Indexing teams... $nb")
}
.to(Sink.ignore)
.run
} >> client.refresh
case _ => funit
}

View File

@ -26,7 +26,7 @@ final class PlayerRepo(coll: Coll) {
def byId(id: Tournament.ID): Fu[Option[Player]] = coll.uno[Player](selectId(id))
private[tournament] def bestByTour(tourId: Tournament.ID, nb: Int, skip: Int = 0): Fu[List[Player]] =
coll.find(selectTour(tourId)).sort(bestSort).skip(skip).list[Player](nb)
coll.ext.find(selectTour(tourId)).sort(bestSort).skip(skip).list[Player](nb)
private[tournament] def bestByTourWithRank(tourId: Tournament.ID, nb: Int, skip: Int = 0): Fu[RankedPlayers] =
bestByTour(tourId, nb, skip).map { res =>
@ -87,22 +87,22 @@ final class PlayerRepo(coll: Coll) {
private[tournament] def teamInfo(tourId: Tournament.ID, teamId: TeamId, battle: TeamBattle): Fu[TeamBattle.TeamInfo] = {
coll.aggregateWith[Bdoc]() { framework =>
import framework._
Match(selectTour(tourId) ++ $doc("t" -> teamId)) ->
List(
Sort(Descending("m")),
Facet($doc(
"agg" -> $arr($doc(
"$group" -> $doc(
"_id" -> BSONNull,
"nb" -> $doc("$sum" -> 1),
"rating" -> $doc("$avg" -> "$r"),
"perf" -> $doc("$avg" -> "$e"),
"score" -> $doc("$avg" -> "$s")
)
)),
"topPlayers" -> $arr($doc("$limit" -> 50))
))
)
Match(selectTour(tourId) ++ $doc("t" -> teamId)) -> List(
Sort(Descending("m")),
Facet(List(
"agg" -> {
Group(BSONNull)(
"nb" -> SumAll,
"rating" -> AvgField("r"),
"perf" -> AvgField("e"),
"score" -> AvgField("s")
) -> Nil
},
"topPlayers" -> {
Limit(50) -> Nil
}
))
)
}.headOption.map {
_.flatMap { doc =>
for {
@ -119,13 +119,13 @@ final class PlayerRepo(coll: Coll) {
}
def bestTeamPlayers(tourId: Tournament.ID, teamId: TeamId, nb: Int): Fu[List[Player]] =
coll.find($doc("tid" -> tourId, "t" -> teamId)).sort($sort desc "m").list[Player](nb)
coll.ext.find($doc("tid" -> tourId, "t" -> teamId)).sort($sort desc "m").list[Player](nb)
def countTeamPlayers(tourId: Tournament.ID, teamId: TeamId): Fu[Int] =
coll.countSel($doc("tid" -> tourId, "t" -> teamId))
def teamsOfPlayers(tourId: Tournament.ID, userIds: List[User.ID]): Fu[List[(User.ID, TeamId)]] =
coll.find($doc("tid" -> tourId, "uid" $in userIds), $doc("_id" -> false, "uid" -> true, "t" -> true))
coll.ext.find($doc("tid" -> tourId, "uid" $in userIds), $doc("_id" -> false, "uid" -> true, "t" -> true))
.list[Bdoc]()
.map {
_.flatMap { doc =>
@ -140,10 +140,10 @@ final class PlayerRepo(coll: Coll) {
def count(tourId: Tournament.ID): Fu[Int] = coll.countSel(selectTour(tourId))
def removeByTour(tourId: Tournament.ID) = coll.remove(selectTour(tourId)).void
def removeByTour(tourId: Tournament.ID) = coll.delete.one(selectTour(tourId)).void
def remove(tourId: Tournament.ID, userId: User.ID) =
coll.remove(selectTourUser(tourId, userId)).void
coll.delete.one(selectTourUser(tourId, userId)).void
def filterExists(tourIds: List[Tournament.ID], userId: User.ID): Fu[List[Tournament.ID]] =
coll.primitive[Tournament.ID]($doc(