Start implementing the bidirectional ping
parent
0307bb01b6
commit
5f78ca97c5
|
@ -12,6 +12,10 @@ final class Cron(env: SystemEnv) {
|
|||
implicit val current = env.app
|
||||
implicit val timeout = Timeout(500 millis)
|
||||
|
||||
message(5 seconds) {
|
||||
env.siteHub -> socket.Cleanup
|
||||
}
|
||||
|
||||
message(2 seconds) {
|
||||
env.reporting -> report.Update(env)
|
||||
}
|
||||
|
|
|
@ -16,6 +16,10 @@ abstract class BooleanExpiryMemo(timeout: Int) {
|
|||
cache.put(key, true)
|
||||
}
|
||||
|
||||
def putUnsafe(key: String): Unit = {
|
||||
cache.put(key, true)
|
||||
}
|
||||
|
||||
def putAll(keys: Iterable[String]): IO[Unit] = io {
|
||||
keys map { cache.put(_, true) }
|
||||
}
|
||||
|
|
|
@ -10,22 +10,34 @@ import play.api.libs.iteratee._
|
|||
final class Hub extends Actor {
|
||||
|
||||
private var members = Map.empty[String, Member]
|
||||
private val pinger = new Pinger
|
||||
|
||||
def receive = {
|
||||
|
||||
case Ping(uid) ⇒ {
|
||||
members get uid foreach { _.channel push Util.pong }
|
||||
}
|
||||
|
||||
case Cleanup ⇒ members.keys filterNot pingers.get foreach { uid ⇒
|
||||
self ! Quit(uid)
|
||||
}
|
||||
|
||||
case WithUsernames(op) ⇒ op(usernames).unsafePerformIO
|
||||
|
||||
case Join(uid, username) ⇒ {
|
||||
val channel = new LilaEnumerator[JsValue](Nil)
|
||||
members = members + (uid -> Member(channel, username))
|
||||
sender ! Connected(channel)
|
||||
pingers putUnsafe uid
|
||||
}
|
||||
|
||||
case NbMembers ⇒ notifyAll("n", JsNumber(members.size))
|
||||
|
||||
case GetNbMembers ⇒ sender ! members.size
|
||||
|
||||
case Quit(uid) ⇒ { members = members - uid }
|
||||
case Quit(uid) ⇒ {
|
||||
members = members - uid
|
||||
}
|
||||
}
|
||||
|
||||
private def notifyAll(t: String, data: JsValue) {
|
||||
|
|
|
@ -11,7 +11,7 @@ import play.api.libs.concurrent._
|
|||
import scalaz.effects._
|
||||
|
||||
import RichJs._
|
||||
import socket.Util
|
||||
import socket.{ Util, Ping }
|
||||
|
||||
final class Socket(hub: ActorRef) {
|
||||
|
||||
|
@ -26,9 +26,7 @@ final class Socket(hub: ActorRef) {
|
|||
case Connected(channel) ⇒
|
||||
val iteratee = Iteratee.foreach[JsValue] { e ⇒
|
||||
e str "t" match {
|
||||
case Some("p") ⇒ {
|
||||
channel push Util.pong
|
||||
}
|
||||
case Some("p") ⇒ hub ! Ping(uid)
|
||||
case _ ⇒
|
||||
}
|
||||
Unit
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
package lila
|
||||
package socket
|
||||
|
||||
// keys = uid
|
||||
final class PingMemo(timeout: Int) extends BooleanExpiryMemo(timeout)
|
|
@ -0,0 +1,11 @@
|
|||
package lila
|
||||
package socket
|
||||
|
||||
final class Pinger {
|
||||
|
||||
private val uids = new memo.PingMemo(20 * 1000)
|
||||
|
||||
def apply(uid: String) {
|
||||
uids putUnsafe uid
|
||||
}
|
||||
}
|
|
@ -3,3 +3,5 @@ package socket
|
|||
|
||||
case object Close
|
||||
case object GetNbMembers
|
||||
case class Ping(uid: String)
|
||||
case object Cleanup
|
||||
|
|
Loading…
Reference in New Issue