migrate insight module

rm0193-mapreduce
Thibault Duplessis 2019-12-03 13:15:15 -06:00
parent 85dbd38f97
commit 1d1ca6536e
16 changed files with 361 additions and 379 deletions

View File

@ -216,7 +216,7 @@ lazy val insight = module(
"insight",
Seq(common, game, user, analyse, relation, pref, socket, round, security)
).settings(
libraryDependencies ++= provided(play.api, scalatags) ++ reactivemongo.bundle
libraryDependencies ++= provided(play.api, play.joda, scalatags) ++ reactivemongo.bundle
)
lazy val tournament = module("tournament", Seq(

View File

@ -492,10 +492,6 @@ insight {
uri = "mongodb://127.0.0.1:27017/lichess-insight"
mongo-async-driver = {}
}
collection {
entry = insight
user_cache = insight_user_cache
}
}
notify {
collection.notify = notify

View File

@ -14,11 +14,11 @@ object AggregationClusters {
private def single[X](question: Question[X], aggDocs: List[Bdoc]): List[Cluster[X]] =
aggDocs flatMap { doc =>
for {
x <- doc.getAs[X]("_id")(question.dimension.bson)
value <- doc.getAs[BSONNumberLike]("v")
nb <- doc.getAs[Int]("nb")
ids <- doc.getAs[List[String]]("ids")
} yield Cluster(x, Insight.Single(Point(value.toDouble)), nb, ids)
x <- doc.getAsOpt[X]("_id")(question.dimension.bson)
value <- doc.double("v")
nb <- doc.int("nb")
ids <- doc.getAsOpt[List[String]]("ids")
} yield Cluster(x, Insight.Single(Point(value)), nb, ids)
}
private case class StackEntry(metric: BSONValue, v: BSONNumberLike)
@ -29,18 +29,18 @@ object AggregationClusters {
val metricValues = Metric valuesOf question.metric
// println(lila.db.BSON debug doc)
for {
x <- doc.getAs[X]("_id")(question.dimension.bson)
stack <- doc.getAs[List[StackEntry]]("stack")
x <- doc.getAsOpt[X]("_id")(question.dimension.bson)
stack <- doc.getAsOpt[List[StackEntry]]("stack")
points = metricValues.map {
case Metric.MetricValue(id, name) =>
name -> Point(stack.find(_.metric == id).??(_.v.toDouble))
name -> Point(stack.find(_.metric == id).??(_.v.toDouble.get))
}
total = stack.map(_.v.toInt).sum
total = stack.map(_.v.toInt.get).sum
percents = if (total == 0) points
else points.map {
case (n, p) => n -> Point(100 * p.y / total)
}
ids <- doc.getAs[List[String]]("ids")
ids <- doc.getAsOpt[List[String]]("ids")
} yield Cluster(x, Insight.Stacked(percents), total, ids)
}

View File

@ -1,17 +1,39 @@
package lila.insight
import reactivemongo.api.bson._
import reactivemongo.api.bson.collection.BSONSerializationPack
import scalaz.{ NonEmptyList, IList }
import lila.db.dsl._
import lila.user.User
private final class AggregationPipeline {
private final class AggregationPipeline(store: Storage) {
def aggregate[X](question: Question[X], user: User): Fu[List[Bdoc]] =
store.coll.aggregateList(
maxDocs = Int.MaxValue,
allowDiskUse = true
) { implicit framework =>
import framework._
import question.{ dimension, metric, filters }
import lila.insight.{ Dimension => D, Metric => M }
import Storage._
import Entry.{ BSONFields => F }
import Storage._
private lazy val movetimeIdDispatcher =
val sampleGames = Sample(10_1000)
val sampleMoves = Sample(200_1000).some
val unwindMoves = UnwindField(F.moves).some
val sortNb = Sort(Descending("nb")).some
def limit(nb: Int) = Limit(nb).some
val regroupStacked = GroupField("_id.dimension")(
"nb" -> SumField("v"),
"ids" -> FirstField("ids"),
"stack" -> Push($doc("metric" -> "$_id.metric", "v" -> "$v"))
)
lazy val movetimeIdDispatcher =
MovetimeRange.reversedNoInf.foldLeft[BSONValue](BSONInteger(MovetimeRange.MTRInf.id)) {
case (acc, mtr) => $doc(
"$cond" -> $arr(
@ -21,7 +43,7 @@ private final class AggregationPipeline {
)
)
}
private lazy val materialIdDispatcher = $doc(
lazy val materialIdDispatcher = $doc(
"$cond" -> $arr(
$doc("$eq" -> $arr("$" + F.moves("i"), 0)),
MaterialRange.Equal.id,
@ -36,57 +58,44 @@ private final class AggregationPipeline {
}
)
)
private def dimensionGroupId(dim: Dimension[_]): BSONValue = dim match {
def dimensionGroupId(dim: Dimension[_]): BSONValue = dim match {
case Dimension.MovetimeRange => movetimeIdDispatcher
case Dimension.MaterialRange => materialIdDispatcher
case d => BSONString("$" + d.dbKey)
}
private sealed trait Grouping
private object Grouping {
sealed trait Grouping
object Grouping {
object Group extends Grouping
case class BucketAuto(buckets: Int, granularity: Option[String] = None) extends Grouping
}
private def dimensionGrouping(dim: Dimension[_]): Grouping = dim match {
def dimensionGrouping(dim: Dimension[_]): Grouping = dim match {
case D.Date => Grouping.BucketAuto(buckets = 12)
case _ => Grouping.Group
}
private val sampleGames = Sample(10 * 1000)
// private val sortDate = Sort(Descending(F.date))
private val sampleMoves = Sample(200 * 1000).some
private val unwindMoves = UnwindField(F.moves).some
private val sortNb = Sort(Descending("nb")).some
private def limit(nb: Int) = Limit(nb).some
val gameIdsSlice = $doc("ids" -> $doc("$slice" -> $arr("$ids", 4)))
val includeSomeGameIds = AddFields(gameIdsSlice)
val toPercent = $doc("v" -> $doc("$multiply" -> $arr(100, $doc("$avg" -> "$v"))))
private val regroupStacked = GroupField("_id.dimension")(
"nb" -> SumField("v"),
"ids" -> FirstField("ids"),
"stack" -> Push($doc("metric" -> "$_id.metric", "v" -> "$v"))
)
private val gameIdsSlice = $doc("ids" -> $doc("$slice" -> $arr("$ids", 4)))
private val includeSomeGameIds = AddFields(gameIdsSlice)
private val toPercent = $doc("v" -> $doc("$multiply" -> $arr(100, $doc("$avg" -> "$v"))))
private def group(d: Dimension[_], f: GroupFunction): List[Option[PipelineOperator]] =
def group(d: Dimension[_], f: GroupFunction): List[Option[PipelineOperator]] =
List(dimensionGrouping(d) match {
case Grouping.Group => Group(dimensionGroupId(d))(
"v" -> f,
"nb" -> SumValue(1),
"nb" -> SumAll,
"ids" -> AddFieldToSet("_id")
)
case Grouping.BucketAuto(buckets, granularity) => BucketAuto(dimensionGroupId(d), buckets, granularity)(
"v" -> f,
"nb" -> SumValue(1),
"nb" -> SumAll,
"ids" -> AddFieldToSet("_id") // AddFieldToSet crashes mongodb 3.4.1 server
)
}) map { Option(_) }
private def groupMulti(d: Dimension[_], metricDbKey: String): List[Option[PipelineOperator]] =
def groupMulti(d: Dimension[_], metricDbKey: String): List[Option[PipelineOperator]] =
(dimensionGrouping(d) match {
case Grouping.Group => List[PipelineOperator](
Group($doc("dimension" -> dimensionGroupId(d), "metric" -> s"$$$metricDbKey"))(
"v" -> SumValue(1),
"v" -> SumAll,
"ids" -> AddFieldToSet("_id")
),
regroupStacked,
@ -101,7 +110,7 @@ private final class AggregationPipeline {
),
UnwindField("doc"),
Group($doc("dimension" -> "$_id", "metric" -> "$doc.metric"))(
"v" -> SumValue(1),
"v" -> SumAll,
"ids" -> AddFieldToSet("doc.id")
),
regroupStacked,
@ -110,114 +119,109 @@ private final class AggregationPipeline {
)
}) map { Option(_) }
def apply(question: Question[_], userId: String): NonEmptyList[PipelineOperator] = {
import question.{ dimension, metric, filters }
val gameMatcher = combineDocs(question.filters.collect {
case f if f.dimension.isInGame => f.matcher
})
def matchMoves(extraMatcher: Bdoc = $empty) =
combineDocs(extraMatcher :: question.filters.collect {
case f if f.dimension.isInMove => f.matcher
}).some.filterNot(_.isEmpty) map Match
def projectForMove = Project(BSONDocument({
metric.dbKey :: dimension.dbKey :: filters.collect {
case Filter(d, _) if d.isInMove => d.dbKey
}
}.distinct.map(_ -> BSONBoolean(true)))).some
val gameMatcher = combineDocs(question.filters.collect {
case f if f.dimension.isInGame => f.matcher
})
def matchMoves(extraMatcher: Bdoc = $empty) =
combineDocs(extraMatcher :: question.filters.collect {
case f if f.dimension.isInMove => f.matcher
}).some.filterNot(_.isEmpty) map Match
def projectForMove = Project(BSONDocument({
metric.dbKey :: dimension.dbKey :: filters.collect {
case lila.insight.Filter(d, _) if d.isInMove => d.dbKey
}
}.distinct.map(_ -> BSONBoolean(true)))).some
NonEmptyList.nel[PipelineOperator](
Match(
selectUserId(userId) ++
gameMatcher ++
(dimension == Dimension.Opening).??($doc(F.eco $exists true)) ++
Metric.requiresAnalysis(metric).??($doc(F.analysed -> true)) ++
(Metric.requiresStableRating(metric) || Dimension.requiresStableRating(dimension)).?? {
$doc(F.provisional $ne true)
}
),
/* sortDate :: */ IList.fromList {
sampleGames :: ((metric match {
case M.MeanCpl => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
group(dimension, AvgField(F.moves("c"))) :::
List(includeSomeGameIds.some)
case M.Material => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
group(dimension, AvgField(F.moves("i"))) :::
List(includeSomeGameIds.some)
case M.Opportunism => List(
projectForMove,
unwindMoves,
matchMoves($doc(F.moves("o") -> $doc("$exists" -> true))),
sampleMoves
) :::
group(dimension, GroupFunction("$push", $doc("$cond" -> $arr("$" + F.moves("o"), 1, 0)))) :::
List(AddFields(gameIdsSlice ++ toPercent).some)
case M.Luck => List(
projectForMove,
unwindMoves,
matchMoves($doc(F.moves("l") $exists true)),
sampleMoves
) :::
group(dimension, GroupFunction("$push", $doc("$cond" -> $arr("$" + F.moves("l"), 1, 0)))) :::
List(AddFields(gameIdsSlice ++ toPercent).some)
case M.NbMoves => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
group(dimension, SumValue(1)) :::
List(
Project($doc(
"v" -> true,
"ids" -> true,
"nb" -> $doc("$size" -> "$ids")
)).some,
AddFields(
$doc("v" -> $doc("$divide" -> $arr("$v", "$nb"))) ++
gameIdsSlice
).some
)
case M.Movetime => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
group(dimension, GroupFunction(
"$avg",
$doc("$divide" -> $arr("$" + F.moves("t"), 10))
)) :::
List(includeSomeGameIds.some)
case M.RatingDiff =>
group(dimension, AvgField(F.ratingDiff)) ::: List(includeSomeGameIds.some)
case M.OpponentRating =>
group(dimension, AvgField(F.opponentRating)) ::: List(includeSomeGameIds.some)
case M.Result =>
groupMulti(dimension, F.result)
case M.Termination =>
groupMulti(dimension, F.termination)
case M.PieceRole => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
groupMulti(dimension, F.moves("r"))
}) ::: (dimension match {
case D.Opening => List(sortNb, limit(12))
case _ => Nil
})).flatten
}
)
Match(
selectUserId(user.id) ++
gameMatcher ++
(dimension == Dimension.Opening).??($doc(F.eco $exists true)) ++
Metric.requiresAnalysis(metric).??($doc(F.analysed -> true)) ++
(Metric.requiresStableRating(metric) || Dimension.requiresStableRating(dimension)).?? {
$doc(F.provisional $ne true)
}
) -> /* sortDate :: */ {
sampleGames :: ((metric match {
case M.MeanCpl => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
group(dimension, AvgField(F.moves("c"))) :::
List(includeSomeGameIds.some)
case M.Material => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
group(dimension, AvgField(F.moves("i"))) :::
List(includeSomeGameIds.some)
case M.Opportunism => List(
projectForMove,
unwindMoves,
matchMoves($doc(F.moves("o") -> $doc("$exists" -> true))),
sampleMoves
) :::
group(dimension, GroupFunction("$push", $doc("$cond" -> $arr("$" + F.moves("o"), 1, 0)))) :::
List(AddFields(gameIdsSlice ++ toPercent).some)
case M.Luck => List(
projectForMove,
unwindMoves,
matchMoves($doc(F.moves("l") $exists true)),
sampleMoves
) :::
group(dimension, GroupFunction("$push", $doc("$cond" -> $arr("$" + F.moves("l"), 1, 0)))) :::
List(AddFields(gameIdsSlice ++ toPercent).some)
case M.NbMoves => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
group(dimension, SumAll) :::
List(
Project($doc(
"v" -> true,
"ids" -> true,
"nb" -> $doc("$size" -> "$ids")
)).some,
AddFields(
$doc("v" -> $doc("$divide" -> $arr("$v", "$nb"))) ++
gameIdsSlice
).some
)
case M.Movetime => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
group(dimension, GroupFunction(
"$avg",
$doc("$divide" -> $arr("$" + F.moves("t"), 10))
)) :::
List(includeSomeGameIds.some)
case M.RatingDiff =>
group(dimension, AvgField(F.ratingDiff)) ::: List(includeSomeGameIds.some)
case M.OpponentRating =>
group(dimension, AvgField(F.opponentRating)) ::: List(includeSomeGameIds.some)
case M.Result =>
groupMulti(dimension, F.result)
case M.Termination =>
groupMulti(dimension, F.termination)
case M.PieceRole => List(
projectForMove,
unwindMoves,
matchMoves(),
sampleMoves
) :::
groupMulti(dimension, F.moves("r"))
}) ::: (dimension match {
case D.Opening => List(sortNb, limit(12))
case _ => Nil
})).flatten
}
}
}

View File

@ -6,55 +6,50 @@ import chess.opening.{ Ecopening, EcopeningDB }
import chess.{ Role, Color }
import lila.db.BSON
import lila.db.dsl._
import lila.rating.PerfType
import lila.rating.BSONHandlers.perfTypeIdHandler
import lila.rating.PerfType
private object BSONHandlers {
implicit val ColorBSONHandler = new BSONHandler[BSONBoolean, Color] {
def read(b: BSONBoolean) = Color(b.value)
def write(c: Color) = BSONBoolean(c.white)
}
implicit val EcopeningBSONHandler = new BSONHandler[BSONString, Ecopening] {
def read(b: BSONString) = EcopeningDB.allByEco get b.value err s"Invalid ECO ${b.value}"
def write(e: Ecopening) = BSONString(e.eco)
}
implicit val RelativeStrengthBSONHandler = new BSONHandler[BSONInteger, RelativeStrength] {
def read(b: BSONInteger) = RelativeStrength.byId get b.value err s"Invalid relative strength ${b.value}"
def write(e: RelativeStrength) = BSONInteger(e.id)
}
implicit val ResultBSONHandler = new BSONHandler[BSONInteger, Result] {
def read(b: BSONInteger) = Result.byId get b.value err s"Invalid result ${b.value}"
def write(e: Result) = BSONInteger(e.id)
}
implicit val PhaseBSONHandler = new BSONHandler[BSONInteger, Phase] {
def read(b: BSONInteger) = Phase.byId get b.value err s"Invalid phase ${b.value}"
def write(e: Phase) = BSONInteger(e.id)
}
implicit val RoleBSONHandler = new BSONHandler[BSONString, Role] {
def read(b: BSONString) = Role.allByForsyth get b.value.head err s"Invalid role ${b.value}"
def write(e: Role) = BSONString(e.forsyth.toString)
}
implicit val TerminationBSONHandler = new BSONHandler[BSONInteger, Termination] {
def read(b: BSONInteger) = Termination.byId get b.value err s"Invalid termination ${b.value}"
def write(e: Termination) = BSONInteger(e.id)
}
implicit val MovetimeRangeBSONHandler = new BSONHandler[BSONInteger, MovetimeRange] {
def read(b: BSONInteger) = MovetimeRange.byId get b.value err s"Invalid movetime range ${b.value}"
def write(e: MovetimeRange) = BSONInteger(e.id)
}
implicit val CastlingBSONHandler = new BSONHandler[BSONInteger, Castling] {
def read(b: BSONInteger) = Castling.byId get b.value err s"Invalid Castling ${b.value}"
def write(e: Castling) = BSONInteger(e.id)
}
implicit val QueenTradeBSONHandler = new BSONHandler[BSONBoolean, QueenTrade] {
def read(b: BSONBoolean) = QueenTrade(b.value)
def write(e: QueenTrade) = BSONBoolean(e.id)
}
implicit val MaterialRangeBSONHandler = new BSONHandler[BSONInteger, MaterialRange] {
def read(b: BSONInteger) = MaterialRange.byId get b.value err s"Invalid material range ${b.value}"
def write(e: MaterialRange) = BSONInteger(e.id)
}
implicit val ColorBSONHandler = BSONBooleanHandler.as[Color](Color.apply, _.white)
implicit val EcopeningBSONHandler = tryHandler[Ecopening](
{ case BSONString(v) => EcopeningDB.allByEco get v toTry s"Invalid ECO $v" },
e => BSONString(e.eco)
)
implicit val RelativeStrengthBSONHandler = tryHandler[RelativeStrength](
{ case BSONInteger(v) => RelativeStrength.byId get v toTry s"Invalid relative strength $v" },
e => BSONInteger(e.id)
)
implicit val ResultBSONHandler = tryHandler[Result](
{ case BSONInteger(v) => Result.byId get v toTry s"Invalid result $v" },
e => BSONInteger(e.id)
)
implicit val PhaseBSONHandler = tryHandler[Phase](
{ case BSONInteger(v) => Phase.byId get v toTry s"Invalid phase $v" },
e => BSONInteger(e.id)
)
implicit val RoleBSONHandler = tryHandler[Role](
{ case BSONString(v) => Role.allByForsyth get v.head toTry s"Invalid role $v" },
e => BSONString(e.forsyth.toString)
)
implicit val TerminationBSONHandler = tryHandler[Termination](
{ case BSONInteger(v) => Termination.byId get v toTry s"Invalid termination $v" },
e => BSONInteger(e.id)
)
implicit val MovetimeRangeBSONHandler = tryHandler[MovetimeRange](
{ case BSONInteger(v) => MovetimeRange.byId get v toTry s"Invalid movetime range $v" },
e => BSONInteger(e.id)
)
implicit val CastlingBSONHandler = tryHandler[Castling](
{ case BSONInteger(v) => Castling.byId get v toTry s"Invalid Castling $v" },
e => BSONInteger(e.id)
)
implicit val MaterialRangeBSONHandler = tryHandler[MaterialRange](
{ case BSONInteger(v) => MaterialRange.byId get v toTry s"Invalid material range $v" },
e => BSONInteger(e.id)
)
implicit val QueenTradeBSONHandler = BSONBooleanHandler.as[QueenTrade](QueenTrade.apply, _.id)
implicit val DateRangeBSONHandler = Macros.handler[lila.insight.DateRange]

View File

@ -193,7 +193,7 @@ object Dimension {
case Dimension.Period => selected.sortBy(-_.days).headOption.fold($empty) { period =>
$doc(d.dbKey $gt period.min)
}
case _ => selected map d.bson.write match {
case _ => selected flatMap d.bson.writeOpt match {
case Nil => $empty
case List(x) => $doc(d.dbKey -> x)
case xs => $doc(d.dbKey -> $doc("$in" -> BSONArray(xs)))

View File

@ -132,7 +132,7 @@ object Castling {
object None extends Castling(3, "No castling")
val all = List(Kingside, Queenside, None)
val byId = all map { p => (p.id, p) } toMap
def fromMoves(moves: Traversable[String]) = moves.find(_ startsWith "O") match {
def fromMoves(moves: Iterable[String]) = moves.find(_ startsWith "O") match {
case Some("O-O") => Kingside
case Some("O-O-O") => Queenside
case _ => None

View File

@ -1,62 +1,49 @@
package lila.insight
import akka.actor._
import com.typesafe.config.Config
import com.softwaremill.macwire._
import io.methvin.play.autoconfig._
import play.api.Configuration
import scala.concurrent.duration.FiniteDuration
import lila.common.config._
import lila.db.DbConfig
import lila.db.Env.configLoader
final class Env(
config: Config,
getPref: String => Fu[lila.pref.Pref],
areFriends: (String, String) => Fu[Boolean],
system: ActorSystem,
lifecycle: play.api.inject.ApplicationLifecycle
) {
appConfig: Configuration,
gameRepo: lila.game.GameRepo,
analysisRepo: lila.analyse.AnalysisRepo,
prefApi: lila.pref.PrefApi,
relationApi: lila.relation.RelationApi
)(implicit system: akka.actor.ActorSystem) {
private val settings = new {
val CollectionEntry = config getString "collection.entry"
val CollectionUserCache = config getString "collection.user_cache"
}
import settings._
private val db = new lila.db.Env("insight", config getConfig "mongodb", lifecycle)
lazy val share = new Share(getPref, areFriends)
lazy val jsonView = new JsonView
private lazy val storage = new Storage(coll = db(CollectionEntry))
private lazy val aggregationPipeline = new AggregationPipeline
private lazy val indexer = new Indexer(
storage = storage,
sequencer = new lila.hub.FutureSequencer(
system = system,
executionTimeout = None,
logger = logger
)
private lazy val db = new lila.db.Env(
"insight",
appConfig.get[DbConfig]("insight.mongodb")(AutoConfig.loader)
)
private lazy val userCacheApi = new UserCacheApi(coll = db(CollectionUserCache))
lazy val share = wire[Share]
lazy val api = new InsightApi(
storage = storage,
userCacheApi = userCacheApi,
pipeline = aggregationPipeline,
indexer = indexer
lazy val jsonView = wire[JsonView]
private lazy val storage = new Storage(db(CollName("insight")))
private lazy val aggregationPipeline = wire[AggregationPipeline]
private def sequencer = new lila.hub.FutureSequencer(
executionTimeout = None,
logger = logger
)
private lazy val povToEntry = wire[PovToEntry]
private lazy val indexer = wire[Indexer]
private lazy val userCacheApi = new UserCacheApi(db(CollName("insight_user_cache")))
lazy val api = wire[InsightApi]
lila.common.Bus.subscribeFun("analysisReady") {
case lila.analyse.actorApi.AnalysisReady(game, _) => api updateGame game
}
}
object Env {
lazy val current: Env = "insight" boot new Env(
config = lila.common.PlayApp loadConfig "insight",
getPref = lila.pref.Env.current.api.getPrefById,
areFriends = lila.relation.Env.current.api.fetchAreFriends,
system = lila.common.PlayApp.system,
lifecycle = lila.common.PlayApp.lifecycle
)
}

View File

@ -1,9 +1,8 @@
package lila.insight
import akka.actor.ActorRef
import akka.stream.scaladsl.Sink
import org.joda.time.DateTime
import play.api.libs.iteratee._
import reactivemongo.api.ReadPreference
import reactivemongo.api._
import reactivemongo.api.bson._
import lila.db.dsl._
@ -13,7 +12,12 @@ import lila.game.{ Game, GameRepo, Query }
import lila.hub.FutureSequencer
import lila.user.User
private final class Indexer(storage: Storage, sequencer: FutureSequencer) {
private final class Indexer(
povToEntry: PovToEntry,
gameRepo: GameRepo,
storage: Storage,
sequencer: FutureSequencer
)(implicit mat: akka.stream.Materializer) {
def all(user: User): Funit = sequencer {
storage.fetchLast(user.id) flatMap {
@ -23,7 +27,7 @@ private final class Indexer(storage: Storage, sequencer: FutureSequencer) {
}
def update(game: Game, userId: String, previous: Entry): Funit =
PovToEntry(game, userId, previous.provisional) flatMap {
povToEntry(game, userId, previous.provisional) flatMap {
case Right(e) => storage update e.copy(number = previous.number)
case _ => funit
}
@ -41,53 +45,45 @@ private final class Indexer(storage: Storage, sequencer: FutureSequencer) {
Query.notHordeOrSincePawnsAreWhite
// private val maxGames = 1 * 10
private val maxGames = 10 * 1000
private val maxGames = 10_1000
private def fetchFirstGame(user: User): Fu[Option[Game]] =
if (user.count.rated == 0) fuccess(none)
else {
(user.count.rated >= maxGames) ?? GameRepo.coll
(user.count.rated >= maxGames) ?? gameRepo.coll.ext
.find(gameQuery(user))
.sort(Query.sortCreated)
.skip(maxGames - 1)
.uno[Game](readPreference = ReadPreference.secondaryPreferred)
} orElse GameRepo.coll
} orElse gameRepo.coll.ext
.find(gameQuery(user))
.sort(Query.sortChronological)
.uno[Game](readPreference = ReadPreference.secondaryPreferred)
private def computeFrom(user: User, from: DateTime, fromNumber: Int): Funit = {
import reactivemongo.play.iteratees.cursorProducer
storage nbByPerf user.id flatMap { nbs =>
var nbByPerf = nbs
def toEntry(game: Game): Fu[Option[Entry]] = game.perfType ?? { pt =>
val nb = nbByPerf.getOrElse(pt, 0) + 1
nbByPerf = nbByPerf.updated(pt, nb)
PovToEntry(game, user.id, provisional = nb < 10).addFailureEffect { e =>
povToEntry(game, user.id, provisional = nb < 10).addFailureEffect { e =>
println(e)
e.printStackTrace
} map (_.right.toOption)
} map (_.toOption)
}
val query = gameQuery(user) ++ $doc(Game.BSONFields.createdAt $gte from)
GameRepo.sortedCursor(query, Query.sortChronological)
.enumerator(maxGames) &>
Enumeratee.grouped(Iteratee takeUpTo 4) &>
Enumeratee.mapM[Seq[Game]].apply[Seq[Entry]] { games =>
games.map(toEntry).sequenceFu.map(_.flatten).addFailureEffect { e =>
println(e)
e.printStackTrace
}
} &>
Enumeratee.grouped(Iteratee takeUpTo 50) |>>>
Iteratee.foldM[Seq[Seq[Entry]], Int](fromNumber) {
case (number, xs) =>
val entries = xs.flatten.sortBy(_.date).zipWithIndex.map {
case (e, i) => e.copy(number = number + i)
}
val nextNumber = number + entries.size
storage bulkInsert entries inject nextNumber
}
gameRepo.sortedCursor(query, Query.sortChronological)
.documentSource()
.take(maxGames)
.mapAsync(4)(toEntry)
.mapConcat(_.toList)
.zipWithIndex
.map { case (e, i) => e.copy(number = fromNumber + i.toInt) }
.grouped(50)
.mapAsyncUnordered(1)(storage.bulkInsert)
.to(Sink.ignore)
.run
} void
}
}

View File

@ -7,8 +7,9 @@ import lila.user.User
final class InsightApi(
storage: Storage,
userCacheApi: UserCacheApi,
pipeline: AggregationPipeline,
userCacheApi: UserCacheApi,
gameRepo: GameRepo,
indexer: Indexer
) {
@ -25,16 +26,16 @@ final class InsightApi(
}
def ask[X](question: Question[X], user: User): Fu[Answer[X]] =
storage.aggregate(pipeline(question, user.id)).flatMap { aggDocs =>
pipeline.aggregate(question, user).flatMap { aggDocs =>
val clusters = AggregationClusters(question, aggDocs)
val gameIds = scala.util.Random.shuffle(clusters.flatMap(_.gameIds)) take 4
GameRepo.userPovsByGameIds(gameIds, user) map { povs =>
gameRepo.userPovsByGameIds(gameIds, user) map { povs =>
Answer(question, clusters, povs)
}
}.mon(_.insight.request.time) >>- lila.mon.insight.request.count()
def userStatus(user: User): Fu[UserStatus] =
GameRepo lastFinishedRatedNotFromPosition user flatMap {
gameRepo lastFinishedRatedNotFromPosition user flatMap {
case None => fuccess(UserStatus.NoGame)
case Some(game) => storage fetchLast user.id map {
case None => UserStatus.Empty

View File

@ -63,10 +63,10 @@ object JsonQuestion {
def fromQuestion(q: Question[_]) = JsonQuestion(
dimension = q.dimension.key,
metric = q.metric.key,
filters = q.filters.map {
filters = q.filters.view.map {
case Filter(dimension, selected) =>
dimension.key -> selected.map(Dimension.valueKey(dimension))
}(scala.collection.breakOut)
}.toMap
)
implicit val QuestionFormats = Json.format[JsonQuestion]

View File

@ -1,10 +1,12 @@
package lila.insight
import play.api.libs.json._
import play.api.libs.json.JodaWrites._
final class JsonView {
import lila.insight.{ Dimension => D, Metric => M }
import writers._
case class Categ(name: String, items: List[JsValue])
private implicit val categWrites = Json.writes[Categ]
@ -23,27 +25,27 @@ final class JsonView {
val dimensionCategs = List(
Categ("Setup", List(
Json toJson D.Date,
Json toJson D.Period,
Json toJson D.Perf,
Json toJson D.Color,
Json toJson D.OpponentStrength
Json.toJson(D.Date: Dimension[_]),
Json.toJson(D.Period: Dimension[_]),
Json.toJson(D.Perf: Dimension[_]),
Json.toJson(D.Color: Dimension[_]),
Json.toJson(D.OpponentStrength: Dimension[_])
)),
Categ("Game", List(
openingJson,
Json toJson D.MyCastling,
Json toJson D.OpCastling,
Json toJson D.QueenTrade
Json.toJson(D.MyCastling: Dimension[_]),
Json.toJson(D.OpCastling: Dimension[_]),
Json.toJson(D.QueenTrade: Dimension[_])
)),
Categ("Move", List(
Json toJson D.PieceRole,
Json toJson D.MovetimeRange,
Json toJson D.MaterialRange,
Json toJson D.Phase
Json.toJson(D.PieceRole: Dimension[_]),
Json.toJson(D.MovetimeRange: Dimension[_]),
Json.toJson(D.MaterialRange: Dimension[_]),
Json.toJson(D.Phase: Dimension[_])
)),
Categ("Result", List(
Json toJson D.Termination,
Json toJson D.Result
Json.toJson(D.Termination: Dimension[_]),
Json.toJson(D.Result: Dimension[_])
))
)
@ -56,59 +58,62 @@ final class JsonView {
private val metricCategs = List(
Categ("Setup", List(
Json toJson M.OpponentRating
Json.toJson(M.OpponentRating: Metric)
)),
Categ("Move", List(
Json toJson M.Movetime,
Json toJson M.PieceRole,
Json toJson M.Material,
Json toJson M.NbMoves
Json.toJson(M.Movetime: Metric),
Json.toJson(M.PieceRole: Metric),
Json.toJson(M.Material: Metric),
Json.toJson(M.NbMoves: Metric)
)),
Categ("Evaluation", List(
Json toJson M.MeanCpl,
Json toJson M.Opportunism,
Json toJson M.Luck
Json.toJson(M.MeanCpl: Metric),
Json.toJson(M.Opportunism: Metric),
Json.toJson(M.Luck: Metric)
)),
Categ("Result", List(
Json toJson M.Termination,
Json toJson M.Result,
Json toJson M.RatingDiff
Json.toJson(M.Termination: Metric),
Json.toJson(M.Result: Metric),
Json.toJson(M.RatingDiff: Metric)
))
)
private implicit def presetWriter[X]: OWrites[Preset] = OWrites { p =>
Json.obj(
"name" -> p.name,
"dimension" -> p.question.dimension.key,
"metric" -> p.question.metric.key,
"filters" -> JsObject(p.question.filters.map {
case Filter(dimension, selected) =>
dimension.key -> JsArray(selected.map(Dimension.valueKey(dimension)).map(JsString.apply))
})
)
}
private object writers {
private implicit def dimensionWriter[X]: OWrites[Dimension[X]] = OWrites { d =>
Json.obj(
"key" -> d.key,
"name" -> d.name,
"position" -> d.position,
"description" -> d.description.render,
"values" -> Dimension.valuesOf(d).map(Dimension.valueToJson(d))
)
}
implicit def presetWriter[X]: Writes[Preset] = Writes { p =>
Json.obj(
"name" -> p.name,
"dimension" -> p.question.dimension.key,
"metric" -> p.question.metric.key,
"filters" -> JsObject(p.question.filters.map {
case Filter(dimension, selected) =>
dimension.key -> JsArray(selected.map(Dimension.valueKey(dimension)).map(JsString.apply))
})
)
}
private implicit def metricWriter: OWrites[Metric] = OWrites { m =>
Json.obj(
"key" -> m.key,
"name" -> m.name,
"description" -> m.description.render,
"position" -> m.position
)
}
implicit def dimensionWriter[X]: Writes[Dimension[X]] = Writes { d =>
Json.obj(
"key" -> d.key,
"name" -> d.name,
"position" -> d.position,
"description" -> d.description.render,
"values" -> Dimension.valuesOf(d).map(Dimension.valueToJson(d))
)
}
private implicit def positionWriter: Writes[Position] = Writes { p =>
JsString(p.name)
implicit val metricWriter: Writes[Metric] = Writes { m =>
Json.obj(
"key" -> m.key,
"name" -> m.name,
"description" -> m.description.render,
"position" -> m.position
)
}
implicit val positionWriter: Writes[Position] = Writes { p =>
JsString(p.name)
}
}
object chart {
@ -123,8 +128,8 @@ final class JsonView {
def question(metric: String, dimension: String, filters: String) = Json.obj(
"metric" -> metric,
"dimension" -> dimension,
"filters" -> (filters.split('/').map(_ split ':').collect {
"filters" -> (filters.split('/').view.map(_ split ':').collect {
case Array(key, values) => key -> JsArray(values.split(',').map(JsString.apply))
}(scala.collection.breakOut): Map[String, JsArray])
}.toMap: Map[String, JsArray])
)
}

View File

@ -1,12 +1,15 @@
package lila.insight
import chess.{ Role, Board }
import chess.format.FEN
import chess.{ Role, Board }
import lila.analyse.{ Accuracy, Advice }
import lila.game.{ Game, Pov, GameRepo }
import lila.game.{ Game, Pov }
import scalaz.NonEmptyList
object PovToEntry {
private final class PovToEntry(
gameRepo: lila.game.GameRepo,
analysisRepo: lila.analyse.AnalysisRepo
) {
private type Ply = Int
@ -28,8 +31,8 @@ object PovToEntry {
private def removeWrongAnalysis(game: Game): Boolean = {
if (game.metadata.analysed && !game.analysable) {
GameRepo setUnanalysed game.id
lila.analyse.AnalysisRepo remove game.id
gameRepo setUnanalysed game.id
analysisRepo remove game.id
true
}
false
@ -38,8 +41,8 @@ object PovToEntry {
private def enrich(game: Game, userId: String, provisional: Boolean): Fu[Option[RichPov]] =
if (removeWrongAnalysis(game)) fuccess(none)
else lila.game.Pov.ofUserId(game, userId) ?? { pov =>
lila.game.GameRepo.initialFen(game) zip
(game.metadata.analysed ?? lila.analyse.AnalysisRepo.byId(game.id)) map {
gameRepo.initialFen(game) zip
(game.metadata.analysed ?? analysisRepo.byId(game.id)) map {
case (fen, an) => for {
boards <- chess.Replay.boards(
moveStrs = game.pgnMoves,
@ -56,7 +59,7 @@ object PovToEntry {
moveAccuracy = an.map { Accuracy.diffsList(pov, _) },
boards = boards,
movetimes = movetimes,
advices = an.?? { _.advices.map { a => a.info.ply -> a }(scala.collection.breakOut) }
advices = an.?? { _.advices.view.map { a => a.info.ply -> a }.toMap }
)
}
}

View File

@ -5,19 +5,21 @@ import lila.security.Granter
import lila.user.User
final class Share(
getPref: String => Fu[Pref],
areFriends: (String, String) => Fu[Boolean]
prefApi: lila.pref.PrefApi,
relationApi: lila.relation.RelationApi
) {
def getPrefId(insighted: User) = getPref(insighted.id) map (_.insightShare)
def getPrefId(insighted: User) = prefApi.getPrefById(insighted.id) map (_.insightShare)
def grant(insighted: User, to: Option[User]): Fu[Boolean] =
if (to ?? Granter(_.SeeInsight)) fuTrue
else getPref(insighted.id) flatMap { pref =>
else prefApi.getPrefById(insighted.id) flatMap { pref =>
pref.insightShare match {
case _ if to.contains(insighted) => fuTrue
case Pref.InsightShare.EVERYBODY => fuTrue
case Pref.InsightShare.FRIENDS => to ?? { t => areFriends(insighted.id, t.id) }
case Pref.InsightShare.FRIENDS => to ?? { t =>
relationApi.fetchAreFriends(insighted.id, t.id)
}
case Pref.InsightShare.NOBODY => fuFalse
}
}

View File

@ -7,20 +7,12 @@ import lila.db.dsl._
import lila.rating.BSONHandlers.perfTypeIdHandler
import lila.rating.PerfType
private final class Storage(coll: Coll) {
private final class Storage(val coll: Coll) {
import Storage._
import BSONHandlers._
import Entry.{ BSONFields => F }
def aggregate(operators: NonEmptyList[PipelineOperator]): Fu[List[Bdoc]] =
coll.aggregateList(
operators.head,
operators.tail.toList,
maxDocs = Int.MaxValue,
allowDiskUse = true
)
def fetchFirst(userId: String): Fu[Option[Entry]] =
coll.find(selectUserId(userId)).sort(sortChronological).uno[Entry]
@ -28,13 +20,12 @@ private final class Storage(coll: Coll) {
coll.find(selectUserId(userId)).sort(sortAntiChronological).uno[Entry]
def count(userId: String): Fu[Int] =
coll.count(selectUserId(userId).some)
coll.countSel(selectUserId(userId))
def insert(p: Entry) = coll.insert(p).void
def bulkInsert(ps: Seq[Entry]) = coll.bulkInsert(
documents = ps.map(BSONHandlers.EntryBSONHandler.write).toStream,
ordered = false
def bulkInsert(ps: Seq[Entry]) = coll.insert.many(
ps.flatMap(BSONHandlers.EntryBSONHandler.writeOpt)
)
def update(p: Entry) = coll.update(selectId(p.id), p, upsert = true).void
@ -46,20 +37,23 @@ private final class Storage(coll: Coll) {
def find(id: String) = coll.find(selectId(id)).uno[Entry]
def ecos(userId: String): Fu[Set[String]] =
coll.distinct[String, Set](F.eco, selectUserId(userId).some)
coll.distinctEasy[String, Set](F.eco, selectUserId(userId))
def nbByPerf(userId: String): Fu[Map[PerfType, Int]] = coll.aggregateList(
Match(BSONDocument(F.userId -> userId)),
List(GroupField(F.perf)("nb" -> SumValue(1))),
maxDocs = 50
).map {
_.flatMap { doc =>
for {
perfType <- doc.getAs[PerfType]("_id")
nb <- doc.getAs[Int]("nb")
} yield perfType -> nb
}(scala.collection.breakOut)
}
) { framework =>
import framework._
Match(BSONDocument(F.userId -> userId)) -> List(
GroupField(F.perf)("nb" -> SumValue(1))
)
}.map {
_.flatMap { doc =>
for {
perfType <- doc.getAsOpt[PerfType]("_id")
nb <- doc.int("nb")
} yield perfType -> nb
}.toMap
}
}
private object Storage {

View File

@ -322,7 +322,7 @@ final class ReportApi(
}.void
}
def countOpenByRooms: Fu[Room.Counts] = {
def countOpenByRooms: Fu[Room.Counts] =
coll.aggregateList(maxDocs = 100) { framework =>
import framework._
Match(selectOpenAvailableInRoom(none)) -> List(
@ -335,7 +335,6 @@ final class ReportApi(
}
}.toMap)
}
}
private def findRecent(nb: Int, selector: Bdoc): Fu[List[Report]] = (nb > 0) ?? {
coll.ext.find(selector).sort(sortLastAtomAt).list[Report](nb)