refactor concurrency limit storage
parent
7ad920ca65
commit
849ff2898f
|
@ -5,8 +5,10 @@ import play.api.libs.json.Json
|
|||
import play.api.mvc.Result
|
||||
import play.api.mvc.Results.TooManyRequests
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.collection.immutable.Queue
|
||||
import scala.concurrent.Promise
|
||||
|
||||
/** only allow one stream at a time per key
|
||||
/** only allow X streams at a time per key
|
||||
*/
|
||||
final class ConcurrencyLimit[K](
|
||||
name: String,
|
||||
|
@ -17,27 +19,23 @@ final class ConcurrencyLimit[K](
|
|||
toString: K => String = (k: K) => k.toString
|
||||
)(implicit ec: scala.concurrent.ExecutionContext) {
|
||||
|
||||
private val storage = lila.memo.CacheApi.scaffeineNoScheduler
|
||||
.expireAfterWrite(ttl)
|
||||
.build[String, Int]()
|
||||
|
||||
private val concurrentMap = storage.underlying.asMap
|
||||
private val storage = new ConcurrencyLimit.Storage(ttl, maxConcurrency, toString)
|
||||
|
||||
private lazy val logger = lila.log("concurrencylimit").branch(name)
|
||||
private lazy val monitor = lila.mon.security.concurrencyLimit(key)
|
||||
|
||||
def compose[T](k: K, msg: => String = ""): Option[Source[T, _] => Source[T, _]] =
|
||||
get(k) match {
|
||||
storage.get(k) match {
|
||||
case c @ _ if c >= maxConcurrency =>
|
||||
logger.info(s"$k $msg")
|
||||
monitor.increment()
|
||||
none
|
||||
case c @ _ =>
|
||||
inc(k)
|
||||
storage.inc(k)
|
||||
some {
|
||||
_.watchTermination() { (_, done) =>
|
||||
done.onComplete { _ =>
|
||||
dec(k)
|
||||
storage.dec(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -49,14 +47,26 @@ final class ConcurrencyLimit[K](
|
|||
compose[T](k, msg).fold(limitedDefault(maxConcurrency)) { watch =>
|
||||
makeResult(watch(makeSource))
|
||||
}
|
||||
|
||||
private def get(k: K) = ~storage.getIfPresent(toString(k))
|
||||
private def inc(k: K) = concurrentMap.compute(toString(k), (_, c) => (~Option(c) + 1) atMost maxConcurrency)
|
||||
private def dec(k: K) = concurrentMap.computeIfPresent(toString(k), (_, c) => (c - 1) atLeast 0)
|
||||
}
|
||||
|
||||
object ConcurrencyLimit {
|
||||
|
||||
final class Storage[K](
|
||||
ttl: FiniteDuration,
|
||||
maxConcurrency: Int,
|
||||
toString: K => String = (k: K) => k.toString
|
||||
) {
|
||||
private val storage = lila.memo.CacheApi.scaffeineNoScheduler
|
||||
.expireAfterWrite(ttl)
|
||||
.build[String, Int]()
|
||||
|
||||
private val concurrentMap = storage.underlying.asMap
|
||||
|
||||
def get(k: K) = ~storage.getIfPresent(toString(k))
|
||||
def inc(k: K) = concurrentMap.compute(toString(k), (_, c) => (~Option(c) + 1) atMost maxConcurrency)
|
||||
def dec(k: K) = concurrentMap.computeIfPresent(toString(k), (_, c) => (c - 1) atLeast 0)
|
||||
}
|
||||
|
||||
def limitedDefault(max: Int) =
|
||||
TooManyRequests(Json.obj("error" -> s"Please only run $max request(s) at a time"))
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package lila.memo
|
|||
import play.api.mvc.Result
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
/** only allow one future at a time per key
|
||||
/** only allow X futures at a time per key
|
||||
*/
|
||||
final class FutureConcurrencyLimit[K](
|
||||
key: String,
|
||||
|
@ -12,29 +12,19 @@ final class FutureConcurrencyLimit[K](
|
|||
toString: K => String = (k: K) => k.toString
|
||||
)(implicit ec: scala.concurrent.ExecutionContext) {
|
||||
|
||||
private val storage = lila.memo.CacheApi.scaffeineNoScheduler
|
||||
.expireAfterWrite(ttl)
|
||||
.build[String, Int]()
|
||||
|
||||
private val concurrentMap = storage.underlying.asMap
|
||||
private val storage = new ConcurrencyLimit.Storage(ttl, maxConcurrency, toString)
|
||||
|
||||
private lazy val monitor = lila.mon.security.concurrencyLimit(key)
|
||||
|
||||
def apply(k: K, limited: => Fu[Result])(op: => Fu[Result]): Fu[Result] =
|
||||
get(k) match {
|
||||
storage.get(k) match {
|
||||
case c @ _ if c >= maxConcurrency =>
|
||||
monitor.increment()
|
||||
limited
|
||||
case c @ _ =>
|
||||
inc(k)
|
||||
storage.inc(k)
|
||||
op addEffectAnyway {
|
||||
dec(k)
|
||||
storage.dec(k).unit
|
||||
}
|
||||
}
|
||||
|
||||
private def get(k: K): Int = ~storage.getIfPresent(toString(k))
|
||||
private def inc(k: K): Unit =
|
||||
concurrentMap.compute(toString(k), (_, c) => (~Option(c) + 1) atMost maxConcurrency).unit
|
||||
private def dec(k: K): Unit =
|
||||
concurrentMap.computeIfPresent(toString(k), (_, c) => (c - 1) atLeast 0).unit
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue