Merge branch 'chatRefactor'

* chatRefactor:
  new chat lila-ws msgs
  lazy eval duct msg
  more chat refactoring
  refactor chats, round ducts no longer subscribe to bus events
pull/6447/head
Thibault Duplessis 2020-04-22 13:35:10 -06:00
commit 66f0fec9b5
14 changed files with 100 additions and 130 deletions

View File

@ -15,7 +15,7 @@ final private class ChallengeSocket(
def reload(challengeId: Challenge.ID): Unit =
rooms.tell(challengeId, NotifyVersion("reload", JsNull))
lazy val rooms = makeRoomMap(send, false)
lazy val rooms = makeRoomMap(send)
private lazy val send: String => Unit = remoteSocketApi.makeSender("chal-out").apply _

View File

@ -6,6 +6,7 @@ import scala.concurrent.duration._
import lila.common.config.NetDomain
import lila.common.String.noShouting
import lila.common.Bus
import lila.db.dsl._
import lila.hub.actorApi.shutup.{ PublicSource, RecordPrivateChat, RecordPublicChat }
import lila.memo.CacheApi._
@ -24,7 +25,7 @@ final class ChatApi(
netDomain: NetDomain
)(implicit ec: scala.concurrent.ExecutionContext) {
import Chat.{ chanOf, chatIdBSONHandler, userChatBSONHandler }
import Chat.{ chatIdBSONHandler, userChatBSONHandler }
object userChat {
@ -74,7 +75,7 @@ final class ChatApi(
}
}
def write(chatId: Chat.Id, userId: String, text: String, publicSource: Option[PublicSource]): Funit =
def write(chatId: Chat.Id, userId: User.ID, text: String, publicSource: Option[PublicSource]): Funit =
makeLine(chatId, userId, text) flatMap {
_ ?? { line =>
pushLine(chatId, line) >>- {
@ -86,8 +87,7 @@ final class ChatApi(
}
}
publish(chatId, actorApi.ChatLine(chatId, line))
val parentName = publicSource.fold("player")(_.parentName)
lila.mon.chat.message(parentName, line.troll).increment()
lila.mon.chat.message(publicSource.fold("player")(_.parentName), line.troll).increment()
}
}
}
@ -147,7 +147,7 @@ final class ChatApi(
coll.update.one($id(chat.id), chat).void >>
chatTimeout.add(c, mod, user, reason, scope) >>- {
cached invalidate chat.id
publish(chat.id, actorApi.OnTimeout(user.id))
publish(chat.id, actorApi.OnTimeout(chat.id, user.id))
line foreach { l =>
publish(chat.id, actorApi.ChatLine(chat.id, l))
}
@ -166,14 +166,15 @@ final class ChatApi(
val chat = c.markDeleted(user)
coll.update.one($id(chat.id), chat).void >>- {
cached invalidate chat.id
publish(chat.id, actorApi.OnTimeout(user.id))
publish(chat.id, actorApi.OnTimeout(chat.id, user.id))
}
}
private def isMod(user: User) = lila.security.Granter(_.ChatTimeout)(user)
def reinstate(list: List[ChatTimeout.Reinstate]) = list.foreach { r =>
publish(Chat.Id(r.chat), actorApi.OnReinstate(r.user))
val chatId = Chat.Id(r.chat)
publish(chatId, actorApi.OnReinstate(chatId, r.user))
}
private[ChatApi] def makeLine(chatId: Chat.Id, userId: String, t1: String): Fu[Option[UserLine]] =
@ -227,8 +228,10 @@ final class ChatApi(
}
}
private def publish(chatId: Chat.Id, msg: Any): Unit =
lila.common.Bus.publish(msg, chanOf(chatId))
private def publish(chatId: Chat.Id, msg: Any): Unit = {
Bus.publish(msg, "chat")
Bus.publish(msg, Chat chanOf chatId)
}
def remove(chatId: Chat.Id) = coll.delete.one($id(chatId)).void

View File

@ -2,7 +2,8 @@ package lila.chat
package actorApi
case class ChatLine(chatId: Chat.Id, line: Line)
case class RoundLine(line: Line, watcher: Boolean)
case class Timeout(chatId: Chat.Id, mod: String, userId: String, reason: ChatTimeout.Reason, local: Boolean)
case class OnTimeout(userId: String)
case class OnReinstate(userId: String)
case class OnTimeout(chatId: Chat.Id, userId: String)
case class OnReinstate(chatId: Chat.Id, userId: String)

View File

@ -12,9 +12,7 @@ object Bus {
type Channel = String
type Subscriber = Tellable
def publish(payload: Any, channel: Channel): Unit = {
publish(Bus.Event(payload, channel))
}
def publish(payload: Any, channel: Channel): Unit = bus.publish(payload, channel)
def subscribe = bus.subscribe _
@ -43,8 +41,6 @@ object Bus {
}
def unsubscribe(ref: ActorRef, from: Iterable[Channel]) = from foreach { bus.unsubscribe(Tellable(ref), _) }
def publish(event: Event): Unit = bus.publish(event.payload, event.channel)
def ask[A](channel: Channel, timeout: FiniteDuration = 1.second)(makeMsg: Promise[A] => Any)(
implicit
ec: scala.concurrent.ExecutionContext,

View File

@ -28,8 +28,7 @@ case class MoveGameEvent(
move: String
)
object MoveGameEvent {
def makeChan(gameId: Game.ID) = s"moveEvent:$gameId"
def makeBusEvent(event: MoveGameEvent) = lila.common.Bus.Event(event, makeChan(event.game.id))
def makeChan(gameId: Game.ID) = s"moveEvent:$gameId"
}
case class BoardDrawOffer(pov: Pov)

View File

@ -17,7 +17,7 @@ final class DuctConcMap[D <: Duct](
def tell(id: String, msg: Any): Unit = getOrMake(id) ! msg
def tellIfPresent(id: String, msg: Any): Unit = getIfPresent(id) foreach (_ ! msg)
def tellIfPresent(id: String, msg: => Any): Unit = getIfPresent(id) foreach (_ ! msg)
def tellAll(msg: Any) =
ducts.forEachValue(16, _ ! msg)

View File

@ -1,7 +1,6 @@
package lila.room
import lila.chat.{ Chat, ChatApi, UserLine, ChatTimeout, actorApi => chatApi }
import lila.common.Bus
import lila.chat.{ Chat, ChatApi, ChatTimeout, UserLine }
import lila.hub.actorApi.shutup.PublicSource
import lila.hub.{ Trouper, TrouperMap }
import lila.log.Logger
@ -21,9 +20,7 @@ object RoomSocket {
def msg = makeMessage(tpe, data)
}
case class RoomChat(classifier: String)
final class RoomState(roomId: RoomId, send: Send, chat: Option[RoomChat])(
final class RoomState(roomId: RoomId, send: Send)(
implicit ec: ExecutionContext
) extends Trouper {
@ -34,38 +31,27 @@ object RoomSocket {
case SetVersion(v) => version = v
case nv: NotifyVersion[_] =>
version = version.inc
send(Protocol.Out.tellRoomVersion(roomId, nv.msg, version, nv.troll))
case lila.chat.actorApi.ChatLine(_, line) =>
line match {
case line: UserLine => this ! NotifyVersion("message", lila.chat.JsonView(line), line.troll)
case _ =>
send {
val tell =
if (chatMsgs(nv.tpe)) Protocol.Out.tellRoomChat _
else Protocol.Out.tellRoomVersion _
tell(roomId, nv.msg, version, nv.troll)
}
case chatApi.OnTimeout(userId) =>
this ! NotifyVersion("chat_timeout", userId, false)
case chatApi.OnReinstate(userId) =>
this ! NotifyVersion("chat_reinstate", userId, false)
}
override def stop() = {
super.stop()
send(Protocol.Out.stop(roomId))
chat foreach { c =>
Bus.unsubscribe(this, c.classifier)
}
}
chat foreach { c =>
Bus.subscribe(this, c.classifier)
}
}
def makeRoomMap(send: Send, chatBus: Boolean)(
def makeRoomMap(send: Send)(
implicit ec: ExecutionContext,
mode: play.api.Mode
) = new TrouperMap(
mkTrouper = roomId =>
new RoomState(
RoomId(roomId),
send,
chatBus option RoomChat(Chat chanOf Chat.Id(roomId))
send
),
accessTimeout = 5 minutes
)
@ -103,15 +89,30 @@ object RoomSocket {
}
}
private val chatMsgs = Set("message", "chat_timeout", "chat_reinstate")
def subscribeChat(rooms: TrouperMap[RoomState]) = {
import lila.chat.actorApi._
lila.common.Bus.subscribeFun("chat") {
case ChatLine(id, line: UserLine) =>
rooms.tellIfPresent(id.value, NotifyVersion("message", lila.chat.JsonView(line), line.troll))
case OnTimeout(id, userId) =>
rooms.tellIfPresent(id.value, NotifyVersion("chat_timeout", userId, false))
case OnReinstate(id, userId) =>
rooms.tellIfPresent(id.value, NotifyVersion("chat_reinstate", userId, false))
}
}
object Protocol {
object In {
case class ChatSay(roomId: RoomId, userId: String, msg: String) extends P.In
case class ChatTimeout(roomId: RoomId, userId: String, suspect: String, reason: String, text: String) extends P.In
case class KeepAlives(roomIds: Iterable[RoomId]) extends P.In
case class TellRoomSri(roomId: RoomId, tellSri: P.In.TellSri) extends P.In
case class SetVersions(versions: Iterable[(String, SocketVersion)]) extends P.In
case class ChatSay(roomId: RoomId, userId: String, msg: String) extends P.In
case class ChatTimeout(roomId: RoomId, userId: String, suspect: String, reason: String, text: String)
extends P.In
case class KeepAlives(roomIds: Iterable[RoomId]) extends P.In
case class TellRoomSri(roomId: RoomId, tellSri: P.In.TellSri) extends P.In
case class SetVersions(versions: Iterable[(String, SocketVersion)]) extends P.In
val reader: P.In.Reader = raw =>
raw.path match {
@ -152,6 +153,8 @@ object RoomSocket {
s"tell/room/user $roomId $userId ${Json stringify payload}"
def tellRoomUsers(roomId: RoomId, userIds: Iterable[User.ID], payload: JsObject) =
s"tell/room/users $roomId ${P.Out.commas(userIds)} ${Json stringify payload}"
def tellRoomChat(roomId: RoomId, payload: JsObject, version: SocketVersion, isTroll: Boolean) =
s"tell/room/chat $roomId $version ${P.Out.boolean(isTroll)} ${Json stringify payload}"
def stop(roomId: RoomId) =
s"room/stop $roomId"
}

View File

@ -25,27 +25,23 @@ final class Messenger(api: ChatApi) {
api.userChat.system(chatId, message)
}
def watcher(chatId: Chat.Id, userId: User.ID, text: String) =
api.userChat.write(watcherId(chatId), userId, text, PublicSource.Watcher(chatId.value).some)
def watcher(gameId: Game.Id, userId: User.ID, text: String) =
api.userChat.write(watcherId(gameId), userId, text, PublicSource.Watcher(gameId.value).some)
private val whisperCommands = List("/whisper ", "/w ")
def owner(chatId: Chat.Id, userId: User.ID, text: String): Unit =
def owner(gameId: Game.Id, userId: User.ID, text: String): Unit =
whisperCommands.collectFirst {
case command if text startsWith command =>
val source = PublicSource.Watcher(chatId.value)
api.userChat.write(watcherId(chatId), userId, text drop command.size, source.some)
val source = PublicSource.Watcher(gameId.value)
api.userChat.write(watcherId(gameId), userId, text drop command.size, source.some)
} getOrElse {
if (!text.startsWith("/")) // mistyped command?
api.userChat.write(chatId, userId, text, publicSource = none).some
api.userChat.write(Chat.Id(gameId.value), userId, text, publicSource = none).some
}
def owner(chatId: Chat.Id, anonColor: chess.Color, text: String): Unit =
api.playerChat.write(chatId, anonColor, text)
// simul or tour chat from a game
def external(setup: Chat.Setup, userId: User.ID, text: String): Unit =
api.userChat.write(setup.id, userId, text, setup.publicSource.some)
def owner(gameId: Game.Id, anonColor: chess.Color, text: String): Unit =
api.playerChat.write(Chat.Id(gameId.value), anonColor, text)
def timeout(chatId: Chat.Id, modId: User.ID, suspect: User.ID, reason: String, text: String): Unit =
ChatTimeout.Reason(reason) foreach { r =>
@ -53,4 +49,5 @@ final class Messenger(api: ChatApi) {
}
private def watcherId(chatId: Chat.Id) = Chat.Id(s"$chatId/w")
private def watcherId(gameId: Game.Id) = Chat.Id(s"$gameId/w")
}

View File

@ -144,7 +144,7 @@ final private class Player(
// I checked and the bus doesn't do much if there's no subscriber for a classifier,
// so we should be good here.
// also used for targeted TvBroadcast subscription
Bus.publish(MoveGameEvent makeBusEvent MoveGameEvent(game, moveEvent.fen, moveEvent.move))
Bus.publish(MoveGameEvent(game, moveEvent.fen, moveEvent.move), MoveGameEvent makeChan game.id)
// publish correspondence moves
if (game.isCorrespondence && game.nonAi)

View File

@ -9,7 +9,6 @@ import scala.util.chaining._
import actorApi._, round._
import chess.{ Black, Centis, Color, White }
import lila.chat.Chat
import lila.common.Bus
import lila.game.actorApi.UserStartGame
import lila.game.Game.{ FullId, PlayerId }
@ -46,11 +45,7 @@ final private[round] class RoundDuct(
private var version = SocketVersion(0)
private var mightBeSimul = true // until proven false
private var chatIds = ChatIds(
priv = Left(Chat.Id(gameId)), // until replaced with tourney/simul chat
pub = Chat.Id(s"$gameId/w")
)
private var mightBeSimul = true // until proven otherwise
final private class Player(color: Color) {
@ -123,10 +118,8 @@ final private[round] class RoundDuct(
whitePlayer.userId = game.player(White).userId
blackPlayer.userId = game.player(Black).userId
mightBeSimul = game.isSimul
chatIds = chatIds update game
whitePlayer.goneWeight = whiteGoneWeight
blackPlayer.goneWeight = blackGoneWeight
buscriptions.chat()
if (game.playableByAi) player.requestFishnet(game, this)
}
@ -179,38 +172,14 @@ final private[round] class RoundDuct(
}
}
// chat
case lila.chat.actorApi.ChatLine(chatId, line) =>
case lila.chat.actorApi.RoundLine(line, watcher) =>
fuccess {
if (chatId.value contains gameId)
publish(List(line match {
case l: lila.chat.UserLine => Event.UserMessage(l, chatId == chatIds.pub)
case l: lila.chat.PlayerLine => Event.PlayerMessage(l)
}))
else // external chat - not versioned
socketSend(RP.Out.tellRoom(roomId, makeMessage("message", lila.chat.JsonView(line))))
publish(List(line match {
case l: lila.chat.UserLine => Event.UserMessage(l, watcher)
case l: lila.chat.PlayerLine => Event.PlayerMessage(l)
}))
}
case lila.chat.actorApi.OnTimeout(userId) =>
fuccess {
socketSend(RP.Out.tellRoom(roomId, makeMessage("chat_timeout", userId)))
}
case lila.chat.actorApi.OnReinstate(userId) =>
fuccess {
socketSend(RP.Out.tellRoom(roomId, makeMessage("chat_reinstate", userId)))
}
case Protocol.In.PlayerChatSay(_, Right(color), msg) =>
fuccess {
chatIds.priv.left.toOption foreach { messenger.owner(_, color, msg) }
}
case Protocol.In.PlayerChatSay(_, Left(userId), msg) =>
fuccess(chatIds.priv match {
case Left(chatId) => messenger.owner(chatId, userId, msg)
case Right(setup) => messenger.external(setup, userId, msg)
})
// chat end
case Protocol.In.HoldAlert(fullId, ip, mean, sd) =>
handle(fullId.playerId) { pov =>
gameRepo hasHoldAlert pov flatMap {
@ -484,13 +453,7 @@ final private[round] class RoundDuct(
}
} | funit
case Stop =>
fuccess {
if (buscriptions.started) {
buscriptions.unsubAll
socketSend(RP.Out.stop(roomId))
}
}
case Stop => fuccess { socketSend(RP.Out.stop(roomId)) }
}
private object buscriptions {
@ -503,18 +466,12 @@ final private[round] class RoundDuct(
chans = chans + chan
}
def started = chans.nonEmpty
def unsubAll() = {
def unsubAll() = if (chans.nonEmpty) {
Bus.unsubscribe(RoundDuct.this, chans)
chans = Set.empty
}
def tv(userId: User.ID): Unit = sub(s"userStartGame:$userId")
def chat() = chatIds.allIds foreach { chatId =>
sub(lila.chat.Chat chanOf chatId)
}
}
private def getPlayer(color: Color): Player = color.fold(whitePlayer, blackPlayer)
@ -607,14 +564,6 @@ object RoundDuct {
case object WsBoot
case class LilaStop(promise: Promise[Unit])
case class ChatIds(priv: Either[Chat.Id, Chat.Setup], pub: Chat.Id) {
def allIds = Seq(priv.fold(identity, _.id), pub)
def update(g: Game) = {
g.tournamentId.map(Chat.tournamentSetup) orElse
g.simulId.map(Chat.simulSetup)
}.fold(this)(setup => copy(priv = Right(setup)))
}
private[round] case class TakebackSituation(nbDeclined: Int, lastDeclined: Option[DateTime]) {
def decline = TakebackSituation(nbDeclined + 1, DateTime.now.some)

View File

@ -20,7 +20,7 @@ import lila.hub.actorApi.tv.TvSelect
import lila.hub.DuctConcMap
import lila.room.RoomSocket.{ Protocol => RP, _ }
import lila.socket.RemoteSocket.{ Protocol => P, _ }
import lila.socket.Socket.SocketVersion
import lila.socket.Socket.{ makeMessage, SocketVersion }
import lila.user.User
final class RoundSocket(
@ -80,10 +80,10 @@ final class RoundSocket(
}
duct
},
initialCapacity = 32768
initialCapacity = 65536
)
def tellRound(gameId: Game.Id, msg: Any): Unit = rounds.tell(gameId.value, msg)
private def tellRound(gameId: Game.Id, msg: Any): Unit = rounds.tell(gameId.value, msg)
private lazy val roundHandler: Handler = {
case Protocol.In.PlayerMove(fullId, uci, blur, lag) if !stopping =>
@ -106,9 +106,12 @@ final class RoundSocket(
case t => logger.warn(s"Unhandled round socket message: $t")
}
case Protocol.In.Flag(gameId, color, fromPlayerId) => tellRound(gameId, ClientFlag(color, fromPlayerId))
case c: Protocol.In.PlayerChatSay => tellRound(c.gameId, c)
case Protocol.In.WatcherChatSay(gameId, userId, msg) =>
messenger.watcher(Chat.Id(gameId.value), userId, msg)
case Protocol.In.PlayerChatSay(id, Right(color), msg) =>
messenger.owner(id, color, msg)
case Protocol.In.PlayerChatSay(id, Left(userId), msg) =>
messenger.owner(id, userId, msg)
case Protocol.In.WatcherChatSay(id, userId, msg) =>
messenger.watcher(id, userId, msg)
case RP.In.ChatTimeout(roomId, modId, suspect, reason, text) =>
messenger.timeout(Chat.Id(s"$roomId/w"), modId, suspect, reason, text)
case Protocol.In.Berserk(gameId, userId) => tournamentActor ! Berserk(gameId.value, userId)
@ -168,6 +171,19 @@ final class RoundSocket(
}
}
{
import lila.chat.actorApi._
Bus.subscribeFun("chat") {
case ChatLine(Chat.Id(id), l) =>
val line = RoundLine(l, id endsWith "/w")
rounds.tellIfPresent(if (line.watcher) id take Game.gameIdSize else id, line)
case OnTimeout(Chat.Id(id), userId) if rounds exists id =>
send(RP.Out.tellRoom(RoomId(id), makeMessage("chat_timeout", userId)))
case OnReinstate(Chat.Id(id), userId) if rounds exists id =>
send(RP.Out.tellRoom(RoomId(id), makeMessage("chat_reinstate", userId)))
}
}
system.scheduler.scheduleWithFixedDelay(25 seconds, tickInterval) { () =>
rounds.tellAll(RoundDuct.Tick)
}
@ -346,7 +362,7 @@ object RoundSocket {
)(implicit ec: scala.concurrent.ExecutionContext) {
import java.util.concurrent.ConcurrentHashMap
private[this] val terminations = new ConcurrentHashMap[String, Cancellable](32768)
private[this] val terminations = new ConcurrentHashMap[String, Cancellable](65536)
def schedule(gameId: Game.Id): Unit = terminations.compute(
gameId.value,

View File

@ -44,7 +44,9 @@ final private class SimulSocket(
send(RP.Out.tellRoomUser(RoomId(simul.id), userId, makeMessage("redirect", pov.fullId)))
}
lazy val rooms = makeRoomMap(send, true)
lazy val rooms = makeRoomMap(send)
subscribeChat(rooms)
private lazy val handler: Handler = roomHandler(rooms, chat, logger, roomId => _.Simul(roomId.value).some)

View File

@ -26,7 +26,9 @@ final private class StudySocket(
implicit def roomIdToStudyId(roomId: RoomId) = Study.Id(roomId.value)
implicit def studyIdToRoomId(studyId: Study.Id) = RoomId(studyId.value)
lazy val rooms = makeRoomMap(send, true)
lazy val rooms = makeRoomMap(send)
subscribeChat(rooms)
def isPresent(studyId: Study.Id, userId: User.ID): Fu[Boolean] =
remoteSocketApi.request[Boolean](

View File

@ -65,7 +65,9 @@ final private class TournamentSocket(
reload(tourId)
}
lazy val rooms = makeRoomMap(send, true)
lazy val rooms = makeRoomMap(send)
subscribeChat(rooms)
private lazy val handler: Handler =
roomHandler(rooms, chat, logger, roomId => _.Tournament(roomId.value).some)