Add reporting actor

This commit is contained in:
Thibault Duplessis 2012-04-16 13:27:32 +02:00
parent 44ce0b5bbc
commit 1dd772b5b3
11 changed files with 106 additions and 16 deletions

View file

@ -5,8 +5,9 @@ import play.api.libs.concurrent.Akka
import akka.actor.ActorRef
import akka.util.duration._
import akka.util.{ Duration, Timeout }
import scalaz.effects.IO
import scalaz.effects._
import socket.GetNbMembers
import site.{ NbMembers, WithUsernames }
import lobby.WithHooks
import RichDuration._
@ -16,19 +17,23 @@ final class Cron(env: SystemEnv)(implicit app: Application) {
implicit val timeout = Timeout(200 millis)
implicit val executor = Akka.system.dispatcher
message(2 seconds) {
env.reporting -> report.Update(env)
}
message(1 second) {
env.lobbyHub -> WithHooks(env.hookMemo.putAll)
}
message(1 seconds) {
message(2 seconds) {
env.siteHub -> NbMembers
}
io(2 seconds) {
effect(2 seconds) {
env.lobbyFisherman.cleanup
}
io(10 seconds) {
effect(10 seconds) {
env.hookRepo.cleanupOld
}
@ -36,19 +41,19 @@ final class Cron(env: SystemEnv)(implicit app: Application) {
env.siteHub -> WithUsernames(env.userRepo.updateOnlineUsernames)
}
io(2 hours) {
effect(2 hours) {
env.gameRepo.cleanupUnplayed
}
io(1 hour) {
effect(1 hour) {
env.gameFinishCommand.apply
}
io(10 seconds) {
effect(1 minute) {
env.remoteAi.diagnose
}
def io(freq: Duration)(op: IO[Unit]) {
def effect(freq: Duration)(op: IO[_]) {
Akka.system.scheduler.schedule(freq, freq.randomize()) { op.unsafePerformIO }
}

View file

@ -16,7 +16,11 @@ import command._
final class SystemEnv(config: Config) {
lazy val siteHub = Akka.system.actorOf(Props(new site.Hub), name = "site_hub")
lazy val reporting = Akka.system.actorOf(
Props(new report.Reporting), name = "reporting")
lazy val siteHub = Akka.system.actorOf(
Props(new site.Hub), name = "site_hub")
lazy val siteSocket = new site.Socket(
hub = siteHub)

View file

@ -0,0 +1,29 @@
package lila
package controllers
import play.api.mvc._
import play.api.libs.concurrent._
import scalaz.effects._
import akka.pattern.ask
import akka.util.duration._
import akka.util.{ Duration, Timeout }
import socket.GetNbMembers
import report.GetStatus
object ReportC extends LilaController {
val reporting = env.reporting
implicit val timeout = Timeout(200 millis)
def status = Action {
Async {
(env.reporting ? GetStatus).mapTo[String].asPromise map { Ok(_) }
}
}
def nbPlayers = Action {
Async {
(env.reporting ? GetNbMembers).mapTo[Int].asPromise map { Ok(_) }
}
}
}

View file

@ -144,9 +144,7 @@ class GameRepo(collection: MongoCollection)
).toList.map(decode).flatten
}
val countPlaying: IO[Int] = io {
count("updatedAt" $gt (DateTime.now - 15.seconds)).toInt
}
val countAll: IO[Int] = io { count().toInt }
def ensureIndexes: IO[Unit] = io {
collection.underlying |> { coll

View file

@ -34,6 +34,8 @@ final class HubMemo(makeHistory: () ⇒ History) {
cache invalidate gameId
}
def count = cache.size
private def compute(gameId: String): ActorRef = {
println("create actor game " + gameId)
Akka.system.actorOf(Props(new Hub(gameId, makeHistory())))

View file

@ -0,0 +1,38 @@
package lila
package report
import socket.GetNbMembers
import akka.actor._
import akka.pattern.{ ask, pipe }
import akka.util.duration._
import akka.util.{ Duration, Timeout }
final class Reporting extends Actor {
private var nbMembers = 0
private var nbGames = 0
private var nbPlaying = 0
implicit val timeout = Timeout(200 millis)
def receive = {
case GetNbMembers sender ! nbMembers
case GetNbGames sender ! nbGames
case GetStatus sender ! status
case Update(env) {
(env.siteHub ? GetNbMembers).mapTo[Int] onSuccess {
case nb nbMembers = nb
}
nbGames = env.gameRepo.countAll.unsafePerformIO
nbPlaying = env.gameHubMemo.count.toInt
}
}
private def status = List(
nbMembers,
nbGames,
nbPlaying) mkString " "
}

7
app/report/model.scala Normal file
View file

@ -0,0 +1,7 @@
package lila
package report
case object GetNbGames
case object GetStatus
case class Update(env: SystemEnv)

View file

@ -13,7 +13,7 @@ final class Hub extends Actor {
def receive = {
case WithUsernames(op) op(usernames).unsafePerformIO
case WithUsernames(op) op(usernames).unsafePerformIO
case Join(uid, username) {
val channel = new LilaEnumerator[JsValue](Nil)
@ -21,9 +21,11 @@ final class Hub extends Actor {
sender ! Connected(channel)
}
case NbMembers notifyAll("nbp", JsNumber(members.size))
case NbMembers notifyAll("nbp", JsNumber(members.size))
case Quit(uid) { members = members - uid }
case GetNbMembers sender ! members.size
case Quit(uid) { members = members - uid }
}
private def notifyMember(t: String, data: JsValue)(member: Member) {

View file

@ -2,3 +2,4 @@ package lila
package socket
case object Close
case object GetNbMembers

View file

@ -109,5 +109,5 @@ case class PausedClock(
object Clock {
val httpDelay = 0.5f
val httpDelay = 0.4f
}

View file

@ -29,5 +29,9 @@ POST /api/lobby/join/:gameId/:color lila.controllers.LobbyC.join(gameId: String
GET /api/lobby/preload lila.controllers.LobbyC.preload
POST /api/lobby/create/:hookOwnerId lila.controllers.LobbyC.create(hookOwnerId: String)
# Reporting API
GET /nb-players lila.controllers.ReportC.nbPlayers
GET /status lila.controllers.ReportC.status
# Useless, but play2 needs it
GET /assets/*file controllers.Assets.at(path="/public", file)