review socket architecture

This commit is contained in:
Thibault Duplessis 2013-04-08 08:49:42 -03:00
parent 2a7139c7fe
commit 12cd6c4359
19 changed files with 166 additions and 172 deletions

View file

@ -36,7 +36,7 @@ final class PaginatorBuilder(maxPerPage: Int) {
.skip(offset)
.limit(length)
.cursor[JsObject].toList map2 { (obj: JsObject)
obj.get[String]("g") flatMap { gameId
obj str "g" flatMap { gameId
obj.get[DateTime]("d") map { (gameId, _) }
}
} map (_.flatten)

View file

@ -134,6 +134,7 @@ trait WithPlay { self: PackageObject ⇒
import akka.util.Timeout
import scala.concurrent.duration._
implicit val short = seconds(1)
implicit val large = seconds(5)
def apply(duration: FiniteDuration) = Timeout(duration)

View file

@ -9,47 +9,41 @@ trait PimpedJson {
implicit final class LilaPimpedJsObject(js: JsObject) {
def str(key: String): Option[String] =
js.value get key flatMap (_.asOpt[String])
(js \ key).asOpt[String]
def int(key: String): Option[Int] =
js.value get key flatMap (_.asOpt[Int])
(js \ key).asOpt[Int]
def long(key: String): Option[Long] =
js.value get key flatMap (_.asOpt[Long])
(js \ key).asOpt[Long]
def obj(key: String): Option[JsObject] =
js.value get key flatMap (_.asOpt[JsObject])
(js \ key).asOpt[JsObject]
def get[T: Reads](field: String): Option[T] = (js \ field) match {
case JsUndefined(_) none
case value value.asOpt[T]
}
def get[A : Reads](key: String): Option[A] =
(js \ key).asOpt[A]
}
implicit final class LilaPimpedJsValue(js: JsValue) {
def str(key: String): Option[String] = for {
obj js.asOpt[JsObject]
value obj.value get key
str value.asOpt[String]
} yield str
def str(key: String): Option[String] =
js.asOpt[JsObject] flatMap { obj
(obj \ key).asOpt[String]
}
def int(key: String): Option[Int] = for {
obj js.asOpt[JsObject]
value obj.value get key
int value.asOpt[Int]
} yield int
def int(key: String): Option[Int] =
js.asOpt[JsObject] flatMap { obj
(obj \ key).asOpt[Int]
}
def long(key: String): Option[Long] = for {
obj js.asOpt[JsObject]
value obj.value get key
int value.asOpt[Long]
} yield int
def long(key: String): Option[Long] =
js.asOpt[JsObject] flatMap { obj
(obj \ key).asOpt[Long]
}
def obj(key: String): Option[JsObject] = for {
obj js.asOpt[JsObject]
value obj.value get key
obj2 value.asOpt[JsObject]
} yield obj2
def obj(key: String): Option[JsObject] =
js.asOpt[JsObject] flatMap { obj
(obj \ key).asOpt[JsObject]
}
}
}

View file

@ -1,34 +0,0 @@
package lila.memo
import scalaz.effects._
import scala.collection.JavaConversions._
import scala.concurrent.duration.Duration
final class BooleanExpiryMemo(timeout: Duration) {
protected val cache = Builder.expiry[String, Boolean](timeout)
def get(key: String): Boolean = Option {
cache getIfPresent key
} getOrElse false
def put(key: String): IO[Unit] = io {
cache.put(key, true)
}
def putUnsafe(key: String): Unit = {
cache.put(key, true)
}
def putAll(keys: Iterable[String]): IO[Unit] = io {
keys.toList.distinct foreach { cache.put(_, true) }
}
def remove(key: String): IO[Unit] = io {
cache invalidate key
}
def keys: Iterable[String] = cache.asMap.keys
def preciseCount = keys.toList.size
}

View file

@ -0,0 +1,23 @@
package lila.memo
import scala.collection.JavaConversions._
import scala.concurrent.duration.Duration
final class ExpireSetMemo(timeout: Duration) {
protected val cache = Builder.expiry[String, Boolean](timeout)
def get(key: String): Boolean = ~Option(cache getIfPresent key)
def put(key: String) { cache.put(key, true) }
def putAll(keys: Iterable[String]) {
keys.toList.distinct foreach { cache.put(_, true) }
}
def remove(key: String) { cache invalidate key }
def keys: Iterable[String] = cache.asMap.keys
def count = keys.toList.size
}

View file

@ -13,14 +13,14 @@ final class Env(
hub: lila.hub.Env) {
private val ActorName = config getString "actor.name"
private val HubName = config getString "actor.name"
private val Timeout = config duration "timeout"
private val WebsocketUidTtl = config duration "socket.uid.ttl"
private val SocketName = config getString "socket.name"
private val RpsIntervall = config duration "rps.interval"
private val SocketUidTtl = config duration "socket.uid.ttl"
lazy val socket = system.actorOf(
Props(new Hub(timeout = WebsocketUidTtl)), name = HubName)
Props(new Socket(timeout = SocketUidTtl)), name = SocketName)
// lazy val socket = new Socket(hub = hub)
lazy val socketHandler = new SocketHandler(socket)
lazy val reporting = system.actorOf(
Props(new Reporting(
@ -31,10 +31,10 @@ final class Env(
)), name = ActorName)
// requests per second
val rpsProvider = new RpsProvider(Timeout)
val rpsProvider = new RpsProvider(RpsIntervall)
// moves per second
val mpsProvider = new RpsProvider(Timeout)
val mpsProvider = new RpsProvider(RpsIntervall)
}
object Env {

View file

@ -1,22 +0,0 @@
package lila.monitor
import lila.socket._
import akka.actor._
import play.api.libs.json._
import play.api.libs.iteratee._
import scala.concurrent.duration.Duration
private[monitor] final class Hub(timeout: Duration) extends HubActor[Member](timeout) {
def receiveSpecific = {
case Join(uid) {
val (enumerator, channel) = Concurrent.broadcast[JsValue]
addMember(uid, Member(channel))
sender ! Connected(enumerator, channel)
}
case MonitorData(data) notifyAll("monitor", data mkString ";")
}
}

View file

@ -1,6 +1,6 @@
package lila.monitor
import lila.socket.GetNbMembers
import lila.socket.actorApi.GetNbMembers
// import round.GetNbHubs
import akka.actor._

View file

@ -1,33 +1,24 @@
package lila.monitor
import lila.socket._
import lila.socket.actorApi.Connected
import akka.actor._
import akka.pattern.ask
import scala.concurrent.duration._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import play.api.libs.concurrent.Execution.Implicits._
import scala.concurrent.duration.Duration
import lila.socket.{ Ping, Quit }
import lila.common.PimpedJson._
private[monitor] final class Socket(timeout: Duration) extends SocketActor[Member](timeout) {
private[monitor] final class Socket(hub: ActorRef) {
def receiveSpecific = {
implicit val timeout = makeTimeout(300 millis)
case Join(uid) {
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel)
addMember(uid, member)
sender ! Connected(enumerator, member)
}
// TODO
// def join(uid: String): SocketFuture = {
// val promise: Option[SocketFuture] = (hub ? Join(uid)) map {
// case Connected(enumerator, channel)
// val iteratee = Iteratee.foreach[JsValue] { e
// e str "t" match {
// case Some("p") hub ! Ping(uid)
// case _
// }
// } mapDone { _
// hub ! Quit(uid)
// }
// (iteratee, enumerator)
// }
// promise | connectionFail
case MonitorData(data) notifyAll("monitor", data mkString ";")
}
}

View file

@ -0,0 +1,14 @@
package lila.monitor
import akka.actor._
import lila.socket._
import lila.socket.actorApi.{ Ping, Quit }
private[monitor] final class SocketHandler(hub: ActorRef) {
def join(uid: String): Fu[JsSocketHandler] =
Handler(hub, Join(uid), Quit(uid)) {
case ("p", _) hub ! Ping(uid)
}
}

View file

@ -1,7 +1,5 @@
package lila.monitor
import lila.socket.SocketMember
case object GetNbGames
case object GetNbMoves
case object GetStatus
@ -9,12 +7,5 @@ case object GetMonitorData
case object Update
case class Member(channel: JsChannel) extends SocketMember {
val username = none
}
case class Join(uid: String)
case class Connected(
enumerator: JsEnumerator,
channel: JsChannel)
case class MonitorData(data: List[String])

View file

@ -0,0 +1,39 @@
package lila.socket
import akka.actor.ActorRef
import akka.pattern.ask
import play.api.libs.json._
import play.api.libs.iteratee.{ Iteratee, Enumerator }
import play.api.libs.concurrent.Execution.Implicits._
import play.api.libs.concurrent.Execution.Implicits._
import actorApi._
import makeTimeout.large
import lila.common.PimpedJson._
object Handler {
type Controller = PartialFunction[(String, JsObject), Unit]
def apply(socket: ActorRef, join: Any, quit: Any)(controller: Controller): Fu[JsSocketHandler] = (socket ? join map {
case Connected(enumerator, member)
(makeIteratee(controller) mapDone { _ socket ! quit }) -> enumerator
}) recover {
case t: Throwable errorHandler(t.getMessage)
}
def errorHandler(err: String): JsSocketHandler =
Iteratee.skipToEof[JsValue] ->
Enumerator[JsValue](Json.obj(
"error" -> "Invalid socket request: %s".format(err)
)).andThen(Enumerator.eof)
private def makeIteratee(controller: Controller) =
Iteratee.foreach[JsValue] { jsv
jsv.asOpt[JsObject] foreach { obj
obj str "t" foreach { t
~controller.lift(t -> obj)
}
}
}
}

View file

@ -2,7 +2,7 @@ package lila.socket
import play.api.libs.json._
trait Historical[M <: SocketMember] { self: HubActor[M]
trait Historical[M <: Member] { self: SocketActor[M]
val history: History

View file

@ -1,6 +1,6 @@
package lila.socket
import scala.math.max
import actorApi._
import scala.concurrent.duration.Duration
import play.api.libs.json._

View file

@ -0,0 +1,23 @@
package lila.socket
trait Member {
val channel: JsChannel
val userId: Option[String]
private val privateLiveGames = collection.mutable.Set[String]()
def liveGames: Set[String] = privateLiveGames.toSet
def addLiveGames(ids: List[String]) { ids foreach privateLiveGames.+= }
}
object Member {
def apply(c: JsChannel): Member = apply(c, none)
def apply(c: JsChannel, uid: Option[String]): Member = new Member {
val channel = c
val userId = uid
}
}

View file

@ -1,16 +1,17 @@
package lila.socket
import lila.memo.BooleanExpiryMemo
import actorApi._
import lila.memo.ExpireSetMemo
import akka.actor._
import play.api.libs.json._
import scala.util.Random
import scala.concurrent.duration.Duration
abstract class HubActor[M <: SocketMember](uidTimeout: Duration) extends Actor {
abstract class SocketActor[M <: Member](uidTimeout: Duration) extends Actor {
var members = Map.empty[String, M]
val aliveUids = new BooleanExpiryMemo(uidTimeout)
val aliveUids = new ExpireSetMemo(uidTimeout)
var pong = makePong(0)
// to be defined in subclassing actor
@ -30,7 +31,7 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Duration) extends Actor {
case NbMembers(nb) pong = makePong(nb)
case GetUsernames sender ! usernames
case GetUserIds sender ! userIds
case LiveGames(uid, gameIds) registerLiveGames(uid, gameIds)
@ -110,9 +111,7 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Duration) extends Actor {
setAlive(uid)
}
def setAlive(uid: String) {
aliveUids putUnsafe uid
}
def setAlive(uid: String) { aliveUids put uid }
def uids = members.keys
@ -124,7 +123,7 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Duration) extends Actor {
def membersByUserIds(userIds: Set[String]): Iterable[M] =
members.values filter (member member.userId zmap userIds.contains)
def usernames: Iterable[String] = members.values.map(_.username).flatten
def userIds: Iterable[String] = members.values.map(_.userId).flatten
def notifyFen(gameId: String, fen: String, lastMove: Option[String]) {
val msg = makeMessage("fen", JsObject(Seq(

View file

@ -1,10 +1,8 @@
package lila.socket
import play.api.libs.json._
import play.api.libs.concurrent.Promise
import play.api.libs.iteratee.{ Iteratee, Enumerator }
import play.api.libs.iteratee.Concurrent.Channel
import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._
import scalaz.{ Zero, Zeros }
@ -12,13 +10,5 @@ trait WithSocket extends Zeros {
type JsChannel = Channel[JsValue]
type JsEnumerator = Enumerator[JsValue]
type SocketHandler = (Iteratee[JsValue, _], JsEnumerator)
implicit val LilaSocketHandlerZero = new Zero[SocketHandler] {
val zero: SocketHandler =
Iteratee.skipToEof[JsValue] ->
Enumerator[JsValue](Json.obj("error" -> "Invalid request"))
.andThen(Enumerator.eof)
}
type JsSocketHandler = (Iteratee[JsValue, _], JsEnumerator)
}

View file

@ -1,24 +1,13 @@
package lila.socket
package actorApi
import play.api.libs.json.JsObject
import scala.collection.mutable
trait SocketMember {
val channel: JsChannel
val username: Option[String]
lazy val userId: Option[String] = username map (_.toLowerCase)
private val privateLiveGames = mutable.Set[String]()
def liveGames = privateLiveGames
def addLiveGames(ids: List[String]) {
ids foreach liveGames.+=
}
}
case class Connected[A <: Member](
enumerator: JsEnumerator,
member: A)
case object Close
case object GetUsernames
case object GetUserIds
case object GetNbMembers
case class NbMembers(nb: Int)
case class Ping(uid: String)

View file

@ -1,26 +1,22 @@
package lila.user
import lila.memo.BooleanExpiryMemo
import lila.memo.ExpireSetMemo
import scala.concurrent.duration._
final class UsernameMemo(ttl: Duration) {
private val internal = new BooleanExpiryMemo(ttl)
private val internal = new ExpireSetMemo(ttl)
def normalize(name: String) = name.toLowerCase
private def normalize(name: String) = name.toLowerCase
def get(key: String): Boolean = internal get normalize(key)
def put(key: String) {
internal put normalize(key)
}
def put(key: String) { internal put normalize(key) }
def putAll(keys: Iterable[String]) {
internal putAll (keys map normalize)
}
def putAll(keys: Iterable[String]) { internal putAll (keys map normalize) }
def keys = internal.keys
def preciseCount = internal.preciseCount
def count = internal.count
}