implement CollExt.distinctWithReadPreference

pull/2871/head
Thibault Duplessis 2017-03-30 12:25:04 +02:00
parent deb8f9940f
commit 9c66d78ea5
1 changed files with 23 additions and 1 deletions

View File

@ -1,6 +1,8 @@
package lila.db
import scala.collection.breakOut
import scala.collection.generic.CanBuildFrom
import scala.util.{ Success, Failure }
import reactivemongo.api._
import reactivemongo.api.collections.bson.BSONBatchCommands._
@ -122,6 +124,9 @@ trait CollExt { self: dsl with QueryBuilderExt =>
}
}
// sadly we can't access the connection metadata
private val mongoWireVersion = MongoWireVersion.V32
// because mongodb collection.aggregate doesn't have the readPreference argument!
def aggregateWithReadPreference(
firstOperator: AggregationFramework.PipelineOperator,
@ -133,10 +138,27 @@ trait CollExt { self: dsl with QueryBuilderExt =>
firstOperator :: otherOperators,
allowDiskUse = false,
cursor = None,
wireVersion = MongoWireVersion.V32, // sadly we can't access the connection metadata
wireVersion = mongoWireVersion,
bypassDocumentValidation = false,
readConcern = None
), readPreference).map(_.value)
}
def distinctWithReadPreference[T, M[_] <: Iterable[_]](
key: String,
selector: Option[Bdoc],
readPreference: ReadPreference
)(implicit reader: BSONValueReader[T], cbf: CanBuildFrom[M[_], T, M[T]]): Fu[M[T]] = {
implicit val widenReader = pack.widenReader(reader)
coll.runCommand(DistinctCommand.Distinct(
key, selector, ReadConcern.Local, mongoWireVersion
), readPreference).flatMap {
_.result[T, M] match {
case Failure(cause) => scala.concurrent.Future.failed[M[T]](cause)
case Success(result) => fuccess(result)
}
}
}
}
}