move all study collections to the study database

pull/8077/head
Thibault Duplessis 2021-02-02 22:53:47 +01:00
parent c47498053b
commit 69f33d7589
6 changed files with 189 additions and 169 deletions

View File

@ -26,7 +26,6 @@ final class Env(
fishnet: lila.hub.actors.Fishnet,
chatApi: lila.chat.ChatApi,
mongo: lila.db.Env,
mainDb: lila.db.Db,
net: lila.common.config.NetConfig,
cacheApi: lila.memo.CacheApi
)(implicit
@ -48,10 +47,10 @@ final class Env(
private val socket: StudySocket = wire[StudySocket]
lazy val studyRepo = new StudyRepo(mainDb(CollName("study")))
lazy val studyRepo = new StudyRepo(studyDb(CollName("study")))
lazy val chapterRepo = new ChapterRepo(studyDb(CollName("study_chapter_flat")))
private lazy val topicRepo = new StudyTopicRepo(mainDb(CollName("study_topic")))
private lazy val userTopicRepo = new StudyUserTopicRepo(mainDb(CollName("study_user_topic")))
private lazy val topicRepo = new StudyTopicRepo(studyDb(CollName("study_topic")))
private lazy val userTopicRepo = new StudyUserTopicRepo(studyDb(CollName("study_user_topic")))
lazy val jsonView = wire[JsonView]

View File

@ -73,11 +73,12 @@ final private class StudyInvite(
} yield invited
def admin(study: Study, user: User): Funit =
studyRepo.coll.update
.one(
$id(study.id.value),
$set(s"members.${user.id}" -> $doc("role" -> "w", "admin" -> true)) ++
$addToSet("uids" -> user.id)
)
.void
studyRepo.coll {
_.update
.one(
$id(study.id.value),
$set(s"members.${user.id}" -> $doc("role" -> "w", "admin" -> true)) ++
$addToSet("uids" -> user.id)
)
}.void
}

View File

@ -104,9 +104,9 @@ final class StudyPager(
page: Int,
nbResults: Option[Fu[Int]] = none,
hint: Option[Bdoc] = none
): Fu[Paginator[Study.WithChaptersAndLiked]] = {
): Fu[Paginator[Study.WithChaptersAndLiked]] = studyRepo.coll { coll =>
val adapter = new Adapter[Study](
collection = studyRepo.coll,
collection = coll,
selector = selector,
projection = studyRepo.projection.some,
sort = order match {

View File

@ -4,10 +4,11 @@ import org.joda.time.DateTime
import reactivemongo.akkastream.{ cursorProducer, AkkaStreamCursor }
import reactivemongo.api._
import lila.db.AsyncColl
import lila.db.dsl._
import lila.user.User
final class StudyRepo(private[study] val coll: Coll)(implicit ec: scala.concurrent.ExecutionContext) {
final class StudyRepo(private[study] val coll: AsyncColl)(implicit ec: scala.concurrent.ExecutionContext) {
import BSONHandlers._
@ -34,21 +35,21 @@ final class StudyRepo(private[study] val coll: Coll)(implicit ec: scala.concurre
"members" -> true
)
def byId(id: Study.Id) = coll.find($id(id), projection.some).one[Study]
def byId(id: Study.Id) = coll(_.find($id(id), projection.some).one[Study])
def byOrderedIds(ids: Seq[Study.Id]) = coll.byOrderedIds[Study, Study.Id](ids)(_.id)
def byOrderedIds(ids: Seq[Study.Id]) = coll(_.byOrderedIds[Study, Study.Id](ids)(_.id))
def lightById(id: Study.Id): Fu[Option[Study.LightStudy]] =
coll.find($id(id), lightProjection.some).one[Study.LightStudy]
coll(_.find($id(id), lightProjection.some).one[Study.LightStudy])
def sortedCursor(
selector: Bdoc,
sort: Bdoc,
readPreference: ReadPreference = ReadPreference.secondaryPreferred
): AkkaStreamCursor[Study] =
coll.find(selector).sort(sort).cursor[Study](readPreference)
): Fu[AkkaStreamCursor[Study]] =
coll.map(_.find(selector).sort(sort).cursor[Study](readPreference))
def exists(id: Study.Id) = coll.exists($id(id))
def exists(id: Study.Id) = coll(_.exists($id(id)))
private[study] def selectOwnerId(ownerId: User.ID) = $doc("ownerId" -> ownerId)
private[study] def selectMemberId(memberId: User.ID) = $doc(F.uids -> memberId)
@ -65,168 +66,182 @@ final class StudyRepo(private[study] val coll: Coll)(implicit ec: scala.concurre
$doc(s"members.$userId.role" -> "w")
private[study] def selectTopic(topic: StudyTopic) = $doc(F.topics -> topic)
def countByOwner(ownerId: User.ID) = coll.countSel(selectOwnerId(ownerId))
def countByOwner(ownerId: User.ID) = coll(_.countSel(selectOwnerId(ownerId)))
def insert(s: Study): Funit =
coll.insert.one {
StudyBSONHandler.writeTry(s).get ++ $doc(
F.uids -> s.members.ids,
F.likers -> List(s.ownerId),
F.rank -> Study.Rank.compute(s.likes, s.createdAt)
)
coll {
_.insert.one {
StudyBSONHandler.writeTry(s).get ++ $doc(
F.uids -> s.members.ids,
F.likers -> List(s.ownerId),
F.rank -> Study.Rank.compute(s.likes, s.createdAt)
)
}
}.void
def updateSomeFields(s: Study): Funit =
coll.update
.one(
$id(s.id),
$set(
"position" -> s.position,
"name" -> s.name,
"settings" -> s.settings,
"visibility" -> s.visibility,
"description" -> ~s.description,
"updatedAt" -> DateTime.now
coll {
_.update
.one(
$id(s.id),
$set(
"position" -> s.position,
"name" -> s.name,
"settings" -> s.settings,
"visibility" -> s.visibility,
"description" -> ~s.description,
"updatedAt" -> DateTime.now
)
)
)
.void
}.void
def updateTopics(s: Study): Funit =
coll.update
.one(
$id(s.id),
$set("topics" -> s.topics, "updatedAt" -> DateTime.now)
)
.void
coll {
_.update
.one(
$id(s.id),
$set("topics" -> s.topics, "updatedAt" -> DateTime.now)
)
}.void
def delete(s: Study): Funit = coll.delete.one($id(s.id)).void
def delete(s: Study): Funit = coll(_.delete.one($id(s.id))).void
def deleteByIds(ids: List[Study.Id]): Funit = coll.delete.one($inIds(ids)).void
def deleteByIds(ids: List[Study.Id]): Funit = coll(_.delete.one($inIds(ids))).void
def membersById(id: Study.Id): Fu[Option[StudyMembers]] =
coll.primitiveOne[StudyMembers]($id(id), "members")
coll(_.primitiveOne[StudyMembers]($id(id), "members"))
def setPosition(studyId: Study.Id, position: Position.Ref): Funit =
coll.update
.one(
$id(studyId),
$set(
"position" -> position,
"updatedAt" -> DateTime.now
coll(
_.update
.one(
$id(studyId),
$set(
"position" -> position,
"updatedAt" -> DateTime.now
)
)
)
.void
).void
def incViews(study: Study) = coll.incFieldUnchecked($id(study.id), F.views)
def incViews(study: Study) = coll.map(_.incFieldUnchecked($id(study.id), F.views))
def updateNow(s: Study) =
coll.updateFieldUnchecked($id(s.id), "updatedAt", DateTime.now)
coll.map(_.updateFieldUnchecked($id(s.id), "updatedAt", DateTime.now))
def addMember(study: Study, member: StudyMember): Funit =
coll.update
.one(
$id(study.id),
$set(s"members.${member.id}" -> member) ++ $addToSet(F.uids -> member.id)
)
.void
coll {
_.update
.one(
$id(study.id),
$set(s"members.${member.id}" -> member) ++ $addToSet(F.uids -> member.id)
)
}.void
def removeMember(study: Study, userId: User.ID): Funit =
coll.update
.one(
$id(study.id),
$unset(s"members.$userId") ++ $pull(F.uids -> userId)
)
.void
coll {
_.update
.one(
$id(study.id),
$unset(s"members.$userId") ++ $pull(F.uids -> userId)
)
}.void
def setRole(study: Study, userId: User.ID, role: StudyMember.Role): Funit =
coll.update
.one(
$id(study.id),
$set(s"members.$userId.role" -> role)
)
.void
coll {
_.update
.one(
$id(study.id),
$set(s"members.$userId.role" -> role)
)
}.void
def uids(studyId: Study.Id): Fu[Set[User.ID]] =
coll.primitiveOne[Set[User.ID]]($id(studyId), F.uids) map (~_)
coll(_.primitiveOne[Set[User.ID]]($id(studyId), F.uids)) dmap (~_)
private val idNameProjection = $doc("name" -> true)
def publicIdNames(ids: List[Study.Id]): Fu[List[Study.IdName]] =
coll.find($inIds(ids) ++ selectPublic, idNameProjection.some).cursor[Study.IdName]().list()
coll(_.find($inIds(ids) ++ selectPublic, idNameProjection.some).cursor[Study.IdName]().list())
def recentByOwner(userId: User.ID, nb: Int) =
coll
.find(selectOwnerId(userId), idNameProjection.some)
.sort($sort desc "updatedAt")
.cursor[Study.IdName](ReadPreference.secondaryPreferred)
.list(nb)
coll {
_.find(selectOwnerId(userId), idNameProjection.some)
.sort($sort desc "updatedAt")
.cursor[Study.IdName](ReadPreference.secondaryPreferred)
.list(nb)
}
// heavy AF. Only use for GDPR.
private[study] def allIdsByOwner(userId: User.ID): Fu[List[Study.Id]] =
coll.distinctEasy[Study.Id, List]("_id", selectOwnerId(userId), ReadPreference.secondaryPreferred)
coll(_.distinctEasy[Study.Id, List]("_id", selectOwnerId(userId), ReadPreference.secondaryPreferred))
def recentByContributor(userId: User.ID, nb: Int) =
coll
.find(selectContributorId(userId), idNameProjection.some)
.sort($sort desc "updatedAt")
.cursor[Study.IdName](ReadPreference.secondaryPreferred)
.list(nb)
coll {
_.find(selectContributorId(userId), idNameProjection.some)
.sort($sort desc "updatedAt")
.cursor[Study.IdName](ReadPreference.secondaryPreferred)
.list(nb)
}
def isContributor(studyId: Study.Id, userId: User.ID) =
coll.exists($id(studyId) ++ $doc(s"members.$userId.role" -> "w"))
coll(_.exists($id(studyId) ++ $doc(s"members.$userId.role" -> "w")))
def isMember(studyId: Study.Id, userId: User.ID) =
coll.exists($id(studyId) ++ (s"members.$userId" $exists true))
coll(_.exists($id(studyId) ++ (s"members.$userId" $exists true)))
def like(studyId: Study.Id, userId: User.ID, v: Boolean): Fu[Study.Likes] =
countLikes(studyId).flatMap {
case None => fuccess(Study.Likes(0))
case Some((prevLikes, createdAt)) =>
val likes = Study.Likes(prevLikes.value + (if (v) 1 else -1))
coll.update.one(
$id(studyId),
$set(
F.likes -> likes,
F.rank -> Study.Rank.compute(likes, createdAt)
) ++ {
if (v) $addToSet(F.likers -> userId) else $pull(F.likers -> userId)
}
) inject likes
coll {
_.update.one(
$id(studyId),
$set(
F.likes -> likes,
F.rank -> Study.Rank.compute(likes, createdAt)
) ++ {
if (v) $addToSet(F.likers -> userId) else $pull(F.likers -> userId)
}
) inject likes
}
}
def liked(study: Study, user: User): Fu[Boolean] =
coll.exists($id(study.id) ++ selectLiker(user.id))
coll(_.exists($id(study.id) ++ selectLiker(user.id)))
def filterLiked(user: User, studyIds: Seq[Study.Id]): Fu[Set[Study.Id]] =
studyIds.nonEmpty ??
coll.primitive[Study.Id]($inIds(studyIds) ++ selectLiker(user.id), "_id").dmap(_.toSet)
coll(_.primitive[Study.Id]($inIds(studyIds) ++ selectLiker(user.id), "_id").dmap(_.toSet))
def resetAllRanks: Fu[Int] =
coll
.find(
coll {
_.find(
$empty,
$doc(F.likes -> true, F.createdAt -> true).some
)
.cursor[Bdoc]()
.foldWhileM(0) { (count, doc) =>
~(for {
id <- doc.getAsOpt[Study.Id]("_id")
likes <- doc.getAsOpt[Study.Likes](F.likes)
createdAt <- doc.getAsOpt[DateTime](F.createdAt)
} yield coll.update
.one(
$id(id),
$set(F.rank -> Study.Rank.compute(likes, createdAt))
)
.void) inject Cursor.Cont(count + 1)
}
.cursor[Bdoc]()
.foldWhileM(0) { (count, doc) =>
~(for {
id <- doc.getAsOpt[Study.Id]("_id")
likes <- doc.getAsOpt[Study.Likes](F.likes)
createdAt <- doc.getAsOpt[DateTime](F.createdAt)
} yield coll {
_.update
.one(
$id(id),
$set(F.rank -> Study.Rank.compute(likes, createdAt))
)
}.void) inject Cursor.Cont(count + 1)
}
}
private[study] def isAdminMember(study: Study, userId: User.ID): Fu[Boolean] =
coll.exists($id(study.id) ++ $doc(s"members.$userId.admin" -> true))
coll(_.exists($id(study.id) ++ $doc(s"members.$userId.admin" -> true)))
private def countLikes(studyId: Study.Id): Fu[Option[(Study.Likes, DateTime)]] =
coll
.aggregateWith[Bdoc]() { framework =>
coll {
_.aggregateWith[Bdoc]() { framework =>
import framework._
List(
Match($id(studyId)),
@ -238,13 +253,12 @@ final class StudyRepo(private[study] val coll: Coll)(implicit ec: scala.concurre
)
)
)
}
.headOption
.map { docOption =>
for {
doc <- docOption
likes <- doc.getAsOpt[Study.Likes](F.likes)
createdAt <- doc.getAsOpt[DateTime](F.createdAt)
} yield likes -> createdAt
}
}.headOption
}.map { docOption =>
for {
doc <- docOption
likes <- doc.getAsOpt[Study.Likes](F.likes)
createdAt <- doc.getAsOpt[DateTime](F.createdAt)
} yield likes -> createdAt
}
}

View File

@ -1,11 +1,12 @@
package lila.study
import play.api.libs.json._
import reactivemongo.api._
import reactivemongo.api.bson._
import scala.concurrent.duration._
import play.api.libs.json._
import lila.common.Future
import lila.db.AsyncColl
import lila.db.dsl._
import lila.user.User
@ -52,8 +53,8 @@ object StudyTopics {
}
}
final private class StudyTopicRepo(val coll: Coll)
final private class StudyUserTopicRepo(val coll: Coll)
final private class StudyTopicRepo(val coll: AsyncColl)
final private class StudyUserTopicRepo(val coll: AsyncColl)
final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTopicRepo, studyRepo: StudyRepo)(
implicit
@ -64,7 +65,7 @@ final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTop
import BSONHandlers.{ StudyTopicBSONHandler, StudyTopicsBSONHandler }
def byId(str: String): Fu[Option[StudyTopic]] =
topicRepo.coll.byId[Bdoc](str) dmap { _ flatMap docTopic }
topicRepo.coll(_.byId[Bdoc](str)) dmap { _ flatMap docTopic }
def findLike(str: String, myId: Option[User.ID], nb: Int = 10): Fu[StudyTopics] = {
(str.lengthIs >= 2) ?? {
@ -75,21 +76,21 @@ final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTop
}
}
favsFu flatMap { favs =>
topicRepo.coll
.find($doc("_id".$startsWith(str, "i")))
.sort($sort.naturalAsc)
.cursor[Bdoc](ReadPreference.secondaryPreferred)
.list(nb - favs.size)
.dmap {
_ flatMap docTopic
topicRepo
.coll {
_.find($doc("_id".$startsWith(str, "i")))
.sort($sort.naturalAsc)
.cursor[Bdoc](ReadPreference.secondaryPreferred)
.list(nb - favs.size)
}
.dmap { _ flatMap docTopic }
.dmap { favs ::: _ }
}
}
} dmap StudyTopics.apply
def userTopics(userId: User.ID): Fu[StudyTopics] =
userTopicRepo.coll.byId(userId).map {
userTopicRepo.coll(_.byId(userId)).dmap {
_.flatMap(_.getAsOpt[StudyTopics]("topics")) | StudyTopics.empty
}
@ -104,31 +105,34 @@ final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTop
case JsSuccess(topics, _) => StudyTopics fromStrs topics.map(_.value)
case _ => StudyTopics.empty
}
userTopicRepo.coll.update
.one(
userTopicRepo.coll {
_.update.one(
$id(user.id),
$set("topics" -> topics),
upsert = true
)
.void
}.void
}
def userTopicsAdd(userId: User.ID, topics: StudyTopics): Funit =
topics.value.nonEmpty ??
userTopicRepo.coll.update
.one(
$id(userId),
$addToSet("topics" -> $doc("$each" -> topics)),
upsert = true
)
.void
userTopicRepo.coll {
_.update
.one(
$id(userId),
$addToSet("topics" -> $doc("$each" -> topics)),
upsert = true
)
}.void
def popular(nb: Int): Fu[StudyTopics] =
topicRepo.coll
.find($empty)
.sort($sort.naturalAsc)
.cursor[Bdoc]()
.list(nb)
topicRepo
.coll {
_.find($empty)
.sort($sort.naturalAsc)
.cursor[Bdoc]()
.list(nb)
}
.dmap {
_ flatMap docTopic
}
@ -151,8 +155,8 @@ final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTop
}.unit
private def recomputeNow: Funit =
studyRepo.coll
.aggregateWith[Bdoc]() { framework =>
studyRepo.coll {
_.aggregateWith[Bdoc]() { framework =>
import framework._
List(
Match(
@ -165,9 +169,8 @@ final class StudyTopicApi(topicRepo: StudyTopicRepo, userTopicRepo: StudyUserTop
UnwindField("topics"),
SortByFieldCount("topics"),
Project($doc("_id" -> true)),
Out(topicRepo.coll.name)
Out(topicRepo.coll.name.value)
)
}
.headOption
.void
}.headOption
}.void
}

View File

@ -2,12 +2,12 @@ package lila.studySearch
import akka.actor._
import akka.stream.scaladsl._
import chess.format.pgn.Tag
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import play.api.libs.json._
import scala.concurrent.duration._
import chess.format.pgn.Tag
import lila.hub.LateMultiThrottler
import lila.search._
import lila.study.{ Chapter, ChapterRepo, RootOrNode, Study, StudyRepo }
@ -122,12 +122,15 @@ final class StudySearchApi(
logger.info(s"Index to ${c.index.name} since $since")
val retryLogger = logger.branch("index")
import lila.db.dsl._
studyRepo
.sortedCursor(
$doc("createdAt" $gte since),
sort = $sort asc "createdAt"
)
.documentSource()
Source
.futureSource {
studyRepo
.sortedCursor(
$doc("createdAt" $gte since),
sort = $sort asc "createdAt"
)
.map(_.documentSource())
}
.via(lila.common.LilaStream.logRate[Study]("study index")(logger))
.mapAsyncUnordered(8) { study =>
lila.common.Future.retry(() => doStore(study), 5 seconds, 10, retryLogger.some)