diff --git a/app/controllers/Coach.scala b/app/controllers/Coach.scala
index ccf11ae60a..58ed59cc2f 100644
--- a/app/controllers/Coach.scala
+++ b/app/controllers/Coach.scala
@@ -54,8 +54,8 @@ object Coach extends LilaController {
def refresh(username: String) = Open { implicit ctx =>
Accessible(username) { user =>
{
- if (isGranted(_.SuperAdmin)) env.statApi.computeForce(user.id)
- else env.statApi.computeIfOld(user.id)
+ if (isGranted(_.SuperAdmin)) env.aggregator(user.id)
+ else env.aggregator(user.id)
} inject Ok
}
}
diff --git a/app/views/coach/layout.scala.html b/app/views/coach/layout.scala.html
index e1a0d1b7c8..2d701fccbd 100644
--- a/app/views/coach/layout.scala.html
+++ b/app/views/coach/layout.scala.html
@@ -27,7 +27,6 @@
}
- Yes, I know data is borked.
Hang on while we crunch data
from @u.username @if(u.count.rated > 10 * 5000) {
diff --git a/modules/coach/src/main/Aggregator.scala b/modules/coach/src/main/Aggregator.scala
new file mode 100644
index 0000000000..d35686ad7b
--- /dev/null
+++ b/modules/coach/src/main/Aggregator.scala
@@ -0,0 +1,83 @@
+package lila.coach
+
+import akka.actor._
+import akka.pattern.ask
+import org.joda.time.DateTime
+import play.api.libs.iteratee._
+import play.api.libs.json.Json
+import reactivemongo.bson._
+
+import lila.db.api._
+import lila.db.BSON._
+import lila.db.Implicits._
+import lila.game.BSONHandlers.gameBSONHandler
+import lila.game.tube.gameTube
+import lila.game.{ Game, Query }
+import lila.hub.Sequencer
+
+final class Aggregator(api: StatApi, sequencer: ActorRef) {
+
+ private implicit val timeout = makeTimeout.minutes(60)
+
+ def apply(userId: String): Funit = {
+ val p = scala.concurrent.Promise[Unit]()
+ sequencer ? Sequencer.work(compute(userId), p.some)
+ p.future
+ }
+
+ private def compute(userId: String): Funit = api.fetchLast(userId) flatMap {
+ case None => fromScratch(userId)
+ case Some(p) => api.remove(p) >> computeFrom(userId, p.from)
+ }
+
+ private def fromScratch(userId: String): Funit =
+ pimpQB($query(gameQuery(userId))).sort(Query.sortCreated).skip(maxGames - 1).one[Game] flatMap {
+ _.?? { g => computeFrom(userId, g.createdAt) }
+ }
+
+ private def gameQuery(userId: String) = Query.user(userId) ++ Query.rated ++ Query.finished
+ private val maxGames = 5 * 1000
+
+ private def computeFrom(userId: String, from: DateTime): Funit = {
+ pimpQB($query(gameQuery(userId) ++ Json.obj(Game.BSONFields.createdAt -> $gte($date(from)))))
+ .sort(Query.sortChronological)
+ .cursor[Game]()
+ .enumerate(maxGames, stopOnError = true) &>
+ richPovEnumeratee(userId) |>>>
+ Iteratee.foldM[Option[RichPov], Periods.Computation](Periods.initComputation(userId, api.insert)) {
+ case (comp, Some(p)) => try {
+ comp aggregate p
+ }
+ catch {
+ case e: Exception =>
+ e.printStackTrace
+ logwarn(s"[StatApi] game ${p.pov.game.id} $e"); fuccess(comp)
+ }
+ // case (comp, Some(p)) => comp aggregate p
+ case (comp, _) => logwarn("[StatApi] invalid pov"); fuccess(comp)
+ }
+ }.map(_.run)
+
+ private def richPovEnumeratee(userId: String) =
+ Enumeratee.mapM[lila.game.Game].apply[Option[RichPov]] { game =>
+ lila.game.Pov.ofUserId(game, userId) ?? { pov =>
+ lila.game.GameRepo.initialFen(game) zip
+ (game.metadata.analysed ?? lila.analyse.AnalysisRepo.doneById(game.id)) map {
+ case (fen, an) =>
+ val division = chess.Replay.boards(
+ moveStrs = game.pgnMoves,
+ initialFen = fen,
+ variant = game.variant
+ ).toOption.fold(chess.Division.empty)(chess.Divider.apply)
+ RichPov(
+ pov = pov,
+ initialFen = fen,
+ analysis = an,
+ division = division,
+ accuracy = an.flatMap { lila.analyse.Accuracy(pov, _, division) },
+ moveAccuracy = an.map { lila.analyse.Accuracy.diffsList(pov, _) }
+ ).some
+ }
+ }
+ }
+}
diff --git a/modules/coach/src/main/Env.scala b/modules/coach/src/main/Env.scala
index 82abe994a2..3f78018a47 100644
--- a/modules/coach/src/main/Env.scala
+++ b/modules/coach/src/main/Env.scala
@@ -27,8 +27,11 @@ final class Env(
lazy val jsonView = new JsonView(jsonWriters)
lazy val statApi = new StatApi(
- coll = db(CollectionStat),
- makeThrottler = f => new Throttler(system, f))
+ coll = db(CollectionStat))
+
+ lazy val aggregator = new Aggregator(
+ api = statApi,
+ sequencer = system.actorOf(Props(classOf[lila.hub.Sequencer], 5 minutes)))
}
object Env {
diff --git a/modules/coach/src/main/StatApi.scala b/modules/coach/src/main/StatApi.scala
index 83429af1d3..9d5f12e929 100644
--- a/modules/coach/src/main/StatApi.scala
+++ b/modules/coach/src/main/StatApi.scala
@@ -7,19 +7,18 @@ import reactivemongo.bson.Macros
import reactivemongo.core.commands._
import scala.concurrent.duration._
-import lila.db.api.$query
import lila.db.BSON._
import lila.db.Implicits._
import lila.user.UserRepo
-final class StatApi(
- coll: Coll,
- makeThrottler: (String => Funit) => Throttler) {
+final class StatApi(coll: Coll) {
import BSONHandlers._
+ private def selectId(id: String) = BSONDocument("_id" -> id)
private def selectUserId(id: String) = BSONDocument("userId" -> id)
private val sortChronological = BSONDocument("from" -> 1)
+ private val sortAntiChronological = BSONDocument("from" -> -1)
def fetchRange(userId: String, range: Option[Range]): Fu[Option[Period]] =
range.fold(fetchAll(userId)) { r =>
@@ -40,82 +39,15 @@ final class StatApi(
def fetchFirst(userId: String): Fu[Option[Period]] =
fetchRange(userId, Range(0, 1).some)
+ def fetchLast(userId: String): Fu[Option[Period]] =
+ coll.find(selectUserId(userId)).sort(sortAntiChronological).one[Period]
+
def count(userId: String): Fu[Int] =
coll.count(selectUserId(userId).some)
- def computeIfOld(id: String): Funit = fetchFirst(id) flatMap {
- case Some(stat) => funit
- case _ => throttler(id)
- }
+ def insert(p: Period) = coll.insert(p).void
- def computeForce(id: String): Funit = throttler(id)
+ def remove(p: Period) = coll.remove(selectId(p.id)).void
- private val throttler = makeThrottler { id =>
- def aggregate(period: Period.Computation, povOption: Option[RichPov], gameId: String) = povOption match {
- case Some(pov) => try {
- period aggregate pov
- }
- catch {
- case e: Exception => logwarn("[StatApi] " + e); period
- }
- case _ => logwarn("[StatApi] invalid game " + gameId); period
- }
- coll.remove(selectUserId(id)) >> {
- import play.api.libs.json.Json
- import lila.game.tube.gameTube
- import lila.game.BSONHandlers.gameBSONHandler
- import lila.game.{ Game, Query }
- import lila.db.api._
- val gameQuery = Query.user(id) ++ Query.rated ++ Query.finished
- val maxGames = 5 * 1000
- pimpQB($query(gameQuery)).sort(Query.sortCreated).skip(maxGames - 1).one[Game] flatMap {
- _.?? { firstGame =>
- {
- pimpQB($query(gameQuery ++ Json.obj(Game.BSONFields.createdAt -> $gte($date(firstGame.createdAt)))))
- .sort(Query.sortChronological)
- .cursor[Game]()
- .enumerate(maxGames, stopOnError = true) &>
- StatApi.richPov(id) |>>>
- Iteratee.foldM[Option[RichPov], Periods.Computation](Periods.initComputation(id, { p => coll.insert(p).void })) {
- case (comp, Some(p)) => try {
- comp aggregate p
- }
- catch {
- case e: Exception =>
- e.printStackTrace
- logwarn(s"[StatApi] game ${p.pov.game.id} $e"); fuccess(comp)
- }
- // case (comp, Some(p)) => comp aggregate p
- case (comp, _) => logwarn("[StatApi] invalid pov"); fuccess(comp)
- }
- }.map(_.run)
- }
- }
- }
- }
-}
-
-private object StatApi {
-
- def richPov(userId: String) = Enumeratee.mapM[lila.game.Game].apply[Option[RichPov]] { game =>
- lila.game.Pov.ofUserId(game, userId) ?? { pov =>
- lila.game.GameRepo.initialFen(game) zip
- (game.metadata.analysed ?? lila.analyse.AnalysisRepo.doneById(game.id)) map {
- case (fen, an) =>
- val division = chess.Replay.boards(
- moveStrs = game.pgnMoves,
- initialFen = fen,
- variant = game.variant
- ).toOption.fold(chess.Division.empty)(chess.Divider.apply)
- RichPov(
- pov = pov,
- initialFen = fen,
- analysis = an,
- division = division,
- accuracy = an.flatMap { lila.analyse.Accuracy(pov, _, division) },
- moveAccuracy = an.map { lila.analyse.Accuracy.diffsList(pov, _) }
- ).some
- }
- }
- }
+ def removeAll(userId: String) = coll.remove(selectUserId(userId)).void
}
diff --git a/modules/coach/src/main/Throttler.scala b/modules/coach/src/main/Throttler.scala
deleted file mode 100644
index f0b9de5d66..0000000000
--- a/modules/coach/src/main/Throttler.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package lila.coach
-
-import akka.actor._
-import akka.pattern.ask
-
-private[coach] final class Throttler(system: ActorSystem, f: String => Funit) {
-
- private implicit val timeout = makeTimeout.minutes(2)
-
- private val actor = system.actorOf(Props(new lila.hub.SequentialProvider {
- def process = {
- case id: String => f(id)
- }
- }))
-
- def apply(id: String): Funit = (actor ? id).void
-}
diff --git a/modules/hub/src/main/Sequencer.scala b/modules/hub/src/main/Sequencer.scala
index 9cb3a40a3b..dfae1b089d 100644
--- a/modules/hub/src/main/Sequencer.scala
+++ b/modules/hub/src/main/Sequencer.scala
@@ -1,6 +1,7 @@
package lila.hub
import scala.concurrent.duration._
+import scala.concurrent.Promise
import scala.util.Try
import akka.actor._
@@ -36,7 +37,10 @@ final class Sequencer(receiveTimeout: FiniteDuration) extends Actor {
private def processThenDone(work: Any) {
work match {
case ReceiveTimeout => self ! PoisonPill
- case Sequencer.Work(run) => run() andThenAnyway { self ! Done }
+ case Sequencer.Work(run, promiseOption) => run() andThenAnyway {
+ promiseOption.foreach(_.success(()))
+ self ! Done
+ }
case x => play.api.Logger("Sequencer").warn(s"Unsupported message $x")
}
}
@@ -44,7 +48,7 @@ final class Sequencer(receiveTimeout: FiniteDuration) extends Actor {
object Sequencer {
- case class Work(run: () => Funit)
+ case class Work(run: () => Funit, promise: Option[Promise[Unit]] = None)
- def work(run: => Funit): Work = Work(() => run)
+ def work(run: => Funit, promise: Option[Promise[Unit]] = None): Work = Work(() => run, promise)
}