start hub/socket refactoring

This commit is contained in:
Thibault Duplessis 2013-04-07 18:25:52 -03:00
parent 48be1445fa
commit 6f7d31227f
25 changed files with 304 additions and 244 deletions

View file

@ -101,10 +101,6 @@ trait WithPlay { self: PackageObject ⇒
import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
type JsChannel = Channel[JsValue]
type JsEnumerator = Enumerator[JsValue]
type SocketFuture = Fu[(Iteratee[JsValue, _], JsEnumerator)]
// Typeclasses
implicit val LilaFutureFunctor = new Functor[Fu] {
def fmap[A, B](r: Fu[A], f: A B) = r map f

View file

@ -9,7 +9,7 @@ import com.typesafe.config.Config
final class Env(
config: Config,
db: lila.db.Env,
sockets: ActorRef,
socketHub: ActorRef,
captcher: ActorRef,
indexer: ActorRef,
system: ActorSystem) {
@ -61,7 +61,7 @@ object Env {
lazy val current = "[boot] forum" describes new Env(
config = lila.common.PlayApp loadConfig "forum",
db = lila.db.Env.current,
sockets = hub.sockets,
socketHub = hub.socket.hub,
captcher = hub.actor.captcher,
indexer = hub.actor.forumIndexer,
system = lila.common.PlayApp.system)

View file

@ -6,26 +6,32 @@ import lila.common.PimpedConfig._
final class Env(config: Config, system: ActorSystem) {
private val SocketsTimeout = config duration "sockets.timeout"
private val SocketsName = config getString "sockets.name"
private val SocketHubName = config getString "socket.hub.name"
private val SocketHubTimeout = config duration "socket.hub.timeout"
object actor {
val lobby = actorFor(config getString "actor.lobby.name")
val renderer = actorFor(config getString "actor.renderer.name")
val captcher = actorFor(config getString "actor.captcher.name")
val forumIndexer = actorFor(config getString "actor.forum_indexer.name")
val messenger = actorFor(config getString "actor.messenger.name")
val router = actorFor(config getString "actor.router.name")
val forum = actorFor(config getString "actor.forum.name")
val teamIndexer = actorFor(config getString "actor.team_indexer.name")
val ai = actorFor(config getString "actor.ai.name")
val lobby = actorFor("actor.lobby")
val renderer = actorFor("actor.renderer")
val captcher = actorFor("actor.captcher")
val forumIndexer = actorFor("actor.forum_indexer")
val messenger = actorFor("actor.messenger")
val router = actorFor("actor.router")
val forum = actorFor("actor.forum")
val teamIndexer = actorFor("actor.team_indexer")
val ai = actorFor("actor.ai")
val monitor = actorFor("actor.monitor")
}
val sockets = system.actorOf(Props(new Broadcast(List(
actor.lobby
), SocketsTimeout)), name = SocketsName)
object socket {
val lobby = actorFor("socket.lobby")
val monitor = actorFor("socket.monitor")
val hub = system.actorOf(Props(new Broadcast(List(
socket.lobby
), SocketHubTimeout)), name = SocketHubName)
}
private def actorFor(name: String) = system.actorFor("/user/" + name)
private def actorFor(name: String) =
system.actorFor("/user/" + config.getString(name))
}
object Env {

View file

@ -1,4 +1,4 @@
package lila.app.monitor;
package lila.monitor;
import com.sun.management.OperatingSystemMXBean;
import java.lang.management.*;

View file

@ -0,0 +1,46 @@
package lila.monitor
import lila.common.PimpedConfig._
import akka.actor._
import com.typesafe.config.Config
import play.api.Play.current
import play.api.libs.concurrent.Akka.system
import play.api.libs.concurrent.Execution.Implicits._
final class Env(
config: Config,
db: lila.db.Env,
hub: lila.hub.Env) {
private val ActorName = config getString "actor.name"
private val HubName = config getString "actor.name"
private val Timeout = config duration "timeout"
private val WebsocketUidTtl = config duration "socket.uid.ttl"
lazy val socket = system.actorOf(
Props(new Hub(timeout = WebsocketUidTtl)), name = HubName)
// lazy val socket = new Socket(hub = hub)
lazy val reporting = system.actorOf(
Props(new Reporting(
rpsProvider = rpsProvider,
mpsProvider = mpsProvider,
db = db,
hub = hub
)), name = ActorName)
// requests per second
val rpsProvider = new RpsProvider(Timeout)
// moves per second
val mpsProvider = new RpsProvider(Timeout)
}
object Env {
lazy val current = "[boot] monitor" describes new Env(
config = lila.common.PlayApp loadConfig "monitor",
db = lila.db.Env.current,
hub = lila.hub.Env.current)
}

View file

@ -1,13 +1,13 @@
package lila.app
package monitor
package lila.monitor
import socket._
import lila.socket._
import akka.actor._
import play.api.libs.json._
import play.api.libs.iteratee._
import scala.concurrent.duration.Duration
final class Hub(timeout: Int) extends HubActor[Member](timeout) {
private[monitor] final class Hub(timeout: Duration) extends HubActor[Member](timeout) {
def receiveSpecific = {
@ -17,6 +17,6 @@ final class Hub(timeout: Int) extends HubActor[Member](timeout) {
sender ! Connected(enumerator, channel)
}
case MonitorData(data) notifyAll("monitor", JsString(data mkString ";"))
case MonitorData(data) notifyAll("monitor", data mkString ";")
}
}

View file

@ -0,0 +1,52 @@
package lila.monitor
import reactivemongo.core.commands.Status
import reactivemongo.api.DB
import reactivemongo.bson._
import play.modules.reactivemongo.Implicits._
import play.api.libs.json.JsObject
import play.api.libs.concurrent.Execution.Implicits._
private[monitor] case class MongoStatus(
memory: Int = 0,
connection: Int = 0,
query: Int = 0,
totalTime: Long = 0,
lockTime: Long = 0,
qps: Int = 0,
lock: Double = 0d)
private[monitor] object MongoStatus {
def default = new MongoStatus()
def apply(db: DB)(prev: MongoStatus): Fu[MongoStatus] =
db.command(Status) map { status
def get[A](field: String)(implicit reader: BSONReader[_ <: BSONValue, A]): Option[A] =
status.get(field) flatMap (_.seeAsOpt[A])
val query = ~get[Int]("network.numRequests")
val locks = ~get[JsObject]("locks")
val lockNumbers = for {
dbName List("lichess", ".")
statName List("timeLockedMicros", "timeAcquiringMicros")
opName List("r", "w", "R", "W")
} yield (locks \ dbName \ statName \ opName).asOpt[Long]
val lockTime = lockNumbers.flatten.map(_ / 1000).sum
val totalTime = ~get[Long]("uptimeMillis")
new MongoStatus(
memory = ~get[Int]("mem.resident"),
connection = ~get[Int]("connections.current"),
query = query,
totalTime = totalTime,
lockTime = lockTime,
qps = query - prev.query,
lock = (totalTime - prev.totalTime == 0).fold(
0d,
(lockTime - prev.lockTime).toDouble / (totalTime - prev.totalTime) * 100
)
)
}
}

View file

@ -1,26 +1,21 @@
package lila.app
package monitor
package lila.monitor
import socket.GetNbMembers
import round.GetNbHubs
import lila.socket.GetNbMembers
// import round.GetNbHubs
import akka.actor._
import akka.pattern.{ ask, pipe }
import scala.concurrent.duration._
import akka.util.{ Timeout }
import scala.concurrent.{ Future, Promise }
import play.api.libs.concurrent._
import play.api.Play.current
import scala.io.Source
import scala.util.{ Success, Failure }
import java.lang.management.ManagementFactory
import com.mongodb.casbah.MongoDB
final class Reporting(
private[monitor] final class Reporting(
rpsProvider: RpsProvider,
mpsProvider: RpsProvider,
mongodb: MongoDB,
hub: ActorRef) extends Actor {
db: lila.db.Env,
hub: lila.hub.Env) extends Actor {
case class SiteSocket(nbMembers: Int)
case class LobbySocket(nbMembers: Int)
@ -46,9 +41,8 @@ final class Reporting(
val osStats = ManagementFactory.getOperatingSystemMXBean
val threadStats = ManagementFactory.getThreadMXBean
val memoryStats = ManagementFactory.getMemoryMXBean
val cpuStats = new CPU()
implicit val executor = Akka.system.dispatcher
implicit val timeout = Timeout(100 millis)
val cpuStats = new CPU
implicit val timeout = makeTimeout(100 millis)
def receive = {
@ -62,42 +56,42 @@ final class Reporting(
case GetMonitorData sender ! monitorData
case Update(env) {
val before = nowMillis
Future.sequence(List(
(env.site.hub ? GetNbMembers).mapTo[Int],
(env.lobby.hub ? GetNbMembers).mapTo[Int],
(env.round.hubMaster ? GetNbHubs).mapTo[Int],
(env.round.hubMaster ? GetNbMembers).mapTo[Int]
)) onComplete {
case Failure(e) println("Reporting: " + e.getMessage)
case Success(List(
siteMembers,
lobbyMembers,
gameHubs,
gameMembers)) {
latency = (nowMillis - before).toInt
site = SiteSocket(siteMembers)
lobby = LobbySocket(lobbyMembers)
game = GameSocket(gameHubs, gameMembers)
mongoStatus = MongoStatus(mongodb)(mongoStatus)
nbGames = env.game.cached.nbGames
loadAvg = osStats.getSystemLoadAverage.toFloat
nbThreads = threadStats.getThreadCount
memory = memoryStats.getHeapMemoryUsage.getUsed / 1024 / 1024
rps = rpsProvider.rps
mps = mpsProvider.rps
cpu = ((cpuStats.getCpuUsage() * 1000).round / 10.0).toInt
clientAi = env.ai.clientPing
hub ! MonitorData(monitorData)
}
}
}
// TODO
// case Update {
// val before = nowMillis
// List(
// (env.site.hub ? GetNbMembers).mapTo[Int],
// (hub.actor.lobby ? GetNbMembers).mapTo[Int],
// (env.round.hubMaster ? GetNbHubs).mapTo[Int],
// (env.round.hubMaster ? GetNbMembers).mapTo[Int]
// ).sequence onComplete {
// case Failure(e) println("Reporting: " + e.getMessage)
// case Success(List(
// siteMembers,
// lobbyMembers,
// gameHubs,
// gameMembers)) {
// latency = (nowMillis - before).toInt
// site = SiteSocket(siteMembers)
// lobby = LobbySocket(lobbyMembers)
// game = GameSocket(gameHubs, gameMembers)
// mongoStatus = MongoStatus(mongodb)(mongoStatus)
// nbGames = env.game.cached.nbGames
// loadAvg = osStats.getSystemLoadAverage.toFloat
// nbThreads = threadStats.getThreadCount
// memory = memoryStats.getHeapMemoryUsage.getUsed / 1024 / 1024
// rps = rpsProvider.rps
// mps = mpsProvider.rps
// cpu = ((cpuStats.getCpuUsage() * 1000).round / 10.0).toInt
// clientAi = env.ai.clientPing
// hub ! MonitorData(monitorData)
// }
// }
}
private def display() {
private def display {
val data = Formatter.dataLine(List(
val data = dataLine(List(
"site" -> site.nbMembers,
"lobby" -> lobby.nbMembers,
"game" -> game.nbMembers,
@ -146,18 +140,15 @@ final class Reporting(
private def allMembers = site.nbMembers + lobby.nbMembers + game.nbMembers
object Formatter {
private def dataLine(data: List[(String, Any)]) = new {
def dataLine(data: List[(String, Any)]) = new {
def header = data map (_._1) mkString " "
def header = data map (_._1) mkString " "
def line = data map {
case (name, value) {
val s = value.toString
List.fill(name.size - s.size)(" ").mkString + s + " "
}
} mkString
}
def line = data map {
case (name, value) {
val s = value.toString
List.fill(name.size - s.size)(" ").mkString + s + " "
}
} mkString
}
}

View file

@ -0,0 +1,29 @@
package lila.monitor
import scala.concurrent.stm.Ref
import scala.concurrent.duration.FiniteDuration
private[monitor] final class RpsProvider(timeout: FiniteDuration) {
private val counter = Ref((0, (0, nowMillis)))
private val tms = timeout.toMillis
def countRequest = {
val current = nowMillis
counter.single.transform {
case (precedent, (count, millis)) if current > millis + tms (0, (1, current))
case (precedent, (count, millis)) if current > millis + (tms / 2) (count, (1, current))
case (precedent, (count, millis)) (precedent, (count + 1, millis))
}
}
def rps = math.round {
val current = nowMillis
val (precedent, (count, millis)) = counter.single()
val since = current - millis
if (since <= tms) ((count + precedent) * 1000) / (since + tms / 2)
else 0
} toInt
}

View file

@ -0,0 +1,33 @@
package lila.monitor
import akka.actor._
import akka.pattern.ask
import scala.concurrent.duration._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import play.api.libs.concurrent.Execution.Implicits._
import lila.socket.{ Ping, Quit, WithSocket }
import lila.common.PimpedJson._
private[monitor] final class Socket(hub: ActorRef) with WithSocket {
implicit val timeout = makeTimeout(300 millis)
// TODO
// def join(uid: String): SocketFuture = {
// val promise: Option[SocketFuture] = (hub ? Join(uid)) map {
// case Connected(enumerator, channel)
// val iteratee = Iteratee.foreach[JsValue] { e
// e str "t" match {
// case Some("p") hub ! Ping(uid)
// case _
// }
// } mapDone { _
// hub ! Quit(uid)
// }
// (iteratee, enumerator)
// }
// promise | connectionFail
}

View file

@ -1,15 +1,13 @@
package lila.app
package monitor
package lila.monitor
import core.CoreEnv
import socket.SocketMember
import lila.socket.SocketMember
case object GetNbGames
case object GetNbMoves
case object GetStatus
case object GetMonitorData
case class Update(env: CoreEnv)
case object Update
case class Member(channel: JsChannel) extends SocketMember {
val username = none

View file

@ -0,0 +1,3 @@
package lila
package object monitor extends PackageObject with WithPlay

View file

@ -1,47 +0,0 @@
package lila.app
package monitor
import com.mongodb.casbah.MongoDB
import com.mongodb.casbah.Imports._
case class MongoStatus(
memory: Int = 0,
connection: Int = 0,
query: Int = 0,
totalTime: Long = 0,
lockTime: Long = 0,
qps: Int = 0,
lock: Double = 0d) {
}
case object MongoStatus {
def default = new MongoStatus()
def apply(mongodb: MongoDB)(prev: MongoStatus): MongoStatus = {
val status: MongoDBObject = mongodb.command("serverStatus")
val query = status.expand[Int]("network.numRequests") | 0
val lockNumbers = for {
locks status.getAs[DBObject]("locks").toList
dbName List("lichess", ".")
dbLocks locks.getAs[DBObject](dbName).toList
statName List("timeLockedMicros", "timeAcquiringMicros")
statLocks dbLocks.getAs[DBObject](statName).toList
opName List("r", "w", "R", "W")
} yield statLocks.getAs[Long](opName) map (_ / 1000)
val lockTime = lockNumbers.flatten.sum
val totalTime = status.getAs[Long]("uptimeMillis") | 0
new MongoStatus(
memory = status.expand[Int]("mem.resident") | 0,
connection = status.expand[Int]("connections.current") | 0,
query = query,
totalTime = totalTime,
lockTime = lockTime,
qps = query - prev.query,
lock = (totalTime - prev.totalTime == 0).fold(
0d,
(lockTime - prev.lockTime).toDouble / (totalTime - prev.totalTime) * 100
)
)
}
}

View file

@ -1,30 +0,0 @@
package lila.app
package monitor
import play.api.libs.concurrent.Promise
import java.util.concurrent.TimeUnit
import scala.concurrent.stm._
import scala.math.round
final class RpsProvider(timeout: Int) {
private val counter = Ref((0, (0, nowMillis)))
def countRequest() = {
val current = nowMillis
counter.single.transform {
case (precedent, (count, millis)) if current > millis + timeout (0, (1, current))
case (precedent, (count, millis)) if current > millis + (timeout / 2) (count, (1, current))
case (precedent, (count, millis)) (precedent, (count + 1, millis))
}
}
def rps = round {
val current = nowMillis
val (precedent, (count, millis)) = counter.single()
val since = current - millis
if (since <= timeout) ((count + precedent) * 1000) / (since + timeout / 2)
else 0
} toInt
}

View file

@ -1,39 +0,0 @@
package lila.app
package monitor
import akka.actor._
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import scalaz.effects._
import play.api.libs.concurrent.Execution.Implicits._
import implicits.RichJs._
import socket.{ Util, Ping, Quit }
final class Socket(hub: ActorRef) {
implicit val timeout = Timeout(300 millis)
def join(
uidOption: Option[String]): SocketFuture = {
val promise: Option[SocketFuture] = for {
uid uidOption
} yield (hub ? Join(uid)) map {
case Connected(enumerator, channel)
val iteratee = Iteratee.foreach[JsValue] { e
e str "t" match {
case Some("p") hub ! Ping(uid)
case _
}
} mapDone { _
hub ! Quit(uid)
}
(iteratee, enumerator)
}
promise | Util.connectionFail
}
}

View file

@ -26,10 +26,10 @@ object ApplicationBuild extends Build {
)
lazy val modules = Seq(
chess, common, http, db, user, wiki, hub, websocket,
chess, common, http, db, user, wiki, hub, socket,
message, notification, i18n, game, bookmark, search,
gameSearch, timeline, forum, forumSearch, team, teamSearch,
ai, analyse, mod)
ai, analyse, mod, monitor)
lazy val moduleRefs = modules map projectToRef
lazy val moduleCPDeps = moduleRefs map classpathDependency
@ -147,7 +147,11 @@ object ApplicationBuild extends Build {
libraryDependencies ++= provided(playApi)
)
lazy val websocket = project("websocket", Seq(common, hub, memo)).settings(
lazy val monitor = project("monitor", Seq(common, hub, socket, db)).settings(
libraryDependencies ++= provided(playApi, reactivemongo, playReactivemongo)
)
lazy val socket = project("socket", Seq(common, hub, memo)).settings(
libraryDependencies ++= provided(playApi)
)

14
socket/src/main/Env.scala Normal file
View file

@ -0,0 +1,14 @@
package lila.socket
import com.typesafe.config.Config
import lila.common.PimpedConfig._
final class Env(config: Config) {
}
object Env {
lazy val current = "[boot] socket" describes new Env(
config = lila.common.PlayApp loadConfig "socket")
}

View file

@ -1,4 +1,4 @@
package lila.websocket
package lila.socket
import play.api.libs.json._

View file

@ -1,4 +1,4 @@
package lila.websocket
package lila.socket
import scala.math.max
import scala.concurrent.duration.Duration

View file

@ -1,4 +1,4 @@
package lila.websocket
package lila.socket
import lila.memo.BooleanExpiryMemo
@ -45,7 +45,7 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Duration) extends Actor {
def receive = receiveSpecific orElse receiveGeneric
def notifyAll(t: String, data: JsValue) {
def notifyAll[A : Writes](t: String, data: A) {
val msg = makeMessage(t, data)
members.values.foreach(_.channel push msg)
}
@ -54,10 +54,10 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Duration) extends Actor {
member.channel push makeMessage(t, data)
}
def makeMessage(t: String, data: JsValue) =
JsObject(Seq("t" -> JsString(t), "d" -> data))
def makeMessage[A : Writes](t: String, data: A) =
Json.obj("t" -> t, "d" -> data)
def makePong(nb: Int) = makeMessage("n", JsNumber(nb))
def makePong(nb: Int) = makeMessage("n", nb)
def ping(uid: String) {
setAlive(uid)

View file

@ -0,0 +1,24 @@
package lila.socket
import play.api.libs.json._
import play.api.libs.concurrent.Promise
import play.api.libs.iteratee.{ Iteratee, Enumerator }
import play.api.libs.iteratee.Concurrent.Channel
import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
import scalaz.{ Zero, Zeros }
trait WithSocket extends Zeros {
type JsChannel = Channel[JsValue]
type JsEnumerator = Enumerator[JsValue]
type SocketHandler = (Iteratee[JsValue, _], JsEnumerator)
implicit val LilaSocketHandlerZero = new Zero[SocketHandler] {
val zero: SocketHandler =
Iteratee.skipToEof[JsValue] ->
Enumerator[JsValue](Json.obj("error" -> "Invalid request"))
.andThen(Enumerator.eof)
}
}

View file

@ -1,4 +1,4 @@
package lila.websocket
package lila.socket
import play.api.libs.json.JsObject
import scala.collection.mutable

View file

@ -0,0 +1,7 @@
package lila
import play.api.libs.iteratee._
import play.api.libs.json._
package object socket
extends PackageObject with WithPlay with socket.WithSocket

View file

@ -1,14 +0,0 @@
package lila.websocket
import com.typesafe.config.Config
import lila.common.PimpedConfig._
final class Env(config: Config) {
}
object Env {
lazy val current = "[boot] websocket" describes new Env(
config = lila.common.PlayApp loadConfig "websocket")
}

View file

@ -1,13 +0,0 @@
package lila
import play.api.libs.iteratee._
import play.api.libs.json._
package object websocket extends PackageObject with WithPlay {
val connectionFail: SocketFuture = fuccess {
Done[JsValue, Unit]((), Input.EOF) -> (Enumerator[JsValue](
JsObject(Seq("error" -> JsString("Invalid request")))
) andThen Enumerator.enumInput(Input.EOF))
}
}