always send game messages on the same redis channel

to avoid race conditions where lila-ws receives versioned game events
on several channels and loses ordering
pull/7961/head
Thibault Duplessis 2021-01-20 10:13:06 +01:00
parent ae6532ce1b
commit d4a136b724
2 changed files with 14 additions and 7 deletions

View File

@ -75,7 +75,7 @@ final class RoundSocket(
val duct = new RoundDuct(
dependencies = roundDependencies,
gameId = id,
socketSend = send
socketSend = sendForGameId(id)
)(ec, proxy)
terminationDelay schedule Game.Id(id)
duct.getGame dforeach {
@ -160,17 +160,19 @@ final class RoundSocket(
private def finishRound(gameId: Game.Id): Unit =
rounds.terminate(gameId.value, _ ! RoundDuct.Stop)
private lazy val send: String => Unit = remoteSocketApi.makeSender("r-out", parallelism = 8).apply _
private lazy val send: Sender = remoteSocketApi.makeSender("r-out", parallelism = 8)
private lazy val sendForGameId: Game.ID => String => Unit = gameId => msg => send.sticky(gameId, msg)
remoteSocketApi.subscribeRoundRobin("r-in", Protocol.In.reader, parallelism = 8)(
roundHandler orElse remoteSocketApi.baseHandler
) >>- send(P.Out.boot)
Bus.subscribeFun("tvSelect", "roundSocket", "tourStanding", "startGame", "finishGame") {
case TvSelect(gameId, speed, json) => send(Protocol.Out.tvSelect(gameId, speed, json))
case TvSelect(gameId, speed, json) => sendForGameId(gameId)(Protocol.Out.tvSelect(gameId, speed, json))
case Tell(gameId, e @ BotConnected(color, v)) =>
rounds.tell(gameId, e)
send(Protocol.Out.botConnected(gameId, color, v))
sendForGameId(gameId)(Protocol.Out.botConnected(gameId, color, v))
case Tell(gameId, msg) => rounds.tell(gameId, msg)
case TellIfExists(gameId, msg) => rounds.tellIfPresent(gameId, msg)
case TellMany(gameIds, msg) => rounds.tellIds(gameIds, msg)
@ -179,11 +181,11 @@ final class RoundSocket(
case TourStanding(tourId, json) => send(Protocol.Out.tourStanding(tourId, json))
case lila.game.actorApi.StartGame(game) if game.hasClock =>
game.userIds.some.filter(_.nonEmpty) foreach { usersPlaying =>
send(Protocol.Out.startGame(usersPlaying))
sendForGameId(game.id)(Protocol.Out.startGame(usersPlaying))
}
case lila.game.actorApi.FinishGame(game, _, _) if game.hasClock =>
game.userIds.some.filter(_.nonEmpty) foreach { usersPlaying =>
send(Protocol.Out.finishGame(game.id, game.winnerColor, usersPlaying))
sendForGameId(game.id)(Protocol.Out.finishGame(game.id, game.winnerColor, usersPlaying))
}
}

View File

@ -118,7 +118,8 @@ final class RemoteSocket(
final class StoppableSender(conn: StatefulRedisPubSubConnection[String, String], channel: Channel)
extends Sender {
def apply(msg: String): Unit = if (!stopping) conn.async.publish(channel, msg).unit
def apply(msg: String): Unit = if (!stopping) conn.async.publish(channel, msg).unit
def sticky(_id: String, msg: String): Unit = apply(msg)
}
final class RoundRobinSender(
@ -128,6 +129,9 @@ final class RemoteSocket(
) extends Sender {
def apply(msg: String): Unit =
if (!stopping) conn.async.publish(s"$channel:${msg.hashCode.abs % parallelism}", msg).unit
// use the ID to select the channel, not the entire message
def sticky(id: String, msg: String): Unit =
if (!stopping) conn.async.publish(s"$channel:${id.hashCode.abs % parallelism}", msg).unit
}
def makeSender(channel: Channel, parallelism: Int = 1): Sender =
@ -195,6 +199,7 @@ object RemoteSocket {
trait Sender {
def apply(msg: String): Unit
def sticky(_id: String, msg: String): Unit
}
object Protocol {