new cache API - WIP
parent
19472510b4
commit
7767769149
|
@ -193,6 +193,7 @@ final class Team(
|
|||
)
|
||||
|
||||
def requests = Auth { implicit ctx => me =>
|
||||
import lila.memo.CacheApi._
|
||||
env.team.cached.nbRequests invalidate me.id
|
||||
api requestsWithUsers me map { html.team.request.all(_) }
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package lila.memo
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import com.github.benmanes.caffeine.cache.{ Cache => CaffeineCache }
|
||||
import com.github.blemale.scaffeine.{ AsyncLoadingCache, Cache, Scaffeine }
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
@ -52,7 +51,7 @@ final class AsyncCacheSingle[V](cache: AsyncLoadingCache[Unit, V], f: Unit => Fu
|
|||
|
||||
object AsyncCache {
|
||||
|
||||
final class Builder(implicit ec: ExecutionContext, system: ActorSystem) {
|
||||
final class Builder(api: CacheApi)(implicit ec: ExecutionContext, system: ActorSystem) {
|
||||
|
||||
def multi[K, V](
|
||||
name: String,
|
||||
|
@ -67,10 +66,10 @@ object AsyncCache {
|
|||
lila.base.LilaException(s"AsyncCache.multi $name key=$k timed out after $resultTimeout")
|
||||
)
|
||||
val cache: AsyncLoadingCache[K, V] = makeExpire(
|
||||
Scaffeine().maximumSize(maxCapacity),
|
||||
api.scaffeine.maximumSize(maxCapacity),
|
||||
expireAfter
|
||||
).recordStats.buildAsyncFuture(safeF)
|
||||
startMonitoring(name, cache.underlying.synchronous)
|
||||
api.monitor(name, cache.underlying.synchronous)
|
||||
new AsyncCache[K, V](cache, safeF)
|
||||
}
|
||||
|
||||
|
@ -88,10 +87,10 @@ object AsyncCache {
|
|||
lila.base.LilaException(s"$fullName key=$k timed out after $resultTimeout")
|
||||
)
|
||||
val cache: Cache[K, Fu[V]] = makeExpire(
|
||||
Scaffeine().maximumSize(maxCapacity),
|
||||
api.scaffeine.maximumSize(maxCapacity),
|
||||
expireAfter
|
||||
).recordStats.build[K, Fu[V]]
|
||||
startMonitoring(name, cache.underlying)
|
||||
api.monitor(name, cache.underlying)
|
||||
new AsyncCacheClearable[K, V](cache, safeF, logger = logger branch fullName)
|
||||
}
|
||||
|
||||
|
@ -107,22 +106,14 @@ object AsyncCache {
|
|||
resultTimeout,
|
||||
lila.base.LilaException(s"AsyncCache.single $name single timed out after $resultTimeout")
|
||||
)
|
||||
val builder = makeExpire(Scaffeine().maximumSize(1), expireAfter)
|
||||
val builder = makeExpire(api.scaffeine.maximumSize(1), expireAfter)
|
||||
if (monitor) builder.recordStats
|
||||
val cache: AsyncLoadingCache[Unit, V] = builder.buildAsyncFuture(safeF)
|
||||
if (monitor) startMonitoring(name, cache.underlying.synchronous)
|
||||
if (monitor) api.monitor(name, cache.underlying.synchronous)
|
||||
new AsyncCacheSingle[V](cache, safeF)
|
||||
}
|
||||
}
|
||||
|
||||
private[memo] def startMonitoring(
|
||||
name: String,
|
||||
cache: CaffeineCache[_, _]
|
||||
)(implicit ec: ExecutionContext, system: ActorSystem): Unit =
|
||||
system.scheduler.scheduleWithFixedDelay(1 minute, 1 minute) { () =>
|
||||
lila.mon.caffeineStats(cache, name)
|
||||
}
|
||||
|
||||
sealed trait ExpireAfter
|
||||
case class ExpireAfterAccess(duration: FiniteDuration) extends ExpireAfter
|
||||
case class ExpireAfterWrite(duration: FiniteDuration) extends ExpireAfter
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
package lila.memo
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import com.github.benmanes.caffeine
|
||||
import com.github.blemale.{ scaffeine => s }
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
final class CacheApi(implicit ec: ExecutionContext, system: ActorSystem) {
|
||||
|
||||
private type Builder = s.Scaffeine[Any, Any]
|
||||
|
||||
val scaffeine: Builder =
|
||||
s.Scaffeine().scheduler(caffeine.cache.Scheduler.systemScheduler)
|
||||
|
||||
def asyncLoading[K, V](
|
||||
name: String
|
||||
)(build: Builder => s.AsyncLoadingCache[K, V]): s.AsyncLoadingCache[K, V] = {
|
||||
val cache = build(scaffeine)
|
||||
monitor(name, cache)
|
||||
cache
|
||||
}
|
||||
|
||||
def monitor(name: String, cache: s.AsyncCache[_, _]): Unit =
|
||||
monitor(name, cache.underlying.synchronous)
|
||||
|
||||
def monitor(name: String, cache: s.Cache[_, _]): Unit =
|
||||
monitor(name, cache.underlying)
|
||||
|
||||
def monitor(name: String, cache: caffeine.cache.Cache[_, _]): Unit =
|
||||
CacheApi.startMonitor(name, cache)
|
||||
}
|
||||
|
||||
object CacheApi {
|
||||
|
||||
implicit def beafedAsync[K, V](cache: s.AsyncCache[K, V]) = new BeafedAsync[K, V](cache)
|
||||
|
||||
private[memo] def startMonitor(
|
||||
name: String,
|
||||
cache: caffeine.cache.Cache[_, _]
|
||||
)(implicit ec: ExecutionContext, system: ActorSystem): Unit =
|
||||
system.scheduler.scheduleWithFixedDelay(1 minute, 1 minute) { () =>
|
||||
lila.mon.caffeineStats(cache, name)
|
||||
}
|
||||
}
|
||||
|
||||
final class BeafedAsync[K, V](cache: s.AsyncCache[K, V]) {
|
||||
|
||||
def invalidate(key: K): Unit = cache.underlying.synchronous invalidate key
|
||||
}
|
|
@ -25,5 +25,7 @@ final class Env(
|
|||
|
||||
lazy val settingStore = wire[SettingStore.Builder]
|
||||
|
||||
lazy val cache = wire[CacheApi]
|
||||
|
||||
lazy val asyncCache = wire[AsyncCache.Builder]
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ final class Syncache[K, V](
|
|||
private val cache: LoadingCache[K, Fu[V]] =
|
||||
Caffeine
|
||||
.newBuilder()
|
||||
.scheduler(Scheduler.systemScheduler)
|
||||
.asInstanceOf[Caffeine[K, Fu[V]]]
|
||||
.initialCapacity(initialCapacity)
|
||||
.pipe { c =>
|
||||
|
@ -55,9 +56,9 @@ final class Syncache[K, V](
|
|||
default(k)
|
||||
}
|
||||
})
|
||||
.tap {
|
||||
lila.memo.AsyncCache.startMonitoring(s"syncache.$name", _)
|
||||
}
|
||||
.tap {
|
||||
lila.memo.CacheApi.startMonitor(s"syncache.$name", _)
|
||||
}
|
||||
|
||||
// get the value asynchronously, never blocks (preferred)
|
||||
def async(k: K): Fu[V] = cache get k
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
package lila.team
|
||||
|
||||
import lila.memo.Syncache
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import lila.memo.Syncache
|
||||
import lila.user.User
|
||||
|
||||
final class Cached(
|
||||
teamRepo: TeamRepo,
|
||||
memberRepo: MemberRepo,
|
||||
requestRepo: RequestRepo,
|
||||
asyncCache: lila.memo.AsyncCache.Builder
|
||||
cacheApi: lila.memo.CacheApi
|
||||
)(implicit ec: scala.concurrent.ExecutionContext, system: akka.actor.ActorSystem) {
|
||||
|
||||
val nameCache = new Syncache[String, Option[String]](
|
||||
|
@ -24,26 +26,28 @@ final class Cached(
|
|||
|
||||
def preloadSet = nameCache preloadSet _
|
||||
|
||||
// ~ 50k entries as of 14/12/19
|
||||
private val teamIdsCache = new Syncache[lila.user.User.ID, Team.IdsStr](
|
||||
private val teamIdsCache = new Syncache[User.ID, Team.IdsStr](
|
||||
name = "team.ids",
|
||||
initialCapacity = 32768,
|
||||
initialCapacity = 65536,
|
||||
compute = u => memberRepo.teamIdsByUser(u).dmap(Team.IdsStr.apply),
|
||||
default = _ => Team.IdsStr.empty,
|
||||
strategy = Syncache.WaitAfterUptime(20 millis),
|
||||
expireAfter = Syncache.ExpireAfterAccess(1 hour),
|
||||
expireAfter = Syncache.ExpireAfterWrite(1 hour),
|
||||
logger = logger
|
||||
)
|
||||
|
||||
def syncTeamIds = teamIdsCache sync _
|
||||
def teamIds = teamIdsCache async _
|
||||
def teamIdsList(userId: lila.user.User.ID) = teamIds(userId).dmap(_.toList)
|
||||
def syncTeamIds = teamIdsCache sync _
|
||||
def teamIds = teamIdsCache async _
|
||||
def teamIdsList(userId: User.ID) = teamIds(userId).dmap(_.toList)
|
||||
|
||||
def invalidateTeamIds = teamIdsCache invalidate _
|
||||
|
||||
val nbRequests = asyncCache.clearable[lila.user.User.ID, Int](
|
||||
name = "team.nbRequests",
|
||||
f = userId => teamRepo teamIdsByCreator userId flatMap requestRepo.countByTeams,
|
||||
expireAfter = _.ExpireAfterAccess(15 minutes)
|
||||
)
|
||||
val nbRequests = cacheApi.asyncLoading[User.ID, Int]("team.nbRequests") {
|
||||
_.expireAfterAccess(30 minutes)
|
||||
.initialCapacity(32768)
|
||||
.maximumSize(65536)
|
||||
.buildAsyncFuture[User.ID, Int] { userId =>
|
||||
teamRepo teamIdsByCreator userId flatMap requestRepo.countByTeams,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,7 +15,8 @@ final class Env(
|
|||
userRepo: lila.user.UserRepo,
|
||||
modLog: ModlogApi,
|
||||
notifyApi: NotifyApi,
|
||||
asyncCache: lila.memo.AsyncCache.Builder,
|
||||
// asyncCache: lila.memo.AsyncCache.Builder,
|
||||
cacheApi: lila.memo.CacheApi,
|
||||
db: lila.db.Db
|
||||
)(implicit ec: scala.concurrent.ExecutionContext, system: ActorSystem) {
|
||||
|
||||
|
|
|
@ -18,13 +18,13 @@ final class RequestRepo(val coll: Coll)(implicit ec: scala.concurrent.ExecutionC
|
|||
coll.countSel(teamQuery(teamId))
|
||||
|
||||
def countByTeams(teamIds: List[ID]): Fu[Int] =
|
||||
coll.countSel(teamsQuery(teamIds))
|
||||
teamIds.nonEmpty ?? coll.countSel(teamsQuery(teamIds))
|
||||
|
||||
def findByTeam(teamId: ID): Fu[List[Request]] =
|
||||
coll.list[Request](teamQuery(teamId))
|
||||
|
||||
def findByTeams(teamIds: List[ID]): Fu[List[Request]] =
|
||||
coll.list[Request](teamsQuery(teamIds))
|
||||
teamIds.nonEmpty ?? coll.list[Request](teamsQuery(teamIds))
|
||||
|
||||
def selectId(teamId: ID, userId: ID) = $id(Request.makeId(teamId, userId))
|
||||
def teamQuery(teamId: ID) = $doc("team" -> teamId)
|
||||
|
|
|
@ -7,6 +7,7 @@ import lila.hub.actorApi.team.{ CreateTeam, JoinTeam }
|
|||
import lila.hub.actorApi.timeline.{ Propagate, TeamCreate, TeamJoin }
|
||||
import lila.hub.LightTeam
|
||||
import lila.mod.ModlogApi
|
||||
import lila.memo.CacheApi._
|
||||
import lila.user.{ User, UserRepo }
|
||||
import org.joda.time.Period
|
||||
import reactivemongo.api.{ Cursor, ReadPreference }
|
||||
|
|
Loading…
Reference in New Issue