Update DB module

pull/7020/head
Cédric Chantepie 2020-07-19 15:11:06 +02:00
parent c02a0e8eda
commit 8fe45e5698
7 changed files with 83 additions and 46 deletions

View File

@ -3,7 +3,7 @@ package lila.db
import org.joda.time.DateTime
import ornicar.scalalib.Zero
import reactivemongo.api.bson._
import reactivemongo.api.bson.compat._
import scala.util.{ Success, Try }
import dsl._

View File

@ -5,7 +5,7 @@ import scala.annotation.nowarn
import reactivemongo.api._
import reactivemongo.api.bson._
import reactivemongo.api.commands.{ WriteConcern => CWC }
import reactivemongo.api.{ WriteConcern => CWC }
trait CollExt { self: dsl with QueryBuilderExt =>
@ -16,12 +16,12 @@ trait CollExt { self: dsl with QueryBuilderExt =>
def ext = this
def find(selector: Bdoc) = coll.find(selector, none)
def find(selector: Bdoc) = coll.find(selector, none[Bdoc])
def find(selector: Bdoc, proj: Bdoc) = coll.find(selector, proj.some)
def one[D: BSONDocumentReader](selector: Bdoc): Fu[Option[D]] =
coll.find(selector, none).one[D]
coll.find(selector, none[Bdoc]).one[D]
def one[D: BSONDocumentReader](selector: Bdoc, projection: Bdoc): Fu[Option[D]] =
coll.find(selector, projection.some).one[D]
@ -30,15 +30,17 @@ trait CollExt { self: dsl with QueryBuilderExt =>
selector: Bdoc,
readPreference: ReadPreference = ReadPreference.primary
): Fu[List[D]] =
coll.find(selector, none).list[D](Int.MaxValue, readPreference = readPreference)
coll.find(selector, none[Bdoc]).
cursor[D](readPreference).list(Int.MaxValue)
def list[D: BSONDocumentReader](selector: Bdoc, limit: Int): Fu[List[D]] =
coll.find(selector, none).list[D](limit = limit)
coll.find(selector, none[Bdoc]).cursor[D]().list(limit = limit)
def byId[D: BSONDocumentReader, I: BSONWriter](id: I): Fu[Option[D]] =
one[D]($id(id))
def byId[D: BSONDocumentReader](id: String): Fu[Option[D]] = one[D]($id(id))
def byId[D: BSONDocumentReader](id: String): Fu[Option[D]] = one[D]($id(id))
def byId[D: BSONDocumentReader](id: String, projection: Bdoc): Fu[Option[D]] = one[D]($id(id), projection)
def byIds[D: BSONDocumentReader, I: BSONWriter](
@ -103,8 +105,8 @@ trait CollExt { self: dsl with QueryBuilderExt =>
.fold(find($inIds(ids))) { proj =>
find($inIds(ids), proj)
}
.cursor[D](readPreference = readPreference)
.collect[List](Int.MaxValue, err = Cursor.FailOnError[List[D]]())
.cursor[D](readPreference)
.collect[List](Int.MaxValue)
.map {
_.view.map(u => docId(u) -> u).toMap
}
@ -119,7 +121,8 @@ trait CollExt { self: dsl with QueryBuilderExt =>
def primitive[V: BSONReader](selector: Bdoc, field: String): Fu[List[V]] =
find(selector, $doc(field -> true))
.list[Bdoc]()
.cursor[Bdoc]()
.list()
.dmap {
_ flatMap { _.getAsOpt[V](field) }
}
@ -127,7 +130,8 @@ trait CollExt { self: dsl with QueryBuilderExt =>
def primitive[V: BSONReader](selector: Bdoc, sort: Bdoc, field: String): Fu[List[V]] =
find(selector, $doc(field -> true))
.sort(sort)
.list[Bdoc]()
.cursor[Bdoc]()
.list()
.dmap {
_ flatMap { _.getAsOpt[V](field) }
}
@ -135,7 +139,8 @@ trait CollExt { self: dsl with QueryBuilderExt =>
def primitive[V: BSONReader](selector: Bdoc, sort: Bdoc, nb: Int, field: String): Fu[List[V]] =
(nb > 0) ?? find(selector, $doc(field -> true))
.sort(sort)
.list[Bdoc](nb)
.cursor[Bdoc]()
.list(nb)
.dmap {
_ flatMap { _.getAsOpt[V](field) }
}
@ -161,7 +166,8 @@ trait CollExt { self: dsl with QueryBuilderExt =>
fieldExtractor: Bdoc => Option[V]
): Fu[Map[I, V]] =
find($inIds(ids), $doc(field -> true))
.list[Bdoc]()
.cursor[Bdoc]()
.list()
.dmap {
_ flatMap { obj =>
obj.getAsOpt[I]("_id") flatMap { id =>
@ -203,8 +209,11 @@ trait CollExt { self: dsl with QueryBuilderExt =>
.aggregateWith[Bdoc](
allowDiskUse = allowDiskUse,
readPreference = readPreference
)(f)
.collect[List](maxDocs = maxDocs, Cursor.FailOnError[List[Bdoc]]())
)(agg => {
val nonEmpty = f(agg)
nonEmpty._1 +: nonEmpty._2
})
.collect[List](maxDocs = maxDocs)
def aggregateOne(
readPreference: ReadPreference = ReadPreference.primary,
@ -216,9 +225,12 @@ trait CollExt { self: dsl with QueryBuilderExt =>
.aggregateWith[Bdoc](
allowDiskUse = allowDiskUse,
readPreference = readPreference
)(f)
.collect[List](maxDocs = 1, Cursor.FailOnError[List[Bdoc]]())
.dmap(_.headOption)
)(agg => {
val nonEmpty = f(agg)
nonEmpty._1 +: nonEmpty._2
})
.collect[List](maxDocs = 1)
.dmap(_.headOption) // .one[Bdoc] ?
def aggregateExists(
readPreference: ReadPreference = ReadPreference.primary,
@ -230,7 +242,10 @@ trait CollExt { self: dsl with QueryBuilderExt =>
.aggregateWith[Bdoc](
allowDiskUse = allowDiskUse,
readPreference = readPreference
)(f)
)(agg => {
val nonEmpty = f(agg)
nonEmpty._1 +: nonEmpty._2
})
.headOption
.dmap(_.isDefined)

View File

@ -1,8 +1,7 @@
package lila.db
import reactivemongo.api._
import reactivemongo.api.commands.Command
import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.concurrent.Future
@ -21,7 +20,7 @@ final class AsyncDb(
driver.connect(parsedUri, name.some).dmap(_ -> parsedUri.db)
}
private def db: Future[DefaultDB] =
private def db: Future[DB] =
connection flatMap {
case (conn, dbName) => conn database dbName.getOrElse("lichess")
}
@ -37,7 +36,7 @@ final class Db(
private val logger = lila.db.logger branch name
private lazy val db: DefaultDB = Chronometer.syncEffect(
private lazy val db: DB = Chronometer.syncEffect(
MongoConnection
.fromString(uri)
.flatMap { parsedUri =>
@ -52,9 +51,8 @@ final class Db(
def apply(name: CollName): Coll = db(name.value)
val runCommand = new RunCommand((command, readPreference) => {
val pack = reactivemongo.api.bson.collection.BSONSerializationPack
@nowarn val runner = Command.run(pack, FailoverStrategy.strict)
runner(db, runner.rawCommand(command)).one[dsl.Bdoc](readPreference)
val runCommand = new RunCommand({ (command, readPreference) =>
db.runCommand(command, FailoverStrategy.strict)
.one[dsl.Bdoc](readPreference)
})
}

View File

@ -38,10 +38,11 @@ final class Adapter[A: BSONDocumentReader](
.pipe { query =>
hint match {
case None => query
case Some(h) => query hint h
case Some(h) => query.hint(collection hint h)
}
}
.list[A](length, readPreference)
.cursor[A](readPreference)
.list(length)
}
/*
@ -68,7 +69,8 @@ final class MapReduceAdapter[A: BSONDocumentReader](
.find(selector, $id(true).some)
.sort(sort)
.skip(offset)
.list[Bdoc](length, readPreference)
.cursor[Bdoc](readPreference)
.list(length)
.dmap { _ flatMap { _.getAsOpt[BSONString]("_id") } }
.flatMap { ids =>
runCommand(

View File

@ -1,40 +1,62 @@
package lila.db
import scala.annotation.nowarn
import reactivemongo.api._
import scala.collection.Factory
import scala.concurrent.ExecutionContext
import reactivemongo.api._ /*,
bson.BSONDocumentReader,
bson.collection.BSONSerializationPack,
collections.QueryBuilderFactory*/
trait QueryBuilderExt { self: dsl =>
implicit final class ExtendCursor[A](cursor: Cursor.WithOps[A])(implicit ec: ExecutionContext) { // CursorProducer?
implicit final class ExtendQueryBuilder[P <: SerializationPack](
@nowarn b: collections.GenericQueryBuilder[P]
)(implicit ec: scala.concurrent.ExecutionContext) {
def gather[M[_]](upTo: Int)(implicit factory: Factory[A, M[A]]): Fu[M[A]] =
cursor.collect[M](upTo, Cursor.ContOnError[M[A]]())
def list(): Fu[List[A]] =
gather[List](Int.MaxValue)
def list(limit: Int): Fu[List[A]] =
gather[List](limit)
def list(limit: Option[Int]): Fu[List[A]] =
gather[List](limit | Int.MaxValue)
def vector(limit: Int): Fu[Vector[A]] =
gather[Vector](limit)
}
/*
implicit final class ExtendQueryBuilder[B <: QueryBuilderFactory[BSONSerializationPack.type]#QueryBuilder](b: B)(implicit ec: ExecutionContext) {
// like collect, but with stopOnError defaulting to false
def gather[A, M[_]](upTo: Int, readPreference: ReadPreference = ReadPreference.primary)(implicit
factory: Factory[A, M[A]],
reader: b.pack.Reader[A],
reader: BSONDocumentReader[A],
cp: CursorProducer[A]
): Fu[M[A]] =
b.cursor[A](readPreference = readPreference)(reader, cp)
.collect[M](upTo, Cursor.ContOnError[M[A]]())
def list[A: b.pack.Reader](): Fu[List[A]] =
def list[A: BSONDocumentReader](): Fu[List[A]] =
gather[A, List](Int.MaxValue)
def list[A: b.pack.Reader](limit: Int): Fu[List[A]] =
def list[A: BSONDocumentReader](limit: Int): Fu[List[A]] =
gather[A, List](limit)
def list[A: b.pack.Reader](limit: Option[Int]): Fu[List[A]] =
def list[A: BSONDocumentReader](limit: Option[Int]): Fu[List[A]] =
gather[A, List](limit | Int.MaxValue)
def list[A: b.pack.Reader](limit: Option[Int], readPreference: ReadPreference): Fu[List[A]] =
def list[A: BSONDocumentReader](limit: Option[Int], readPreference: ReadPreference): Fu[List[A]] =
gather[A, List](limit | Int.MaxValue, readPreference)
def list[A: b.pack.Reader](limit: Int, readPreference: ReadPreference): Fu[List[A]] =
def list[A: BSONDocumentReader](limit: Int, readPreference: ReadPreference): Fu[List[A]] =
gather[A, List](limit, readPreference)
def vector[A: b.pack.Reader](limit: Int, readPreference: ReadPreference): Fu[Vector[A]] =
def vector[A: BSONDocumentReader](limit: Int, readPreference: ReadPreference): Fu[Vector[A]] =
gather[A, Vector](limit, readPreference)
}
*/
}

View File

@ -17,7 +17,7 @@
package lila.db
import ornicar.scalalib.Zero
import reactivemongo.api._
import reactivemongo.api.bson._
trait dsl {

View File

@ -46,9 +46,9 @@ object Dependencies {
}
object reactivemongo {
val versionFix = "0.20.12-fix1"
val version = "0.20.12"
val driver = "org.reactivemongo" %% "reactivemongo" % versionFix
val version = "1.0.0-rc.2"
val driver = "org.reactivemongo" %% "reactivemongo" % version
val stream = "org.reactivemongo" %% "reactivemongo-akkastream" % version
val epoll = "org.reactivemongo" % "reactivemongo-shaded-native" % s"$version-linux-x86-64"
def bundle = Seq(driver, stream)