tournament hub

This commit is contained in:
Thibault Duplessis 2012-09-10 01:14:57 +02:00
parent 4f0c71c41d
commit 5bc81b027d
18 changed files with 310 additions and 37 deletions

View file

@ -12,9 +12,10 @@ import play.api.libs.iteratee._
object Tournament extends LilaController {
val repo = env.tournament.repo
val forms = env.tournament.forms
val api = env.tournament.api
private def repo = env.tournament.repo
private def forms = env.tournament.forms
private def api = env.tournament.api
private def socket = env.tournament.socket
val home = Open { implicit ctx
IOk(repo.created map { tournaments
@ -46,13 +47,8 @@ object Tournament extends LilaController {
}
}
def websocket(fullId: String) = WebSocket.async[JsValue] { req
def websocket(id: String) = WebSocket.async[JsValue] { req
implicit val ctx = reqToCtx(req)
throw new Exception("oups")
//socket.joinPlayer(
//fullId,
//getInt("version"),
//get("uid"),
//ctx.me).unsafePerformIO
socket.join(id, getInt("version"), get("uid"), ctx.me).unsafePerformIO
}
}

View file

@ -98,7 +98,10 @@ final class CoreEnv private (application: Application, val settings: Settings) {
indexGame = search.indexGame)
lazy val tournament = new lila.tournament.TournamentEnv(
app = app,
settings = settings,
getUser = user.userRepo.byId,
flood = security.flood,
mongodb = mongodb.apply _)
lazy val analyse = new lila.analyse.AnalyseEnv(

View file

@ -41,6 +41,10 @@ final class Settings(config: Config) {
val RoundCollectionWatcherRoom = getString("round.collection.watcher_room")
val TournamentCollectionTournament = getString("tournament.collection.tournament")
val TournamentCollectionRoom = getString("tournament.collection.room")
val TournamentMessageLifetime = millis("tournament.message.lifetime")
val TournamentUidTimeout = millis("tournament.uid.timeout")
val TournamentHubTimeout = millis("tournament.hub.timeout")
val AnalyseCachedNbTtl = millis("analyse.cached.nb.ttl")
@ -126,6 +130,7 @@ final class Settings(config: Config) {
val ActorRoundHubMaster = "game_hub_master"
val ActorLobbyHub = "lobby_hub"
val ActorMonitorHub = "monitor_hub"
val ActorTournamentHubMaster = "tournament_hub_master"
val ModlogCollectionModlog = getString("modlog.collection.modlog")

View file

@ -9,8 +9,8 @@ import play.api.libs.iteratee._
final class Hub(
messenger: Messenger,
history: History,
timeout: Int) extends HubActor[Member](timeout) {
val history: History,
timeout: Int) extends HubActor[Member](timeout) with Historical[Member] {
def receiveSpecific = {
@ -62,19 +62,6 @@ final class Hub(
case ChangeFeatured(oldId, newId) notifyFeatured(oldId, newId)
}
def notifyMember(t: String, data: JsValue)(member: Member) {
val msg = JsObject(Seq("t" -> JsString(t), "d" -> data))
member.channel push msg
}
def notifyVersion(t: String, data: JsValue) {
val vmsg = history += makeMessage(t, data)
members.values.foreach(_.channel push vmsg)
}
def notifyVersion(t: String, data: Seq[(String, JsValue)]) {
notifyVersion(t, JsObject(data))
}
def notifyFeatured(oldId: Option[String], newId: String) {
val msg = makeMessage("featured", JsObject(Seq(
"oldId" -> oldId.fold(JsString(_), JsNull),

View file

@ -13,6 +13,7 @@ import timeline.Entry
import user.{ User, UserRepo }
import game.{ DbGame, Featured }
import round.{ Socket RoundSocket, Messenger RoundMessenger }
import socket.History
import security.Flood
import core.Settings
@ -36,7 +37,7 @@ final class LobbyEnv(
messageRepo = messageRepo,
userRepo = userRepo)
lazy val hub = Akka.system.actorOf(Props(new Hub(
lazy val hub: ActorRef = Akka.system.actorOf(Props(new Hub(
messenger = messenger,
history = history,
timeout = SiteUidTimeout

View file

@ -5,6 +5,7 @@ import timeline.Entry
import game.{ DbGame, Featured }
import forum.PostView
import controllers.routes
import socket.History
import play.api.mvc.Call
import play.api.libs.concurrent.Akka

View file

@ -4,7 +4,6 @@ package round
import socket._
import chess.{ Color, White, Black }
import game.PovRef
import user.User
import akka.actor._
import akka.util.duration._
@ -40,8 +39,7 @@ final class Hub(
}
}
case Ack(uid) => withMember(uid) { _.channel push ackEvent }
case Ack(uid) withMember(uid) { _.channel push ackEvent }
case Broom {
broom()

View file

@ -0,0 +1,17 @@
package lila
package socket
import play.api.libs.json._
trait Historical[M <: SocketMember] { self: HubActor[M] =>
val history: History
def notifyVersion(t: String, data: JsValue) {
val vmsg = history += makeMessage(t, data)
members.values.foreach(_.channel push vmsg)
}
def notifyVersion(t: String, data: Seq[(String, JsValue)]) {
notifyVersion(t, JsObject(data))
}
}

View file

@ -1,5 +1,5 @@
package lila
package lobby
package socket
import scala.math.max
import play.api.libs.json._

View file

@ -45,6 +45,10 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Int) extends Actor {
members.values.foreach(_.channel push msg)
}
def notifyMember(t: String, data: JsValue)(member: M) {
member.channel push makeMessage(t, data)
}
def makeMessage(t: String, data: JsValue) =
JsObject(Seq("t" -> JsString(t), "d" -> data))

52
app/tournament/Hub.scala Normal file
View file

@ -0,0 +1,52 @@
package lila
package tournament
import socket._
import akka.actor._
import akka.util.duration._
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.Play.current
import scalaz.effects._
final class Hub(
tournamentId: String,
val history: History,
uidTimeout: Int,
hubTimeout: Int) extends HubActor[Member](uidTimeout) with Historical[Member] {
var lastPingTime = nowMillis
def receiveSpecific = {
case PingVersion(uid, v) {
ping(uid)
lastPingTime = nowMillis
withMember(uid) { m
history.since(v).fold(_ foreach m.channel.push, resync(m))
}
}
case Broom {
broom()
if (lastPingTime < (nowMillis - hubTimeout)) {
context.parent ! CloseTournament(tournamentId)
}
}
case GetTournamentVersion(_) sender ! history.version
case Join(uid, user, version) {
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, user)
addMember(uid, member)
sender ! Connected(enumerator, member)
}
case Close {
members.values foreach { _.channel.end() }
self ! PoisonPill
}
}
}

View file

@ -0,0 +1,68 @@
package lila
package tournament
import socket.{ History, Broom, Close, GetNbMembers, GetUsernames, NbMembers, SendTo }
import akka.actor._
import akka.actor.ReceiveTimeout
import akka.util.duration._
import akka.util.Timeout
import akka.pattern.{ ask, pipe }
import akka.dispatch.{ Future, Promise }
import play.api.libs.json._
import play.api.libs.concurrent._
import play.api.Play.current
final class HubMaster(
makeHistory: () History,
uidTimeout: Int,
hubTimeout: Int) extends Actor {
implicit val timeout = Timeout(1 second)
implicit val executor = Akka.system.dispatcher
var hubs = Map.empty[String, ActorRef]
def receive = {
case Broom hubs.values foreach (_ ! Broom)
case msg @ SendTo(_, _) hubs.values foreach (_ ! msg)
case GetHub(id: String) sender ! {
(hubs get id) | {
mkHub(id) ~ { h hubs = hubs + (id -> h) }
}
}
case msg @ GetTournamentVersion(id) (hubs get id).fold(
_ ? msg pipeTo sender,
sender ! 0
)
case CloseTournament(id) hubs get id foreach { hub
hub ! Close
hubs = hubs - id
}
case GetNbHubs sender ! hubs.size
case GetNbMembers Future.traverse(hubs.values) { hub
(hub ? GetNbMembers).mapTo[Int]
} map (_.sum) pipeTo sender
case GetUsernames Future.traverse(hubs.values) { hub
(hub ? GetUsernames).mapTo[Iterable[String]]
} map (_.flatten) pipeTo sender
case msg @ NbMembers(_) hubs.values foreach (_ ! msg)
}
private def mkHub(tournamentId: String): ActorRef =
context.actorOf(Props(new Hub(
tournamentId = tournamentId,
history = makeHistory(),
uidTimeout = uidTimeout,
hubTimeout = hubTimeout
)), name = "tournament_hub_" + tournamentId)
}

View file

@ -3,22 +3,22 @@ package tournament
import scalaz.effects._
import user.{ User, UserRepo }
import user.User
final class Messenger(
roomRepo: RoomRepo,
userRepo: UserRepo) extends core.Room {
getUser: String => IO[Option[User]]) extends core.Room {
import Room._
def init(tour: Created): IO[List[Message]] = for {
userOption userRepo byId tour.data.createdBy
userOption getUser(tour.data.createdBy)
username = userOption.fold(_.username, tour.data.createdBy)
message systemMessage(tour, "%s creates the tournament" format username)
} yield List(message)
def userMessage(tour: Tournament, text: String, username: String): IO[Valid[Message]] = for {
userOption userRepo byId username
userOption getUser(username)
message = for {
user userOption filter (_.canChat) toValid "This user cannot chat"
msg createMessage(user, text)

View file

@ -0,0 +1,73 @@
package lila
package tournament
import akka.actor._
import akka.pattern.ask
import akka.util.duration._
import akka.util.Timeout
import play.api.libs.json._
import play.api.libs.iteratee._
import play.api.libs.concurrent._
import play.api.Play.current
import scalaz.effects._
import user.User
import socket.{ PingVersion, Quit, Resync }
import socket.Util.connectionFail
import security.Flood
import implicits.RichJs._
final class Socket(
getTournament: String IO[Option[Tournament]],
hubMaster: ActorRef,
messenger: Messenger,
flood: Flood) {
private val timeoutDuration = 1 second
implicit private val timeout = Timeout(timeoutDuration)
def join(
tournamentId: String,
version: Option[Int],
uid: Option[String],
user: Option[User]): IO[SocketPromise] =
getTournament(tournamentId) map { tourOption
((tourOption |@| uid |@| version) apply {
(tour: Tournament, uid: String, version: Int)
(for {
hub hubMaster ? GetHub(tournamentId) mapTo manifest[ActorRef]
socket hub ? Join(
uid = uid,
user = user,
version = version
) map {
case Connected(enumerator, member) (
Iteratee.foreach[JsValue](
controller(hub, uid, member, tournamentId)
) mapDone { _
hub ! Quit(uid)
},
enumerator)
}
} yield socket).asPromise: SocketPromise
}) | connectionFail
}
private def controller(
hub: ActorRef,
uid: String,
member: Member,
tournamentId: String): JsValue Unit =
(e: JsValue) e str "t" match {
case Some("p") e int "v" foreach { v
hub ! PingVersion(uid, v)
}
case Some("talk") for {
username member.username
data e obj "d"
txt data str "txt"
if flood.allowMessage(uid, txt)
} hub ! Talk(username, txt)
case _
}
}

View file

@ -2,15 +2,26 @@ package lila
package tournament
import game.{ GameRepo, DbGame }
import user.User
import core.Settings
import security.Flood
import socket.History
import com.traackr.scalastic.elasticsearch
import com.mongodb.casbah.MongoCollection
import scalaz.effects._
import akka.actor.Props
import play.api.libs.concurrent._
import play.api.Application
final class TournamentEnv(
app: Application,
settings: Settings,
getUser: String => IO[Option[User]],
flood: Flood,
mongodb: String MongoCollection) {
implicit val ctx = app
import settings._
lazy val forms = new DataForm
@ -20,4 +31,24 @@ final class TournamentEnv(
lazy val api = new TournamentApi(
repo = repo)
lazy val roomRepo = new RoomRepo(
collection = mongodb(TournamentCollectionRoom)
)
lazy val messenger = new Messenger(roomRepo, getUser)
lazy val socket = new Socket(
getTournament = repo.byId,
hubMaster = hubMaster,
messenger = messenger,
flood = flood)
lazy val history = () new History(timeout = TournamentMessageLifetime)
lazy val hubMaster = Akka.system.actorOf(Props(new HubMaster(
makeHistory = history,
uidTimeout = TournamentUidTimeout,
hubTimeout = TournamentHubTimeout
)), name = ActorTournamentHubMaster)
}

View file

@ -0,0 +1,37 @@
package lila
package tournament
import socket.SocketMember
import user.User
import akka.actor.ActorRef
import scalaz.effects.IO
case class Member(
channel: JsChannel,
username: Option[String],
muted: Boolean) extends SocketMember {
def canChat = !muted
}
object Member {
def apply(channel: JsChannel, user: Option[User]): Member = Member(
channel = channel,
username = user map (_.username),
muted = user.fold(_.muted, false))
}
case class Join(
uid: String,
user: Option[User],
version: Int)
case class Connected(
enumerator: JsEnumerator,
member: Member)
case class Talk(u: String, txt: String)
case class GetTournamentVersion(tournamentId: String)
case class CloseTournament(tournamentId: String)
case class GetHub(tournamentId: String)
case object HubTimeout
case object GetNbHubs

View file

@ -43,7 +43,7 @@ GET /tournament controllers.Tournament.home
GET /tournament/new controllers.Tournament.form
POST /tournament/new controllers.Tournament.create
GET /tournament/$id<[\w\-]{8}> controllers.Tournament.show(id: String)
GET /tournament/$fullId<[\w\-]{12}>/socket controllers.Tournament.websocket(fullId: String)
GET /tournament/$id<[\w\-]{12}>/socket controllers.Tournament.websocket(id: String)
# Analyse
GET /analyse/$gameId<[\w\-]{8}> controllers.Analyse.replay(gameId: String, color: String = "white")

View file

@ -9,7 +9,7 @@ $(function() {
var $chat = $("div.lichess_chat");
var $chatToggle = $chat.find('input.toggle_chat');
var chatExists = $chat.length > 0;
var websocketUrl = $wrap.data("socket-url");
var socketUrl = $wrap.data("socket-url");
if (chatExists) {
var $form = $chat.find('form');
@ -56,7 +56,7 @@ $(function() {
return html;
}
lichess.socket = new $.websocket(lichess.socketUrl + socketUrl, lichess_preload.version, $.extend(true, lichess.socketDefaults, {
lichess.socket = new $.websocket(lichess.socketUrl + socketUrl, lichess_data.version, $.extend(true, lichess.socketDefaults, {
events: {
talk: function(e) { if (chatExists && e.txt) addToChat(buildChatMessage(e.txt, e.u)); }
},