coach data incremental aggregation!

This commit is contained in:
Thibault Duplessis 2015-07-25 18:35:34 +02:00
parent 40b59c28a7
commit 04c10a5fae
7 changed files with 106 additions and 102 deletions

View file

@ -54,8 +54,8 @@ object Coach extends LilaController {
def refresh(username: String) = Open { implicit ctx =>
Accessible(username) { user =>
{
if (isGranted(_.SuperAdmin)) env.statApi.computeForce(user.id)
else env.statApi.computeIfOld(user.id)
if (isGranted(_.SuperAdmin)) env.aggregator(user.id)
else env.aggregator(user.id)
} inject Ok
}
}

View file

@ -27,7 +27,6 @@
</form>
}
<br />
Yes, I know data is borked.
<div class="refreshing none">
Hang on while we crunch data<br />
from @u.username @if(u.count.rated > 10 * 5000) {

View file

@ -0,0 +1,83 @@
package lila.coach
import akka.actor._
import akka.pattern.ask
import org.joda.time.DateTime
import play.api.libs.iteratee._
import play.api.libs.json.Json
import reactivemongo.bson._
import lila.db.api._
import lila.db.BSON._
import lila.db.Implicits._
import lila.game.BSONHandlers.gameBSONHandler
import lila.game.tube.gameTube
import lila.game.{ Game, Query }
import lila.hub.Sequencer
final class Aggregator(api: StatApi, sequencer: ActorRef) {
private implicit val timeout = makeTimeout.minutes(60)
def apply(userId: String): Funit = {
val p = scala.concurrent.Promise[Unit]()
sequencer ? Sequencer.work(compute(userId), p.some)
p.future
}
private def compute(userId: String): Funit = api.fetchLast(userId) flatMap {
case None => fromScratch(userId)
case Some(p) => api.remove(p) >> computeFrom(userId, p.from)
}
private def fromScratch(userId: String): Funit =
pimpQB($query(gameQuery(userId))).sort(Query.sortCreated).skip(maxGames - 1).one[Game] flatMap {
_.?? { g => computeFrom(userId, g.createdAt) }
}
private def gameQuery(userId: String) = Query.user(userId) ++ Query.rated ++ Query.finished
private val maxGames = 5 * 1000
private def computeFrom(userId: String, from: DateTime): Funit = {
pimpQB($query(gameQuery(userId) ++ Json.obj(Game.BSONFields.createdAt -> $gte($date(from)))))
.sort(Query.sortChronological)
.cursor[Game]()
.enumerate(maxGames, stopOnError = true) &>
richPovEnumeratee(userId) |>>>
Iteratee.foldM[Option[RichPov], Periods.Computation](Periods.initComputation(userId, api.insert)) {
case (comp, Some(p)) => try {
comp aggregate p
}
catch {
case e: Exception =>
e.printStackTrace
logwarn(s"[StatApi] game ${p.pov.game.id} $e"); fuccess(comp)
}
// case (comp, Some(p)) => comp aggregate p
case (comp, _) => logwarn("[StatApi] invalid pov"); fuccess(comp)
}
}.map(_.run)
private def richPovEnumeratee(userId: String) =
Enumeratee.mapM[lila.game.Game].apply[Option[RichPov]] { game =>
lila.game.Pov.ofUserId(game, userId) ?? { pov =>
lila.game.GameRepo.initialFen(game) zip
(game.metadata.analysed ?? lila.analyse.AnalysisRepo.doneById(game.id)) map {
case (fen, an) =>
val division = chess.Replay.boards(
moveStrs = game.pgnMoves,
initialFen = fen,
variant = game.variant
).toOption.fold(chess.Division.empty)(chess.Divider.apply)
RichPov(
pov = pov,
initialFen = fen,
analysis = an,
division = division,
accuracy = an.flatMap { lila.analyse.Accuracy(pov, _, division) },
moveAccuracy = an.map { lila.analyse.Accuracy.diffsList(pov, _) }
).some
}
}
}
}

View file

@ -27,8 +27,11 @@ final class Env(
lazy val jsonView = new JsonView(jsonWriters)
lazy val statApi = new StatApi(
coll = db(CollectionStat),
makeThrottler = f => new Throttler(system, f))
coll = db(CollectionStat))
lazy val aggregator = new Aggregator(
api = statApi,
sequencer = system.actorOf(Props(classOf[lila.hub.Sequencer], 5 minutes)))
}
object Env {

View file

@ -7,19 +7,18 @@ import reactivemongo.bson.Macros
import reactivemongo.core.commands._
import scala.concurrent.duration._
import lila.db.api.$query
import lila.db.BSON._
import lila.db.Implicits._
import lila.user.UserRepo
final class StatApi(
coll: Coll,
makeThrottler: (String => Funit) => Throttler) {
final class StatApi(coll: Coll) {
import BSONHandlers._
private def selectId(id: String) = BSONDocument("_id" -> id)
private def selectUserId(id: String) = BSONDocument("userId" -> id)
private val sortChronological = BSONDocument("from" -> 1)
private val sortAntiChronological = BSONDocument("from" -> -1)
def fetchRange(userId: String, range: Option[Range]): Fu[Option[Period]] =
range.fold(fetchAll(userId)) { r =>
@ -40,82 +39,15 @@ final class StatApi(
def fetchFirst(userId: String): Fu[Option[Period]] =
fetchRange(userId, Range(0, 1).some)
def fetchLast(userId: String): Fu[Option[Period]] =
coll.find(selectUserId(userId)).sort(sortAntiChronological).one[Period]
def count(userId: String): Fu[Int] =
coll.count(selectUserId(userId).some)
def computeIfOld(id: String): Funit = fetchFirst(id) flatMap {
case Some(stat) => funit
case _ => throttler(id)
}
def insert(p: Period) = coll.insert(p).void
def computeForce(id: String): Funit = throttler(id)
def remove(p: Period) = coll.remove(selectId(p.id)).void
private val throttler = makeThrottler { id =>
def aggregate(period: Period.Computation, povOption: Option[RichPov], gameId: String) = povOption match {
case Some(pov) => try {
period aggregate pov
}
catch {
case e: Exception => logwarn("[StatApi] " + e); period
}
case _ => logwarn("[StatApi] invalid game " + gameId); period
}
coll.remove(selectUserId(id)) >> {
import play.api.libs.json.Json
import lila.game.tube.gameTube
import lila.game.BSONHandlers.gameBSONHandler
import lila.game.{ Game, Query }
import lila.db.api._
val gameQuery = Query.user(id) ++ Query.rated ++ Query.finished
val maxGames = 5 * 1000
pimpQB($query(gameQuery)).sort(Query.sortCreated).skip(maxGames - 1).one[Game] flatMap {
_.?? { firstGame =>
{
pimpQB($query(gameQuery ++ Json.obj(Game.BSONFields.createdAt -> $gte($date(firstGame.createdAt)))))
.sort(Query.sortChronological)
.cursor[Game]()
.enumerate(maxGames, stopOnError = true) &>
StatApi.richPov(id) |>>>
Iteratee.foldM[Option[RichPov], Periods.Computation](Periods.initComputation(id, { p => coll.insert(p).void })) {
case (comp, Some(p)) => try {
comp aggregate p
}
catch {
case e: Exception =>
e.printStackTrace
logwarn(s"[StatApi] game ${p.pov.game.id} $e"); fuccess(comp)
}
// case (comp, Some(p)) => comp aggregate p
case (comp, _) => logwarn("[StatApi] invalid pov"); fuccess(comp)
}
}.map(_.run)
}
}
}
}
}
private object StatApi {
def richPov(userId: String) = Enumeratee.mapM[lila.game.Game].apply[Option[RichPov]] { game =>
lila.game.Pov.ofUserId(game, userId) ?? { pov =>
lila.game.GameRepo.initialFen(game) zip
(game.metadata.analysed ?? lila.analyse.AnalysisRepo.doneById(game.id)) map {
case (fen, an) =>
val division = chess.Replay.boards(
moveStrs = game.pgnMoves,
initialFen = fen,
variant = game.variant
).toOption.fold(chess.Division.empty)(chess.Divider.apply)
RichPov(
pov = pov,
initialFen = fen,
analysis = an,
division = division,
accuracy = an.flatMap { lila.analyse.Accuracy(pov, _, division) },
moveAccuracy = an.map { lila.analyse.Accuracy.diffsList(pov, _) }
).some
}
}
}
def removeAll(userId: String) = coll.remove(selectUserId(userId)).void
}

View file

@ -1,17 +0,0 @@
package lila.coach
import akka.actor._
import akka.pattern.ask
private[coach] final class Throttler(system: ActorSystem, f: String => Funit) {
private implicit val timeout = makeTimeout.minutes(2)
private val actor = system.actorOf(Props(new lila.hub.SequentialProvider {
def process = {
case id: String => f(id)
}
}))
def apply(id: String): Funit = (actor ? id).void
}

View file

@ -1,6 +1,7 @@
package lila.hub
import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.util.Try
import akka.actor._
@ -36,7 +37,10 @@ final class Sequencer(receiveTimeout: FiniteDuration) extends Actor {
private def processThenDone(work: Any) {
work match {
case ReceiveTimeout => self ! PoisonPill
case Sequencer.Work(run) => run() andThenAnyway { self ! Done }
case Sequencer.Work(run, promiseOption) => run() andThenAnyway {
promiseOption.foreach(_.success(()))
self ! Done
}
case x => play.api.Logger("Sequencer").warn(s"Unsupported message $x")
}
}
@ -44,7 +48,7 @@ final class Sequencer(receiveTimeout: FiniteDuration) extends Actor {
object Sequencer {
case class Work(run: () => Funit)
case class Work(run: () => Funit, promise: Option[Promise[Unit]] = None)
def work(run: => Funit): Work = Work(() => run)
def work(run: => Funit, promise: Option[Promise[Unit]] = None): Work = Work(() => run, promise)
}