bidir redis pub/sub with Jedis

howler-resume
Thibault Duplessis 2019-07-01 12:31:13 -04:00
parent 92cc9bc4ef
commit 96ce2bc5a9
4 changed files with 29 additions and 22 deletions

View File

@ -33,7 +33,7 @@ libraryDependencies ++= Seq(
reactivemongo.driver, reactivemongo.iteratees, akka.actor, akka.slf4j,
maxmind, prismic, netty, guava,
kamon.core, kamon.influxdb, scalatags,
java8compat, semver, scrimage, scalaConfigs, scaffeine, scredis
java8compat, semver, scrimage, scalaConfigs, scaffeine, jedis
)
resourceDirectory in Assets := (sourceDirectory in Compile).value / "assets"
unmanagedResourceDirectories in Assets ++= (if (scala.sys.env.get("SERVE_ASSETS").exists(_ == "1")) Seq(baseDirectory.value / "public") else Nil)
@ -391,7 +391,7 @@ lazy val tree = module("tree", Seq(common)).settings(
)
lazy val socket = module("socket", Seq(common, hub, memo, tree)).settings(
libraryDependencies ++= provided(play.api, reactivemongo.driver, scredis)
libraryDependencies ++= provided(play.api, reactivemongo.driver, jedis)
)
lazy val hub = module("hub", Seq(common)).settings(

View File

@ -3,7 +3,7 @@ package lila.socket
import akka.actor._
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.concurrent.Future
import redis.clients.jedis.Jedis
import actorApi._
@ -26,20 +26,14 @@ final class Env(
private val userRegister = new UserRegister(system)
private val redis = new scredis.Redis(host = RedisHost, port = RedisPort)
private val remoteSocket = new RemoteSocket(
redis = redis,
makeRedis = () => new Jedis(RedisHost, RedisPort),
chanIn = "site-in",
chanOut = "site-out",
lifecycle = lifecycle,
bus = system.lilaBus
)
lifecycle.addStopHook { () =>
logger.info("Stopping the Redis client...")
Future(redis.quit())
}
system.scheduler.schedule(5 seconds, 1 seconds) { population ! PopulationTell }
val socketDebugSetting = settingStore[Boolean](

View File

@ -1,22 +1,35 @@
package lila.socket
import scredis._
import redis.clients.jedis._
import scala.concurrent.Future
private final class RemoteSocket(
redis: Redis,
makeRedis: () => Jedis,
chanIn: String,
chanOut: String,
lifecycle: play.api.inject.ApplicationLifecycle,
bus: lila.common.Bus
) {
redis.subscriber.subscribe(chanIn) {
case message @ PubSubMessage.Message(channel, messageBytes) =>
val msg = message.readAs[String]()
send(s"Received $msg")
case PubSubMessage.Subscribe(channel, subscribedChannelsCount) => logger.info(
s"Subscribed to redis channel $channel"
)
private val clientIn = makeRedis()
private val clientOut = makeRedis()
Future {
clientIn.subscribe(new JedisPubSub() {
override def onMessage(channel: String, message: String): Unit = {
println(s"Received $message")
send(s"Received $message")
}
}, chanIn)
}
def send(data: String) = redis.publish(chanOut, data)
lifecycle.addStopHook { () =>
logger.info("Stopping the Redis clients...")
Future {
clientIn.quit()
clientOut.quit()
}
}
def send(data: String) = clientOut.publish(chanOut, data)
}

View File

@ -43,7 +43,7 @@ object Dependencies {
val specs2Scalaz = "org.specs2" %% "specs2-scalaz" % "4.0.2" % "test"
val scalaUri = "io.lemonlabs" %% "scala-uri" % "1.2.0"
val scalatags = "com.lihaoyi" %% "scalatags" % "0.6.7"
val scredis = "com.github.scredis" %% "scredis" % "2.2.4"
val jedis = "redis.clients" % "jedis" % "3.0.1"
object reactivemongo {
val version = "0.12.4"