stream period persistence to control aggregation memory

This commit is contained in:
Thibault Duplessis 2015-07-25 02:24:39 +02:00
parent 4cd65bbfcb
commit a6562ca0e3
2 changed files with 21 additions and 29 deletions

View file

@ -52,27 +52,26 @@ object Periods {
case class Computation(
userId: String,
period: Option[Period.Computation],
periods: List[Period]) {
save: Period => Funit,
period: Option[Period.Computation]) {
def aggregate(p: RichPov) = ((period, periods, p) match {
case (None, acc, pov) => Period.initComputation(userId, p) -> acc
case (Some(comp), acc, pov) if comp.nbGames >= 100 => Period.initComputation(userId, p) -> (comp.run :: acc)
case (Some(comp), acc, pov) => comp.aggregate(p) -> acc
}) match {
case (comp, acc) => copy(
period = comp.some,
periods = acc)
def aggregate(p: RichPov): Fu[Computation] = ((period, p) match {
case (None, p) => fuccess {
Period.initComputation(userId, p)
}
case (Some(comp), p) if comp.nbGames >= 100 => save(comp.run) inject {
Period.initComputation(userId, p)
}
case (Some(comp), p) => fuccess {
comp.aggregate(p)
}
}) map { comp =>
copy(period = comp.some)
}
def run = period.map(_ -> periods).flatMap {
case (comp, Nil) if comp.nbGames == 0 => None
case (comp, p :: ps) if comp.nbGames == 0 => Periods(NonEmptyList(p, ps: _*)).some
case (comp, Nil) => Periods(NonEmptyList(comp.run)).some
case (comp, periods) => Periods(NonEmptyList(comp.run, periods: _*)).some
}
def run: Funit = period.filter(_.nbGames > 0) ?? { p => save(p.run) }
}
def initComputation(userId: String) = Computation(userId, None, Nil)
def initComputation(userId: String, save: Period => Funit) = Computation(userId, save, None)
def build(userId: String, pov: RichPov) = Period(
userId = userId,

View file

@ -33,9 +33,7 @@ final class StatApi(
.enumerate(r.size) &>
Enumeratee.take(r.size) |>>>
Iteratee.fold[Period, Option[Period]](none) {
case (a, b) =>
println(b.data.results.base.nbGames)
a.fold(b)(_ merge b).some
case (a, b) => a.fold(b)(_ merge b).some
}
}
@ -72,24 +70,19 @@ final class StatApi(
.cursor[lila.game.Game]()
.enumerate(10 * 1000, stopOnError = true) &>
StatApi.richPov(id) |>>>
Iteratee.fold[Option[RichPov], Periods.Computation](Periods.initComputation(id)) {
Iteratee.foldM[Option[RichPov], Periods.Computation](Periods.initComputation(id, { p => coll.insert(p).void })) {
case (comp, Some(p)) => try {
comp aggregate p
}
catch {
case e: Exception =>
e.printStackTrace
logwarn(s"[StatApi] game ${p.pov.game.id} $e"); comp
logwarn(s"[StatApi] game ${p.pov.game.id} $e"); fuccess(comp)
}
// case (comp, Some(p)) => comp aggregate p
case (comp, _) =>
logwarn("[StatApi] invalid pov"); comp
case (comp, _) => logwarn("[StatApi] invalid pov"); fuccess(comp)
}
}.map(_.run).flatten("[StatApi] Nothing to persist") flatMap {
_.periods.list.map { p =>
coll.insert(p)
}.sequenceFu.void
}
}.map(_.run)
}
}
}