connect to redis with lettuce/epoll

lettuce
Thibault Duplessis 2019-07-09 18:54:00 -04:00
parent ba9c5aa62c
commit 2806d29ecc
6 changed files with 15 additions and 78 deletions

View File

@ -33,7 +33,7 @@ libraryDependencies ++= Seq(
reactivemongo.driver, reactivemongo.iteratees, akka.actor, akka.slf4j,
maxmind, prismic, netty, guava,
kamon.core, kamon.influxdb, scalatags,
java8compat, semver, scrimage, scalaConfigs, scaffeine, lettuce
java8compat, semver, scrimage, scalaConfigs, scaffeine, lettuce, epoll
)
resourceDirectory in Assets := (sourceDirectory in Compile).value / "assets"
unmanagedResourceDirectories in Assets ++= (if (scala.sys.env.get("SERVE_ASSETS").exists(_ == "1")) Seq(baseDirectory.value / "public") else Nil)

View File

@ -497,8 +497,7 @@ practice {
}
socket {
redis {
host = localhost
port = 6379
uri = "redis-socket:///var/run/redis/redis-server.sock"
}
}
setup {

View File

@ -328,19 +328,10 @@ object mon {
val games = rec("socket.remote.sets.games")
}
val connections = rec("socket.remote.connections")
object executor {
val threads = rec("soocket.remote.executor.threads")
}
object redis {
val in = inc("socket.remote.redis.in")
val out = inc("socket.remote.redis.out")
val outError = inc("socket.remote.redis.out_error")
val publishTime = rec("socket.remote.redis.publish_time")
object pool {
val active = rec("socket.remote.redis.pool.active")
val idle = rec("socket.remote.redis.pool.idle")
val waiters = rec("socket.remote.redis.pool.waiters")
}
}
}
}

View File

@ -2,8 +2,8 @@ package lila.socket
import akka.actor._
import com.typesafe.config.Config
import scala.concurrent.duration._
import io.lettuce.core._
import scala.concurrent.duration._
import actorApi._
@ -15,11 +15,7 @@ final class Env(
settingStore: lila.memo.SettingStore.Builder
) {
private val settings = new {
val RedisHost = config getString "redis.host"
val RedisPort = config getInt "redis.port"
}
import settings._
private val RedisUri = config getString "redis.uri"
private val population = new Population(system)
@ -28,7 +24,7 @@ final class Env(
private val userRegister = new UserRegister(system)
private val remoteSocket = new RemoteSocket(
redisClient = RedisClient create RedisURI.create(RedisHost, RedisPort),
redisClient = RedisClient create RedisURI.create(RedisUri),
chanIn = "site-in",
chanOut = "site-out",
lifecycle = lifecycle,

View File

@ -96,14 +96,15 @@ private final class RemoteSocket(
}
private def send(path: String, args: String*): Unit = {
connOut.async.publish(chanOut, s"$path ${args mkString " "}")
lila.common.Chronometer.syncMon(_.socket.remote.redis.publishTime) {
connOut.async.publish(chanOut, s"$path ${args mkString " "}")
// .mon(_.socket.remote.redis.publishTime)
// .logFailure(logger)
}
redisMon.out()
// logger.warn(s"RemoteSocket.out $path", e)
// redisMon.outError()
}
private def tick(nbConn: Int): Unit = {
// println(nbIn, "nbIn")
mon.connections(nbConn)
mon.sets.users(connectedUserIds.size)
mon.sets.games(watchedGameIds.size)
@ -116,63 +117,12 @@ private final class RemoteSocket(
connIn.addListener(new pubsub.RedisPubSubAdapter[String, String] {
override def message(channel: String, message: String): Unit = {
if (channel == "bench") {
// (200000,3392)
// (300000,3410)
// (400000,3434)
// (500000,3366)
// (600000,3374)
// (700000,3415)
// (800000,3439)
// (900000,3421)
// (1000000,3430)
// (1100000,3398)
// (1200000,3407)
// (1300000,3441)
// (1400000,3458)
it = it + 1
if (it % 100000 == 0) {
println(it, (nowMillis - last).toString)
last = nowMillis
}
connOut.async.publish("bench", "bench tagada")
} else {
val parts = message.split(" ", 2)
// println(parts(0), ~parts.lift(1))
onReceive(parts(0), ~parts.lift(1))
redisMon.in()
// }
}
}
val parts = message.split(" ", 2)
onReceive(parts(0), ~parts.lift(1))
redisMon.in()
}
})
connIn.async.subscribe(chanIn)
connIn.async.subscribe("bench")
// import java.util.concurrent.{ Executors, ThreadFactory }
// import java.util.concurrent.atomic.AtomicLong
// private val ioThreadCounter = new AtomicLong(0L)
// private val ioThreadPool = Executors.newCachedThreadPool(
// new ThreadFactory {
// def newThread(r: Runnable) = {
// val th = new Thread(r)
// th.setName(s"remote-socket-redis-${ioThreadCounter.getAndIncrement}")
// th.setDaemon(true)
// th
// }
// }
// )
// private def executeBlockingIO[T](cb: => T): Unit = {
// ioThreadPool.execute(new Runnable {
// def run() = try {
// blocking(cb)
// } catch {
// case scala.util.control.NonFatal(e) =>
// logger.warn(s"RemoteSocket.out", e)
// redisMon.outError()
// }
// })
// }
lifecycle.addStopHook { () =>
logger.info("Stopping the Redis pool...")

View File

@ -44,6 +44,7 @@ object Dependencies {
val scalaUri = "io.lemonlabs" %% "scala-uri" % "1.2.0"
val scalatags = "com.lihaoyi" %% "scalatags" % "0.6.7"
val lettuce = "io.lettuce" % "lettuce-core" % "5.1.7.RELEASE"
val epoll = "io.netty" % "netty-transport-native-epoll" % "4.1.36.Final" classifier "linux-x86_64"
object reactivemongo {
val version = "0.12.4"