challenge remote sockets
parent
a6dac218ce
commit
dcb6f9c43c
|
@ -203,14 +203,4 @@ object Challenge extends LilaController {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
def websocket(id: String, apiVersion: Int) = SocketOption[JsValue] { implicit ctx =>
|
||||
env.api byId id flatMap {
|
||||
_ ?? { c =>
|
||||
getSocketSri("sri") ?? { sri =>
|
||||
env.socketHandler.join(id, sri, ctx.userId, isMine(c), getSocketVersion, apiVersion) map some
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ object bits {
|
|||
jsTag("challenge.js", defer = true),
|
||||
embedJsUnsafe(s"""lichess=window.lichess||{};customWs=true;lichess_challenge = ${
|
||||
safeJsonValue(Json.obj(
|
||||
"socketUrl" -> routes.Challenge.websocket(c.id, apiVersion.value).url,
|
||||
"socketUrl" -> s"/challenge/${c.id}/socket/v$apiVersion",
|
||||
"xhrUrl" -> routes.Challenge.show(c.id).url,
|
||||
"owner" -> owner,
|
||||
"data" -> json
|
||||
|
|
|
@ -265,8 +265,8 @@ lazy val shutup = module("shutup", Seq(common, db, hub, game, relation)).setting
|
|||
libraryDependencies ++= provided(play.api, reactivemongo.driver)
|
||||
)
|
||||
|
||||
lazy val challenge = module("challenge", Seq(common, db, hub, setup, game, relation, pref)).settings(
|
||||
libraryDependencies ++= provided(play.api, scalatags, reactivemongo.driver)
|
||||
lazy val challenge = module("challenge", Seq(common, db, hub, setup, game, relation, pref, socket, room)).settings(
|
||||
libraryDependencies ++= provided(play.api, scalatags, reactivemongo.driver, lettuce)
|
||||
)
|
||||
|
||||
lazy val study = module("study", Seq(
|
||||
|
|
|
@ -495,11 +495,6 @@ setup {
|
|||
challenge {
|
||||
collection.challenge = challenge
|
||||
max_per_user = 20
|
||||
socket {
|
||||
timeout = 1 minute
|
||||
}
|
||||
history.message.ttl = 30 seconds
|
||||
sri.timeout = 7 seconds
|
||||
max_playing = ${setup.max_playing}
|
||||
}
|
||||
evalCache {
|
||||
|
|
|
@ -316,7 +316,6 @@ POST /challenge/$id<\w{8}>/accept controllers.Challenge.accept(id: String)
|
|||
POST /challenge/$id<\w{8}>/decline controllers.Challenge.decline(id: String)
|
||||
POST /challenge/$id<\w{8}>/cancel controllers.Challenge.cancel(id: String)
|
||||
POST /challenge/$id<\w{8}>/to-friend controllers.Challenge.toFriend(id: String)
|
||||
GET /challenge/$id<\w{8}>/socket/v:apiVersion controllers.Challenge.websocket(id: String, apiVersion: Int)
|
||||
POST /challenge/rematch-of/$id<\w{8}> controllers.Challenge.rematchOf(id: String)
|
||||
|
||||
# Notify
|
||||
|
|
|
@ -15,7 +15,6 @@ final class ChallengeApi(
|
|||
jsonView: JsonView,
|
||||
gameCache: lila.game.Cached,
|
||||
maxPlaying: Int,
|
||||
socketMap: SocketMap,
|
||||
asyncCache: lila.memo.AsyncCache.Builder,
|
||||
lilaBus: lila.common.Bus
|
||||
) {
|
||||
|
@ -113,7 +112,7 @@ final class ChallengeApi(
|
|||
}
|
||||
|
||||
private def socketReload(id: Challenge.ID): Unit =
|
||||
socketMap.tell(id, ChallengeSocket.Reload)
|
||||
socket foreach (_ reload id)
|
||||
|
||||
private def notify(userId: User.ID): Funit = for {
|
||||
all <- allFor(userId)
|
||||
|
@ -124,4 +123,8 @@ final class ChallengeApi(
|
|||
SendTo(userId, lila.socket.Socket.makeMessage("challenges", jsonView(all, lang))),
|
||||
'socketUsers
|
||||
)
|
||||
|
||||
// work around circular dependency
|
||||
private var socket: Option[ChallengeSocket] = null
|
||||
private[challenge] def registerSocket(s: ChallengeSocket) = { socket = s.some }
|
||||
}
|
||||
|
|
|
@ -1,66 +1,51 @@
|
|||
package lila.challenge
|
||||
|
||||
import akka.actor._
|
||||
import play.api.libs.iteratee._
|
||||
import play.api.libs.json._
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.Promise
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import lila.socket.actorApi.{ Connected => _, _ }
|
||||
import lila.socket.SocketTrouper
|
||||
import lila.socket.Socket.{ Sri, GetVersion, SocketVersion }
|
||||
import lila.socket.{ History, Historical, DirectSocketMember }
|
||||
import lila.game.Pov
|
||||
import lila.room.RoomSocket.{ Protocol => RP, _ }
|
||||
import lila.socket.RemoteSocket.{ Protocol => P, _ }
|
||||
import lila.socket.Socket.makeMessage
|
||||
|
||||
private final class ChallengeSocket(
|
||||
system: ActorSystem,
|
||||
challengeId: String,
|
||||
protected val history: History[Unit],
|
||||
getChallenge: Challenge.ID => Fu[Option[Challenge]],
|
||||
sriTtl: Duration,
|
||||
keepMeAlive: () => Unit
|
||||
) extends SocketTrouper[ChallengeSocket.Member](system, sriTtl) with Historical[ChallengeSocket.Member, Unit] {
|
||||
api: ChallengeApi,
|
||||
remoteSocketApi: lila.socket.RemoteSocket,
|
||||
bus: lila.common.Bus
|
||||
) {
|
||||
|
||||
def receiveSpecific = {
|
||||
import ChallengeSocket._
|
||||
|
||||
case ChallengeSocket.Reload =>
|
||||
getChallenge(challengeId) foreach {
|
||||
_ foreach { challenge =>
|
||||
notifyVersion("reload", JsNull, ())
|
||||
}
|
||||
def reload(challengeId: Challenge.ID): Unit =
|
||||
rooms.tell(challengeId, NotifyVersion("reload", JsNull))
|
||||
|
||||
lazy val rooms = makeRoomMap(send, bus)
|
||||
|
||||
private lazy val send: String => Unit = remoteSocketApi.makeSender("chal-out").apply _
|
||||
|
||||
private lazy val challengeHandler: Handler = {
|
||||
case Protocol.In.OwnerPing(roomId) => api ping roomId.value
|
||||
}
|
||||
|
||||
remoteSocketApi.subscribe("chal-in", Protocol.In.reader)(
|
||||
challengeHandler orElse minRoomHandler(rooms) orElse remoteSocketApi.baseHandler
|
||||
)
|
||||
|
||||
api registerSocket this
|
||||
}
|
||||
|
||||
object ChallengeSocket {
|
||||
|
||||
object Protocol {
|
||||
|
||||
object In {
|
||||
|
||||
case class OwnerPing(roomId: RoomId) extends P.In
|
||||
|
||||
val reader: P.In.Reader = raw => raw.path match {
|
||||
case "challenge/ping" => OwnerPing(RoomId(raw.args)).some
|
||||
case _ => RP.In.reader(raw)
|
||||
}
|
||||
|
||||
case GetVersion(promise) => promise success history.version
|
||||
|
||||
case ChallengeSocket.Join(sri, userId, owner, version, promise) =>
|
||||
val (enumerator, channel) = Concurrent.broadcast[JsValue]
|
||||
val member = ChallengeSocket.Member(channel, userId, owner)
|
||||
addMember(sri, member)
|
||||
promise success ChallengeSocket.Connected(
|
||||
prependEventsSince(version, enumerator, member),
|
||||
member
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def broom: Unit = {
|
||||
super.broom
|
||||
if (members.nonEmpty) keepMeAlive()
|
||||
}
|
||||
|
||||
protected def shouldSkipMessageFor(message: Message, member: ChallengeSocket.Member) = false
|
||||
}
|
||||
|
||||
private object ChallengeSocket {
|
||||
|
||||
case class Member(
|
||||
channel: JsChannel,
|
||||
userId: Option[String],
|
||||
owner: Boolean
|
||||
) extends DirectSocketMember {
|
||||
val troll = false
|
||||
}
|
||||
|
||||
case class Join(sri: Sri, userId: Option[String], owner: Boolean, version: Option[SocketVersion], promise: Promise[Connected])
|
||||
case class Connected(enumerator: JsEnumerator, member: Member)
|
||||
|
||||
case object Reload
|
||||
}
|
||||
|
|
|
@ -20,42 +20,19 @@ final class Env(
|
|||
asyncCache: lila.memo.AsyncCache.Builder,
|
||||
getPref: User => Fu[lila.pref.Pref],
|
||||
getRelation: (User, User) => Fu[Option[lila.relation.Relation]],
|
||||
remoteSocketApi: lila.socket.RemoteSocket,
|
||||
scheduler: lila.common.Scheduler
|
||||
) {
|
||||
|
||||
private val settings = new {
|
||||
val CollectionChallenge = config getString "collection.challenge"
|
||||
val MaxPerUser = config getInt "max_per_user"
|
||||
val HistoryMessageTtl = config duration "history.message.ttl"
|
||||
val SriTimeout = config duration "sri.timeout"
|
||||
val SocketTimeout = config duration "socket.timeout"
|
||||
val MaxPlaying = config getInt "max_playing"
|
||||
}
|
||||
import settings._
|
||||
|
||||
private val socketMap: SocketMap = lila.socket.SocketMap[ChallengeSocket](
|
||||
system = system,
|
||||
mkTrouper = (challengeId: String) => new ChallengeSocket(
|
||||
system = system,
|
||||
challengeId = challengeId,
|
||||
history = new lila.socket.History(ttl = HistoryMessageTtl),
|
||||
getChallenge = repo.byId,
|
||||
sriTtl = SriTimeout,
|
||||
keepMeAlive = () => socketMap touch challengeId
|
||||
),
|
||||
accessTimeout = SocketTimeout,
|
||||
monitoringName = "challenge.socketMap",
|
||||
broomFrequency = 3677 millis
|
||||
)
|
||||
|
||||
def version(challengeId: Challenge.ID): Fu[SocketVersion] =
|
||||
socketMap.askIfPresentOrZero[SocketVersion](challengeId)(GetVersion)
|
||||
|
||||
lazy val socketHandler = new SocketHandler(
|
||||
hub = hub,
|
||||
socketMap = socketMap,
|
||||
pingChallenge = api.ping
|
||||
)
|
||||
socket.rooms.ask[SocketVersion](challengeId)(GetVersion)
|
||||
|
||||
lazy val api = new ChallengeApi(
|
||||
repo = repo,
|
||||
|
@ -63,11 +40,16 @@ final class Env(
|
|||
jsonView = jsonView,
|
||||
gameCache = gameCache,
|
||||
maxPlaying = MaxPlaying,
|
||||
socketMap = socketMap,
|
||||
asyncCache = asyncCache,
|
||||
lilaBus = system.lilaBus
|
||||
)
|
||||
|
||||
private lazy val socket = new ChallengeSocket(
|
||||
api = api,
|
||||
remoteSocketApi = remoteSocketApi,
|
||||
bus = system.lilaBus
|
||||
)
|
||||
|
||||
lazy val granter = new ChallengeGranter(
|
||||
getPref = getPref,
|
||||
getRelation = getRelation
|
||||
|
@ -99,6 +81,7 @@ object Env {
|
|||
asyncCache = lila.memo.Env.current.asyncCache,
|
||||
getPref = lila.pref.Env.current.api.getPref,
|
||||
getRelation = lila.relation.Env.current.api.fetchRelation,
|
||||
remoteSocketApi = lila.socket.Env.current.remoteSocket,
|
||||
scheduler = lila.common.PlayApp.scheduler
|
||||
)
|
||||
}
|
||||
|
|
|
@ -1,50 +0,0 @@
|
|||
package lila.challenge
|
||||
|
||||
import akka.actor._
|
||||
import akka.pattern.ask
|
||||
|
||||
import lila.hub.actorApi.map._
|
||||
import lila.socket.actorApi.{ Connected => _, _ }
|
||||
import lila.socket.Handler
|
||||
import lila.socket.Socket.{ Sri, SocketVersion }
|
||||
import lila.user.User
|
||||
import lila.common.ApiVersion
|
||||
|
||||
private[challenge] final class SocketHandler(
|
||||
hub: lila.hub.Env,
|
||||
socketMap: SocketMap,
|
||||
pingChallenge: Challenge.ID => Funit
|
||||
) {
|
||||
|
||||
import ChallengeSocket._
|
||||
|
||||
def join(
|
||||
challengeId: Challenge.ID,
|
||||
sri: Sri,
|
||||
userId: Option[User.ID],
|
||||
owner: Boolean,
|
||||
version: Option[SocketVersion],
|
||||
apiVersion: ApiVersion
|
||||
): Fu[JsSocketHandler] = {
|
||||
val socket = socketMap getOrMake challengeId
|
||||
socket.ask[Connected](Join(sri, userId, owner, version, _)) map {
|
||||
case Connected(enum, member) => Handler.iteratee(
|
||||
hub,
|
||||
controller(socket, challengeId, sri, member),
|
||||
member,
|
||||
socket,
|
||||
sri,
|
||||
apiVersion
|
||||
) -> enum
|
||||
}
|
||||
}
|
||||
|
||||
private def controller(
|
||||
socket: ChallengeSocket,
|
||||
challengeId: Challenge.ID,
|
||||
sri: Sri,
|
||||
member: Member
|
||||
): Handler.Controller = {
|
||||
case ("ping", _) if member.owner => pingChallenge(challengeId)
|
||||
}
|
||||
}
|
|
@ -8,7 +8,5 @@ package object challenge extends PackageObject with WithSocket {
|
|||
|
||||
type EitherChallenger = Either[Challenge.Anonymous, Challenge.Registered]
|
||||
|
||||
private[challenge] type SocketMap = lila.hub.TrouperMap[ChallengeSocket]
|
||||
|
||||
private[challenge] def inTwoWeeks = DateTime.now plusWeeks 2
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ object RoomSocket {
|
|||
chat: akka.actor.ActorSelection,
|
||||
publicSource: RoomId => PublicSource.type => Option[PublicSource],
|
||||
localTimeout: Option[(RoomId, User.ID, User.ID) => Fu[Boolean]] = None
|
||||
): Handler = {
|
||||
): Handler = ({
|
||||
case Protocol.In.ChatSay(roomId, userId, msg) =>
|
||||
chat ! lila.chat.actorApi.UserTalk(Chat.Id(roomId.value), userId, msg, publicSource(roomId)(PublicSource))
|
||||
case Protocol.In.ChatTimeout(roomId, modId, suspect, reason) => lila.chat.ChatTimeout.Reason(reason) foreach { r =>
|
||||
|
@ -66,6 +66,9 @@ object RoomSocket {
|
|||
chat ! lila.chat.actorApi.Timeout(Chat.Id(roomId.value), modId, suspect, r, local = local)
|
||||
}
|
||||
}
|
||||
}: Handler) orElse minRoomHandler(rooms)
|
||||
|
||||
def minRoomHandler(rooms: TrouperMap[RoomState]): Handler = {
|
||||
case Protocol.In.KeepAlives(roomIds) => roomIds foreach { roomId =>
|
||||
rooms touchOrMake roomId.value
|
||||
}
|
||||
|
|
|
@ -81,7 +81,7 @@ final class JsonView(
|
|||
.add("onGame" -> (opponent.isAi || socket.onGame(opponent.color))),
|
||||
"url" -> Json.obj(
|
||||
"socket" -> {
|
||||
if (useRemoteSocket(game.id)) s"/ws/play/$fullId/v$apiVersion"
|
||||
if (useRemoteSocket(game.id)) s"/play/$fullId/v$apiVersion"
|
||||
else s"/$fullId/socket/v$apiVersion"
|
||||
},
|
||||
"round" -> s"/$fullId"
|
||||
|
@ -173,7 +173,7 @@ final class JsonView(
|
|||
"orientation" -> pov.color.name,
|
||||
"url" -> Json.obj(
|
||||
"socket" -> {
|
||||
if (useRemoteSocket(game.id)) s"/ws/watch/$gameId/${color.name}/v$apiVersion"
|
||||
if (useRemoteSocket(game.id)) s"/watch/$gameId/${color.name}/v$apiVersion"
|
||||
else s"/$gameId/${color.name}/socket/v$apiVersion"
|
||||
},
|
||||
"round" -> s"/$gameId/${color.name}"
|
||||
|
|
|
@ -309,14 +309,14 @@ private final class StudySocket(
|
|||
def reloadSriBecauseOf(sri: Sri, chapterId: Chapter.Id) = notifySri(sri, "reload", Json.obj("chapterId" -> chapterId))
|
||||
def validationError(error: String, sri: Sri) = notifySri(sri, "validationError", Json.obj("error" -> error))
|
||||
|
||||
api registerSocket this
|
||||
|
||||
private val InviteLimitPerUser = new lila.memo.RateLimit[User.ID](
|
||||
credits = 50,
|
||||
duration = 24 hour,
|
||||
name = "study invites per user",
|
||||
key = "study_invite.user"
|
||||
)
|
||||
|
||||
api registerSocket this
|
||||
}
|
||||
|
||||
object StudySocket {
|
||||
|
|
Loading…
Reference in New Issue