migrate lobby module

rm0193-mapreduce
Thibault Duplessis 2019-12-03 08:34:21 -06:00
parent 5ee1d9f0b7
commit 87931b3938
10 changed files with 105 additions and 133 deletions

View File

@ -303,14 +303,7 @@ mailgun {
reply_to = "lichess.org <noreply@lichess.org>"
}
lobby {
net.domain = ${net.domain}
resync_ids_period = 25 seconds
collection.seek = seek
collection.seek_archive = seek_archive
seek {
max_per_page = 13
max_per_user = 5
}
max_playing = ${setup.max_playing}
}
timeline {

View File

@ -3,7 +3,7 @@ package lila.game
// Wrapper around newly created games. We do not know if the id is unique, yet.
case class NewGame(sloppy: Game) extends AnyVal {
def withId(id: Game.ID): Game = sloppy.withId(id)
def withUniqueId(idGenerator: IdGenerator): Fu[Game] =
def withUniqueId(implicit idGenerator: IdGenerator): Fu[Game] =
idGenerator.game dmap sloppy.withId
def start: NewGame = NewGame(sloppy.start)

View File

@ -1,9 +1,9 @@
package lila.lobby
import lila.game.Pov
import lila.user.UserRepo
private[lobby] final class AbortListener(
private final class AbortListener(
userRepo: lila.user.UserRepo,
seekApi: SeekApi,
lobbyTrouper: LobbyTrouper
) {
@ -15,8 +15,8 @@ private[lobby] final class AbortListener(
private def cancelColorIncrement(pov: Pov): Unit = pov.game.userIds match {
case List(u1, u2) =>
UserRepo.incColor(u1, -1)
UserRepo.incColor(u2, 1)
userRepo.incColor(u1, -1)
userRepo.incColor(u2, 1)
case _ =>
}
@ -32,7 +32,7 @@ private[lobby] final class AbortListener(
}
}
private def worthRecreating(seek: Seek): Fu[Boolean] = UserRepo byId seek.user.id map {
private def worthRecreating(seek: Seek): Fu[Boolean] = userRepo byId seek.user.id map {
_ exists { u =>
u.enabled && !u.lame
}

View File

@ -3,11 +3,14 @@ package lila.lobby
import chess.{ Game => ChessGame, Situation, Color => ChessColor }
import actorApi.{ JoinHook, JoinSeek }
import lila.game.{ GameRepo, Game, Player, PerfPicker }
import lila.user.{ User, UserRepo }
import lila.game.{ Game, Player, PerfPicker }
import lila.socket.Socket.Sri
import lila.user.User
private[lobby] object Biter {
private final class Biter(
userRepo: lila.user.UserRepo,
gameRepo: lila.game.GameRepo
)(implicit idGenerator: lila.game.IdGenerator) {
def apply(hook: Hook, sri: Sri, user: Option[LobbyUser]): Fu[JoinHook] =
if (canJoin(hook, user)) join(hook, sri, user)
@ -18,30 +21,30 @@ private[lobby] object Biter {
else fufail(s"$user cannot join seek $seek")
private def join(hook: Hook, sri: Sri, lobbyUserOption: Option[LobbyUser]): Fu[JoinHook] = for {
userOption <- lobbyUserOption.map(_.id) ?? UserRepo.byId
ownerOption <- hook.userId ?? UserRepo.byId
userOption <- lobbyUserOption.map(_.id) ?? userRepo.byId
ownerOption <- hook.userId ?? userRepo.byId
creatorColor <- assignCreatorColor(ownerOption, userOption, hook.realColor)
game <- makeGame(
hook,
whiteUser = creatorColor.fold(ownerOption, userOption),
blackUser = creatorColor.fold(userOption, ownerOption)
).withUniqueId
_ <- GameRepo insertDenormalized game
_ <- gameRepo insertDenormalized game
} yield {
lila.mon.lobby.hook.join()
JoinHook(sri, hook, game, creatorColor)
}
private def join(seek: Seek, lobbyUser: LobbyUser): Fu[JoinSeek] = for {
user <- UserRepo byId lobbyUser.id flatten s"No such user: ${lobbyUser.id}"
owner <- UserRepo byId seek.user.id flatten s"No such user: ${seek.user.id}"
user <- userRepo byId lobbyUser.id orFail s"No such user: ${lobbyUser.id}"
owner <- userRepo byId seek.user.id orFail s"No such user: ${seek.user.id}"
creatorColor <- assignCreatorColor(owner.some, user.some, seek.realColor)
game <- makeGame(
seek,
whiteUser = creatorColor.fold(owner.some, user.some),
blackUser = creatorColor.fold(user.some, owner.some)
).withUniqueId
_ <- GameRepo insertDenormalized game
_ <- gameRepo insertDenormalized game
} yield JoinSeek(user.id, seek, game, creatorColor)
private def assignCreatorColor(
@ -49,7 +52,7 @@ private[lobby] object Biter {
joinerUser: Option[User],
color: Color
): Fu[chess.Color] = color match {
case Color.Random => UserRepo.firstGetsWhite(creatorUser.map(_.id), joinerUser.map(_.id)) map chess.Color.apply
case Color.Random => userRepo.firstGetsWhite(creatorUser.map(_.id), joinerUser.map(_.id)) map chess.Color.apply
case Color.White => fuccess(chess.White)
case Color.Black => fuccess(chess.Black)
}

View File

@ -1,89 +1,51 @@
package lila.lobby
import akka.actor._
import com.typesafe.config.Config
import com.softwaremill.macwire._
import play.api.Configuration
import scala.concurrent.duration._
import lila.user.User
import lila.common.config._
final class Env(
config: Config,
appConfig: Configuration,
db: lila.db.Env,
hub: lila.hub.Env,
onStart: String => Unit,
blocking: User.ID => Fu[Set[User.ID]],
playban: String => Fu[Option[lila.playban.TempBan]],
onStart: lila.round.OnStart,
relationApi: lila.relation.RelationApi,
playbanApi: lila.playban.PlaybanApi,
gameCache: lila.game.Cached,
userRepo: lila.user.UserRepo,
gameRepo: lila.game.GameRepo,
poolApi: lila.pool.PoolApi,
asyncCache: lila.memo.AsyncCache.Builder,
remoteSocketApi: lila.socket.RemoteSocket,
system: ActorSystem
remoteSocketApi: lila.socket.RemoteSocket
)(implicit
system: akka.actor.ActorSystem,
idGenerator: lila.game.IdGenerator
) {
private val settings = new {
val NetDomain = config getString "net.domain"
val ResyncIdsPeriod = config duration "resync_ids_period"
val CollectionSeek = config getString "collection.seek"
val CollectionSeekArchive = config getString "collection.seek_archive"
val SeekMaxPerPage = config getInt "seek.max_per_page"
val SeekMaxPerUser = config getInt "seek.max_per_user"
val MaxPlaying = config getInt "max_playing"
}
import settings._
private lazy val maxPlaying = appConfig.get[Max]("lobby.max_playing")
lazy val seekApi = new SeekApi(
coll = db(CollectionSeek),
archiveColl = db(CollectionSeekArchive),
blocking = blocking,
asyncCache = asyncCache,
maxPerPage = SeekMaxPerPage,
maxPerUser = SeekMaxPerUser
private lazy val seekApiConfig = new SeekApi.Config(
coll = db(CollName("seek")),
archiveColl = db(CollName("seek_archive")),
maxPerPage = MaxPerPage(13),
maxPerUser = Max(5)
)
private val lobbyTrouper = LobbyTrouper.start(
broomPeriod = 2.seconds,
resyncIdsPeriod = ResyncIdsPeriod
) { () =>
new LobbyTrouper(
system = system,
seekApi = seekApi,
gameCache = gameCache,
maxPlaying = MaxPlaying,
blocking = blocking,
playban = playban,
poolApi = poolApi,
onStart = onStart
)
}(system)
lazy val seekApi = wire[SeekApi]
private val remoteSocket: LobbySocket = new LobbySocket(
remoteSocketApi = remoteSocketApi,
lobby = lobbyTrouper,
blocking = blocking,
poolApi = poolApi,
system = system
)
private lazy val lobbyTrouper = LobbyTrouper.start(
broomPeriod = 2 seconds,
resyncIdsPeriod = 25 seconds
) { () => wire[LobbyTrouper] }
private val abortListener = new AbortListener(seekApi, lobbyTrouper)
private lazy val remoteSocket: LobbySocket = wire[LobbySocket]
private lazy val abortListener = wire[AbortListener]
private lazy val biter = wire[Biter]
lila.common.Bus.subscribeFun("abortGame") {
case lila.game.actorApi.AbortedBy(pov) => abortListener(pov)
}
}
object Env {
lazy val current = "lobby" boot new Env(
config = lila.common.PlayApp loadConfig "lobby",
db = lila.db.Env.current,
hub = lila.hub.Env.current,
onStart = lila.round.Env.current.onStart,
blocking = lila.relation.Env.current.api.fetchBlocking,
playban = lila.playban.Env.current.api.currentBan _,
gameCache = lila.game.Env.current.cached,
poolApi = lila.pool.Env.current.api,
asyncCache = lila.memo.Env.current.asyncCache,
remoteSocketApi = lila.socket.Env.current.remoteSocket,
system = lila.common.PlayApp.system
)
}

View File

@ -15,12 +15,14 @@ import lila.pool.{ PoolApi, PoolConfig }
import lila.rating.RatingRange
import lila.socket.RemoteSocket.{ Protocol => P, _ }
import lila.socket.Socket.{ makeMessage, Sri, Sris }
import lila.user.{ User, UserRepo }
import lila.user.User
final class LobbySocket(
biter: Biter,
userRepo: lila.user.UserRepo,
remoteSocketApi: lila.socket.RemoteSocket,
lobby: LobbyTrouper,
blocking: User.ID => Fu[Set[User.ID]],
relationApi: lila.relation.RelationApi,
poolApi: PoolApi,
system: akka.actor.ActorSystem
) {
@ -41,13 +43,13 @@ final class LobbySocket(
case GetMember(sri, promise) => promise success members.get(sri.value)
case GetSrisP(promise) =>
promise success Sris(members.keySet.map(Sri.apply)(scala.collection.breakOut))
promise success Sris(members.keySet.view.map(Sri.apply).toSet)
lila.mon.lobby.socket.idle(idleSris.size)
lila.mon.lobby.socket.hookSubscribers(hookSubscriberSris.size)
case Cleanup =>
idleSris retain members.contains
hookSubscriberSris retain members.contains
idleSris filterInPlace members.contains
hookSubscriberSris filterInPlace members.contains
case Join(member) => members += (member.sri.value -> member)
@ -65,7 +67,7 @@ final class LobbySocket(
case AddHook(hook) => send(P.Out.tellSris(
hookSubscriberSris diff idleSris filter { sri =>
members get sri exists { Biter.showHookTo(hook, _) }
members get sri exists { biter.showHookTo(hook, _) }
} map Sri.apply,
makeMessage("had", hook.render)
))
@ -110,7 +112,7 @@ final class LobbySocket(
lila.common.Bus.subscribe(this, "changeFeaturedGame", "streams", "poolPairings", "lobbySocket")
system.scheduler.scheduleOnce(7 seconds)(this ! SendHookRemovals)
system.scheduler.schedule(1 minute, 1 minute)(this ! Cleanup)
system.scheduler.scheduleWithFixedDelay(1 minute, 1 minute)(() => this ! Cleanup)
private def tellActive(msg: JsObject): Unit = send(Out.tellLobbyActive(msg))
@ -180,7 +182,7 @@ final class LobbySocket(
PoolApi.Joiner(
userId = user.id,
sri = member.sri,
ratingMap = user.perfMap.mapValues(_.rating),
ratingMap = user.perfMap.view.mapValues(_.rating).toMap,
ratingRange = ratingRange,
lame = user.lame,
blocking = user.blocking ++ blocking
@ -205,10 +207,10 @@ final class LobbySocket(
private def getOrConnect(sri: Sri, userOpt: Option[User.ID]): Fu[Member] =
trouper.ask[Option[Member]](GetMember(sri, _)) getOrElse {
userOpt ?? UserRepo.enabledById flatMap { user =>
userOpt ?? userRepo.enabledById flatMap { user =>
(user ?? { u =>
remoteSocketApi.baseHandler(P.In.ConnectUser(u.id))
blocking(u.id)
relationApi.fetchBlocking(u.id)
}) map { blocks =>
val member = Member(sri, user map { LobbyUser.make(_, blocks) })
trouper ! Join(member)

View File

@ -6,20 +6,22 @@ import org.joda.time.DateTime
import actorApi._
import lila.common.{ Every, AtMost }
import lila.common.config.Max
import lila.game.Game
import lila.hub.Trouper
import lila.socket.Socket.{ Sri, Sris }
import lila.user.User
private[lobby] final class LobbyTrouper(
private final class LobbyTrouper(
system: akka.actor.ActorSystem,
seekApi: SeekApi,
biter: Biter,
gameCache: lila.game.Cached,
maxPlaying: Int,
blocking: String => Fu[Set[String]],
playban: String => Fu[Option[lila.playban.TempBan]],
maxPlaying: Max,
relationApi: lila.relation.RelationApi,
playbanApi: lila.playban.PlaybanApi,
poolApi: lila.pool.PoolApi,
onStart: Game.ID => Unit
onStart: lila.round.OnStart
) extends Trouper {
import LobbyTrouper._
@ -68,11 +70,11 @@ private[lobby] final class LobbyTrouper(
case BiteSeek(seekId, user) => NoPlayban(user.some) {
gameCache.nbPlaying(user.id) foreach { nbPlaying =>
if (nbPlaying < maxPlaying) {
if (maxPlaying > nbPlaying) {
lila.mon.lobby.seek.join()
seekApi find seekId foreach {
_ foreach { seek =>
Biter(seek, user) foreach this.!
biter(seek, user) foreach this.!
}
}
}
@ -121,7 +123,7 @@ private[lobby] final class LobbyTrouper(
case Resync => socket ! HookIds(HookRepo.vector.map(_.id))
case HookSub(member, true) =>
socket ! AllHooksFor(member, HookRepo.vector.filter { Biter.showHookTo(_, member) })
socket ! AllHooksFor(member, HookRepo.vector.filter { biter.showHookTo(_, member) })
case lila.pool.HookThieve.GetCandidates(clock, promise) =>
promise success lila.pool.HookThieve.PoolHooks(HookRepo poolCandidates clock)
@ -131,7 +133,7 @@ private[lobby] final class LobbyTrouper(
}
private def NoPlayban(user: Option[LobbyUser])(f: => Unit): Unit = {
user.?? { u => playban(u.id) } foreach {
user.?? { u => playbanApi.currentBan(u.id) } foreach {
case None => f
case _ =>
}
@ -141,7 +143,7 @@ private[lobby] final class LobbyTrouper(
HookRepo byId hookId foreach { hook =>
remove(hook)
HookRepo bySri sri foreach remove
Biter(hook, sri, user) foreach this.!
biter(hook, sri, user) foreach this.!
}
private def findCompatible(hook: Hook): Option[Hook] =
@ -149,7 +151,7 @@ private[lobby] final class LobbyTrouper(
private def findCompatibleIn(hook: Hook, in: Vector[Hook]): Option[Hook] = in match {
case Vector() => none
case h +: rest => if (Biter.canJoin(h, hook.user) && !(
case h +: rest => if (biter.canJoin(h, hook.user) && !(
(h.user |@| hook.user).tupled ?? {
case (u1, u2) => recentlyAbortedUserIdPairs.exists(u1.id, u2.id)
}
@ -195,7 +197,7 @@ private object LobbyTrouper {
)(makeTrouper: () => LobbyTrouper)(implicit system: akka.actor.ActorSystem) = {
val trouper = makeTrouper()
lila.common.Bus.subscribe(trouper, "lobbyTrouper")
system.scheduler.schedule(15 seconds, resyncIdsPeriod)(trouper ! actorApi.Resync)
system.scheduler.scheduleWithFixedDelay(15 seconds, resyncIdsPeriod)(() => trouper ! actorApi.Resync)
lila.common.ResilientScheduler(
every = Every(broomPeriod),
atMost = AtMost(10 seconds),

View File

@ -31,10 +31,10 @@ private[lobby] object LobbyUser {
)
private def perfMapOf(perfs: lila.user.Perfs): PerfMap =
perfs.perfs.collect {
perfs.perfs.view.collect {
case (key, perf) if key != PerfType.Puzzle.key && perf.nonEmpty =>
key -> LobbyPerf(perf.intRating, perf.provisional)
}(scala.collection.breakOut)
}.toMap
}
case class LobbyPerf(rating: Int, provisional: Boolean)

View File

@ -102,13 +102,13 @@ object Seek {
createdAt = DateTime.now
)
import reactivemongo.api.bson.{ MapReader => _, MapWriter => _, _ }
import lila.db.BSON.MapValue.MapHandler
import reactivemongo.api.bson._
import lila.db.BSON.BSONJodaDateTimeHandler
implicit val lobbyPerfBSONHandler = new BSONHandler[BSONInteger, LobbyPerf] {
def read(b: BSONInteger) = LobbyPerf(b.value.abs, b.value < 0)
def write(x: LobbyPerf) = BSONInteger(x.rating * (if (x.provisional) -1 else 1))
}
implicit val lobbyPerfBSONHandler =
BSONIntegerHandler.as[LobbyPerf](
b => LobbyPerf(b.abs, b < 0),
x => x.rating * (if (x.provisional) -1 else 1)
)
private[lobby] implicit val lobbyUserBSONHandler = Macros.handler[LobbyUser]
private[lobby] implicit val seekBSONHandler = Macros.handler[Seek]
}

View File

@ -3,17 +3,17 @@ package lila.lobby
import org.joda.time.DateTime
import scala.concurrent.duration._
import lila.common.config._
import lila.db.dsl._
import lila.user.User
final class SeekApi(
coll: Coll,
archiveColl: Coll,
blocking: String => Fu[Set[User.ID]],
asyncCache: lila.memo.AsyncCache.Builder,
maxPerPage: Int,
maxPerUser: Int
config: SeekApi.Config,
biter: Biter,
relationApi: lila.relation.RelationApi,
asyncCache: lila.memo.AsyncCache.Builder
) {
import config._
private sealed trait CacheKey
private object ForAnon extends CacheKey
@ -27,7 +27,7 @@ final class SeekApi(
private val cache = asyncCache.clearable[CacheKey, List[Seek]](
name = "lobby.seek.list",
f = {
case ForAnon => allCursor.gather[List](maxPerPage)
case ForAnon => allCursor.gather[List](maxPerPage.value)
case ForUser => allCursor.gather[List]()
},
maxCapacity = 2,
@ -42,15 +42,15 @@ final class SeekApi(
def forAnon = cache get ForAnon
def forUser(user: User): Fu[List[Seek]] =
blocking(user.id) flatMap { blocking =>
relationApi.fetchBlocking(user.id) flatMap { blocking =>
forUser(LobbyUser.make(user, blocking))
}
def forUser(user: LobbyUser): Fu[List[Seek]] = cache get ForUser map { seeks =>
val filtered = seeks.filter { seek =>
seek.user.id == user.id || Biter.canJoin(seek, user)
seek.user.id == user.id || biter.canJoin(seek, user)
}
noDupsFor(user, filtered) take maxPerPage
noDupsFor(user, filtered) take maxPerPage.value
}
private def noDupsFor(user: LobbyUser, seeks: List[Seek]) =
@ -66,8 +66,8 @@ final class SeekApi(
coll.find($id(id)).uno[Seek]
def insert(seek: Seek) = coll.insert(seek) >> findByUser(seek.user.id).flatMap {
case seeks if seeks.size <= maxPerUser => funit
case seeks => seeks.drop(maxPerUser).map(remove).sequenceFu
case seeks if maxPerUser >= seeks.size => funit
case seeks => seeks.drop(maxPerUser.value).map(remove).sequenceFu
}.void >>- cacheClear
def findByUser(userId: String): Fu[List[Seek]] =
@ -79,7 +79,7 @@ final class SeekApi(
coll.remove($doc("_id" -> seek.id)).void >>- cacheClear
def archive(seek: Seek, gameId: String) = {
val archiveDoc = Seek.seekBSONHandler.write(seek) ++ $doc(
val archiveDoc = Seek.seekBSONHandler.writeTry(seek).get ++ $doc(
"gameId" -> gameId,
"archivedAt" -> DateTime.now
)
@ -100,3 +100,13 @@ final class SeekApi(
def removeByUser(user: User) =
coll.remove($doc("user.id" -> user.id)).void >>- cacheClear
}
private object SeekApi {
final class Config(
val coll: Coll,
val archiveColl: Coll,
val maxPerPage: MaxPerPage,
val maxPerUser: Max
)
}