Rewrite MongoCache with Scaffeine
This commit is contained in:
parent
591b74f26d
commit
3af9dd4e2b
|
@@ -200,7 +200,7 @@ object User extends LilaController {
|
|||
def topWeek = Open { implicit ctx =>
|
||||
negotiate(
|
||||
html = notFound,
|
||||
api = _ => env.cached.topWeek(true).map { users =>
|
||||
api = _ => env.cached.topWeek(()).map { users =>
|
||||
Ok(Json toJson users.map(env.jsonView.lightPerfIsOnline))
|
||||
})
|
||||
}
|
||||
|
|
|
@@ -16,7 +16,7 @@ import play.api.libs.json._
|
|||
|
||||
final class Preload(
|
||||
tv: Tv,
|
||||
leaderboard: Boolean => Fu[List[User.LightPerf]],
|
||||
leaderboard: Unit => Fu[List[User.LightPerf]],
|
||||
tourneyWinners: Fu[List[Winner]],
|
||||
timelineEntries: String => Fu[List[Entry]],
|
||||
streamsOnAir: () => Fu[List[StreamOnAir]],
|
||||
|
@@ -40,7 +40,7 @@ final class Preload(
|
|||
simuls zip
|
||||
tv.getBestGame zip
|
||||
(ctx.userId ?? timelineEntries) zip
|
||||
leaderboard(true) zip
|
||||
leaderboard(()) zip
|
||||
tourneyWinners zip
|
||||
dailyPuzzle() zip
|
||||
streamsOnAir() zip
|
||||
|
|
|
@@ -11,12 +11,12 @@ final class Cached(
|
|||
mongoCache: MongoCache.Builder,
|
||||
defaultTtl: FiniteDuration) {
|
||||
|
||||
def nbImportedBy(userId: String): Fu[Int] = count(Query imported userId)
|
||||
def clearNbImportedByCache(userId: String) = count.remove(Query imported userId)
|
||||
def nbImportedBy(userId: String): Fu[Int] = countCache(Query imported userId)
|
||||
def clearNbImportedByCache(userId: String) = countCache.remove(Query imported userId)
|
||||
|
||||
def nbPlaying(userId: String): Fu[Int] = countShortTtl(Query nowPlaying userId)
|
||||
|
||||
def nbTotal: Fu[Int] = count($empty)
|
||||
def nbTotal: Fu[Int] = countCache($empty)
|
||||
|
||||
private implicit val userHandler = User.userBSONHandler
|
||||
|
||||
|
@@ -29,7 +29,7 @@ final class Cached(
|
|||
f = (o: Bdoc) => coll countSel o,
|
||||
timeToLive = 5.seconds)
|
||||
|
||||
private val count = mongoCache(
|
||||
private val countCache = mongoCache(
|
||||
prefix = "game:count",
|
||||
f = (o: Bdoc) => coll countSel o,
|
||||
timeToLive = defaultTtl,
|
||||
|
|
|
@@ -1,40 +1,44 @@
|
|||
package lila.memo
|
||||
|
||||
import com.github.blemale.scaffeine.{ Scaffeine, Cache }
|
||||
import org.joda.time.DateTime
|
||||
import reactivemongo.bson._
|
||||
import reactivemongo.bson.Macros
|
||||
import scala.concurrent.duration._
|
||||
import spray.caching.{ LruCache, Cache }
|
||||
|
||||
import lila.db.BSON.BSONJodaDateTimeHandler
|
||||
import lila.db.dsl._
|
||||
|
||||
final class MongoCache[K, V: MongoCache.Handler] private (
|
||||
prefix: String,
|
||||
expiresAt: () => DateTime,
|
||||
cache: Cache[V],
|
||||
cache: Cache[K, Fu[V]],
|
||||
mongoExpiresAt: () => DateTime,
|
||||
coll: Coll,
|
||||
f: K => Fu[V],
|
||||
keyToString: K => String) {
|
||||
|
||||
def apply(k: K): Fu[V] = cache(k) {
|
||||
def apply(k: K): Fu[V] = cache.get(k, k =>
|
||||
coll.find(select(k)).uno[Entry] flatMap {
|
||||
case None => f(k) flatMap { v =>
|
||||
coll.insert(makeEntry(k, v)) recover
|
||||
lila.db.recoverDuplicateKey(_ => ()) inject v
|
||||
persist(k, v) inject v
|
||||
}
|
||||
case Some(entry) => fuccess(entry.v)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
def remove(k: K): Funit =
|
||||
coll.remove(select(k)).void >>- (cache remove k)
|
||||
def remove(k: K): Funit = {
|
||||
val fut = f(k)
|
||||
cache.put(k, fut)
|
||||
fut flatMap { v =>
|
||||
persist(k, v).void
|
||||
}
|
||||
}
|
||||
|
||||
private case class Entry(_id: String, v: V, e: DateTime)
|
||||
|
||||
private implicit val entryBSONHandler = Macros.handler[Entry]
|
||||
|
||||
private def makeEntry(k: K, v: V) = Entry(makeKey(k), v, expiresAt())
|
||||
private def persist(k: K, v: V): Funit =
|
||||
coll.insert(Entry(makeKey(k), v, mongoExpiresAt())).void recover lila.db.recoverDuplicateKey(_ => ())
|
||||
|
||||
private def makeKey(k: K) = s"$prefix:${keyToString(k)}"
|
||||
|
||||
|
@@ -45,22 +49,28 @@ object MongoCache {
|
|||
|
||||
private type Handler[T] = BSONHandler[_ <: BSONValue, T]
|
||||
|
||||
private def expiresAt(ttl: Duration)(): DateTime =
|
||||
DateTime.now plusSeconds ttl.toSeconds.toInt
|
||||
// expire in mongo 3 seconds before in heap,
|
||||
// to make sure the mongo cache is cleared
|
||||
// when the heap value expires
|
||||
private def mongoExpiresAt(ttl: FiniteDuration): () => DateTime = {
|
||||
val seconds = ttl.toSeconds.toInt - 3
|
||||
() => DateTime.now plusSeconds seconds
|
||||
}
|
||||
|
||||
final class Builder(coll: Coll) {
|
||||
|
||||
def apply[K, V: Handler](
|
||||
prefix: String,
|
||||
f: K => Fu[V],
|
||||
maxCapacity: Int = 512,
|
||||
initialCapacity: Int = 64,
|
||||
maxCapacity: Int = 1024,
|
||||
timeToLive: FiniteDuration,
|
||||
timeToLiveMongo: Option[FiniteDuration] = None,
|
||||
keyToString: K => String): MongoCache[K, V] = new MongoCache[K, V](
|
||||
prefix = prefix,
|
||||
expiresAt = expiresAt(timeToLiveMongo | timeToLive),
|
||||
cache = LruCache(maxCapacity, initialCapacity, timeToLive),
|
||||
cache = Scaffeine()
|
||||
.expireAfterWrite(timeToLive)
|
||||
.maximumSize(maxCapacity)
|
||||
.build[K, Fu[V]],
|
||||
mongoExpiresAt = mongoExpiresAt(timeToLive),
|
||||
coll = coll,
|
||||
f = f,
|
||||
keyToString = keyToString)
|
||||
|
@@ -68,11 +78,13 @@ object MongoCache {
|
|||
def single[V: Handler](
|
||||
prefix: String,
|
||||
f: => Fu[V],
|
||||
timeToLive: FiniteDuration,
|
||||
timeToLiveMongo: Option[FiniteDuration] = None) = new MongoCache[Boolean, V](
|
||||
timeToLive: FiniteDuration) = new MongoCache[Unit, V](
|
||||
prefix = prefix,
|
||||
expiresAt = expiresAt(timeToLiveMongo | timeToLive),
|
||||
cache = LruCache(timeToLive = timeToLive),
|
||||
cache = Scaffeine()
|
||||
.expireAfterWrite(timeToLive)
|
||||
.maximumSize(1)
|
||||
.build[Unit, Fu[V]],
|
||||
mongoExpiresAt = mongoExpiresAt(timeToLive),
|
||||
coll = coll,
|
||||
f = _ => f,
|
||||
keyToString = _.toString)
|
||||
|
|
|
@@ -18,8 +18,7 @@ final class TournamentStatsApi(mongoCache: lila.memo.MongoCache.Builder) {
|
|||
prefix = "tournament:stats",
|
||||
keyToString = identity,
|
||||
f = fetch,
|
||||
timeToLive = 10 minutes,
|
||||
timeToLiveMongo = 90.days.some)
|
||||
timeToLive = 10 minutes)
|
||||
|
||||
private def fetch(tournamentId: Tournament.ID): Fu[TournamentStats] = for {
|
||||
rating <- PlayerRepo.averageRating(tournamentId)
|
||||
|
|
|
@@ -116,12 +116,12 @@ final class WinnersApi(
|
|||
f = fetchAll,
|
||||
timeToLive = ttl)
|
||||
|
||||
def all: Fu[AllWinners] = allCache(true)
|
||||
def all: Fu[AllWinners] = allCache(())
|
||||
|
||||
// because we read on secondaries, delay cache clear
|
||||
def clearCache(tour: Tournament) =
|
||||
if (tour.schedule.exists(_.freq.isDailyOrBetter))
|
||||
scheduler.once(5.seconds) { allCache.remove(true) }
|
||||
scheduler.once(5.seconds) { allCache.remove(()) }
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue