move monitoring to lila-ws
parent
b85628cf5a
commit
4e94b55945
|
@ -332,18 +332,6 @@ object mon {
|
|||
val users = rec("socket.remote.sets.users")
|
||||
val games = rec("socket.remote.sets.games")
|
||||
}
|
||||
val connections = rec("socket.remote.connections")
|
||||
object redis {
|
||||
val publishTime = rec("socket.remote.redis.publish_time")
|
||||
object in {
|
||||
def channel(channel: String) = inc(s"socket.remote.redis.in.channel.$channel")
|
||||
def path(channel: String, path: String) = inc(s"socket.remote.redis.in.path.$channel:$path")
|
||||
}
|
||||
object out {
|
||||
def channel(channel: String) = inc(s"socket.remote.redis.out.channel.$channel")
|
||||
def path(channel: String, path: String) = inc(s"socket.remote.redis.out.path.$channel:$path")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
object trouper {
|
||||
|
|
|
@ -107,7 +107,6 @@ final class RemoteSocket(
|
|||
|
||||
private def tick(nbConn: Int): Unit = {
|
||||
setNb(nbConn)
|
||||
mon.connections(nbConn)
|
||||
mon.sets.games(watchedGameIds.size)
|
||||
mon.sets.users(connectedUserIds.get.size)
|
||||
}
|
||||
|
@ -121,16 +120,11 @@ final class RemoteSocket(
|
|||
def subscribe(channel: Channel, reader: In.Reader)(handler: Handler): Future[Unit] = {
|
||||
val conn = redisClient.connectPubSub()
|
||||
conn.addListener(new pubsub.RedisPubSubAdapter[String, String] {
|
||||
override def message(_channel: String, message: String): Unit = {
|
||||
val raw = RawMsg(message)
|
||||
mon.redis.in.channel(channel)()
|
||||
mon.redis.in.path(channel, raw.path)()
|
||||
// println(message, s"in:$channel")
|
||||
reader(raw) collect handler match {
|
||||
override def message(_channel: String, message: String): Unit =
|
||||
reader(RawMsg(message)) collect handler match {
|
||||
case Some(_) => // processed
|
||||
case None => logger.warn(s"Unhandled $channel $message")
|
||||
}
|
||||
}
|
||||
})
|
||||
val subPromise = Promise[Unit]
|
||||
conn.async.subscribe(channel).thenRun {
|
||||
|
@ -153,28 +147,19 @@ object RemoteSocket {
|
|||
|
||||
final class Sender(conn: StatefulRedisPubSubConnection[String, String], channel: Channel) {
|
||||
|
||||
private val mon = lila.mon.socket.remote
|
||||
|
||||
def apply(msg: String): Unit = {
|
||||
val chrono = Chronometer.start
|
||||
conn.async.publish(channel, msg).thenRun {
|
||||
new Runnable { def run = chrono.mon(_.socket.remote.redis.publishTime) }
|
||||
}
|
||||
mon.redis.out.channel(channel)()
|
||||
mon.redis.out.path(channel, msg.takeWhile(' ' !=))()
|
||||
}
|
||||
def apply(msg: String): Unit = conn.async.publish(channel, msg)
|
||||
}
|
||||
|
||||
object Protocol {
|
||||
|
||||
case class RawMsg(path: Path, args: Args) {
|
||||
final class RawMsg(val path: Path, val args: Args) {
|
||||
def get(nb: Int)(f: PartialFunction[Array[String], Option[In]]): Option[In] =
|
||||
f.applyOrElse(args.split(" ", nb), (_: Array[String]) => None)
|
||||
def all = args split ' '
|
||||
}
|
||||
def RawMsg(msg: String): RawMsg = {
|
||||
val parts = msg.split(" ", 2)
|
||||
RawMsg(parts(0), ~parts.lift(1))
|
||||
new RawMsg(parts(0), ~parts.lift(1))
|
||||
}
|
||||
|
||||
trait In
|
||||
|
|
Loading…
Reference in New Issue