asynchronously index finished games

Thibault Duplessis 2012-09-06 18:21:28 +02:00
parent 8d02347e86
commit 8711d82ec4
8 changed files with 73 additions and 15 deletions

@@ -94,7 +94,8 @@ final class CoreEnv private (application: Application, val settings: Settings) {
     i18nKeys = i18n.keys,
     ai = ai.ai,
     countMove = monitor.mpsProvider.countRequest,
-    flood = security.flood)
+    flood = security.flood,
+    indexGame = search.indexGame)

   lazy val analyse = new lila.analyse.AnalyseEnv(
     settings = settings,
@@ -122,6 +123,7 @@ final class CoreEnv private (application: Application, val settings: Settings) {
   lazy val search = new lila.search.SearchEnv(
     settings = settings,
+    mongodb = mongodb.apply _,
     gameRepo = game.gameRepo)

   lazy val metaHub = new lila.socket.MetaHub(

@@ -72,6 +72,10 @@ object Cron {
     env.ai.clientDiagnose
   }

+  effect(15 seconds, "index finished games") {
+    env.search.indexer.indexQueue
+  }
+
   def message(freq: Duration)(to: (ActorRef, Any)) {
     Akka.system.scheduler.schedule(freq, freq.randomize(), to._1, to._2)
   }
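
The scheduled task above is what makes the indexing asynchronous: finishing a game only enqueues its id, and every 15 seconds this cron entry drains whatever has accumulated. Below is a self-contained sketch of that polling pattern; it uses a plain ScheduledExecutorService and an in-memory queue instead of lila's Akka-based effect helper and Mongo-backed queue, so every name in it is illustrative only.

import java.util.concurrent.{ ConcurrentLinkedQueue, Executors, TimeUnit }

// Illustrative stand-in for the "drain the index queue every 15 seconds" cron
// entry; lila schedules env.search.indexer.indexQueue through Akka instead.
object IndexQueuePollerSketch {

  def main(args: Array[String]): Unit = {
    val queue = new ConcurrentLinkedQueue[String]()
    queue.add("abcd1234")
    queue.add("efgh5678")

    val drain = new Runnable {
      def run(): Unit = {
        // Take everything currently pending and "index" it as one batch.
        val batch = Iterator.continually(queue.poll()).takeWhile(_ != null).toList
        if (batch.nonEmpty) println("Search indexing %d games" format batch.size)
      }
    }

    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleWithFixedDelay(drain, 15, 15, TimeUnit.SECONDS) // same cadence as above

    Thread.sleep(16000) // let one tick fire, then shut down
    scheduler.shutdown()
  }
}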

@@ -29,6 +29,7 @@ final class Settings(config: Config) {
   val SearchESPort = getInt("search.elasticsearch.port")
   val SearchESCluster = getString("search.elasticsearch.cluster")
   val SearchPaginatorMaxPerPage = getInt("search.paginator.max_per_page")
+  val SearchCollectionQueue = getString("search.collection.queue")

   val RoundMessageLifetime = millis("round.message.lifetime")
   val RoundUidTimeout = millis("round.uid.timeout")

@@ -17,7 +17,8 @@ final class Finisher(
     messenger: Messenger,
     eloUpdater: EloUpdater,
     eloCalculator: EloCalculator,
-    finisherLock: FinisherLock) {
+    finisherLock: FinisherLock,
+    indexGame: DbGame ⇒ IO[Unit]) {

   type ValidIOEvents = Valid[IO[List[Event]]]
@@ -85,6 +86,7 @@ final class Finisher(
       _ ← updateElo(g)
       _ ← incNbGames(g, White)
       _ ← incNbGames(g, Black)
+      _ ← indexGame(g)
     } yield p2.events)

   private def incNbGames(game: DbGame, color: Color): IO[Unit] =
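
In Finisher, the new indexGame(g) step is simply appended to the existing for comprehension over IO, so enqueueing the game for indexing is sequenced after the Elo update and the game counters and runs only when the whole finish program is performed. The sketch below shows that shape with a simplified IO type standing in for scalaz's; every name in it is a stand-in, not lila code.

object FinisherSequenceSketch {

  // Simplified IO with map/flatMap, standing in for scalaz.effects.IO.
  final case class IO[A](run: () => A) {
    def map[B](f: A => B): IO[B] = IO(() => f(run()))
    def flatMap[B](f: A => IO[B]): IO[B] = IO(() => f(run()).run())
  }
  def io[A](a: => A): IO[A] = IO(() => a)

  case class DbGame(id: String)

  def updateElo(g: DbGame): IO[Unit]  = io(println("elo updated for " + g.id))
  def incNbGames(g: DbGame): IO[Unit] = io(println("game count bumped"))
  def indexGame(g: DbGame): IO[Unit]  = io(println(g.id + " queued for indexing"))

  // Same shape as the finish pipeline above: effects are only described here;
  // nothing runs until the resulting IO is finally performed.
  def finish(g: DbGame): IO[Unit] = for {
    _ <- updateElo(g)
    _ <- incNbGames(g)
    _ <- indexGame(g) // the effect this commit adds to the pipeline
  } yield ()

  def main(args: Array[String]): Unit = finish(DbGame("abcd1234")).run()
}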

@@ -25,7 +25,8 @@ final class RoundEnv(
     i18nKeys: I18nKeys,
     ai: () ⇒ Ai,
     countMove: () ⇒ Unit,
-    flood: Flood) {
+    flood: Flood,
+    indexGame: DbGame ⇒ IO[Unit]) {

   implicit val ctx = app
   import settings._
@@ -68,7 +69,8 @@ final class RoundEnv(
     messenger = messenger,
     eloUpdater = eloUpdater,
     eloCalculator = eloCalculator,
-    finisherLock = finisherLock)
+    finisherLock = finisherLock,
+    indexGame = indexGame)

   lazy val eloCalculator = new chess.EloCalculator(true)

@@ -9,15 +9,19 @@ import com.codahale.jerkson.Json
 import org.elasticsearch.action.search.SearchResponse
 import com.traackr.scalastic.elasticsearch.{ Indexer ⇒ EsIndexer }
+import com.mongodb.casbah.query.Imports._

-final class Indexer(es: EsIndexer, gameRepo: GameRepo) {
+final class Indexer(
+    es: EsIndexer,
+    gameRepo: GameRepo,
+    queue: Queue) {

   val indexName = "lila"
   val typeName = "game"

   def rebuildAll: IO[Unit] = for {
     _ ← clear
-    nb ← indexAll
+    nb ← indexQuery(DBObject())
     _ ← io { es.waitTillCountAtLeast(Seq(indexName), typeName, nb) }
     _ ← optimize
   } yield ()
@@ -37,6 +41,18 @@ final class Indexer(es: EsIndexer, gameRepo: GameRepo) {
       putStrLn("Search index: fail to produce a document from game " + game.id)
     )

+  def indexQueue: IO[Unit] = for {
+    ids ← queue next 1000
+    _ ← ids.toNel.fold(
+      neIds ⇒ for {
+        _ ← putStrLn("Search indexing %d games" format neIds.list.size)
+        _ ← indexQuery("_id" $in neIds.list)
+        _ ← queue remove neIds.list
+      } yield (),
+      io()
+    )
+  } yield ()
+
   private def clear: IO[Unit] = io {
     es.deleteIndex(Seq(indexName))
     es.createIndex(indexName, settings = Map())
@@ -45,11 +61,11 @@ final class Indexer(es: EsIndexer, gameRepo: GameRepo) {
     es.refresh()
   }

-  private def indexAll: IO[Int] = io {
-    val cursor = gameRepo find GameQuery.finished sort GameQuery.sortCreated //limit 3000
+  private def indexQuery(query: DBObject): IO[Int] = io {
+    val cursor = gameRepo find (GameQuery.finished ++ query) sort GameQuery.sortCreated //limit 3000
     var nb = 0
     for (games ← cursor grouped 5000) {
-      println("Indexing " + nb)
+      println("Indexing from %d to %d".format(nb, nb + games.size))
       val actions = games map (_.decode flatMap Game.from) collect {
         case Some((id, doc)) ⇒
           es.index_prepare(indexName, typeName, id, Json generate doc).request
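
indexQueue takes at most 1000 queued ids per run, reuses indexQuery (the same code path as rebuildAll) to push the corresponding finished games into ElasticSearch, and only removes the ids from the queue afterwards, so a failed run leaves them in place for the next cron tick. Below is a minimal, in-memory sketch of that take / index / remove-on-success ordering; all names and the storage are illustrative only.

// Illustrative drain loop: take a batch of ids, index them, and only remove
// them from the queue once indexing succeeded, so failures are retried later.
object DrainSketch {

  def main(args: Array[String]): Unit = {
    val queue = collection.mutable.LinkedHashSet("g1", "g2", "g3")

    def indexGames(ids: List[String]): Unit =
      println("Search indexing %d games" format ids.size) // stand-in for the ES bulk call

    val batch = queue.take(1000).toList // Queue.next(1000)
    if (batch.nonEmpty) {               // ids.toNel.fold(..., io())
      indexGames(batch)                 // indexQuery("_id" $in ids)
      queue --= batch                   // Queue.remove(ids)
    }
    println(queue)                      // empty once indexing succeeded
  }
}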

app/search/Queue.scala (new file, 25 lines)

@@ -0,0 +1,25 @@
+package lila
+package search
+
+import game.DbGame
+
+import com.mongodb.casbah.MongoCollection
+import com.mongodb.casbah.query.Imports._
+import scalaz.effects._
+
+final class Queue(collection: MongoCollection) {
+
+  def enqueue(game: DbGame): IO[Unit] = enqueue(game.id)
+
+  def enqueue(id: String): IO[Unit] = io {
+    collection += DBObject("_id" -> id)
+  }
+
+  def next(size: Int): IO[List[String]] = io {
+    collection.find().limit(size).toList.map(_.getAs[String]("_id")).flatten
+  }
+
+  def remove(ids: List[String]): IO[Unit] = io {
+    collection.remove("_id" $in ids)
+  }
+}
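
Queue is a thin wrapper over a Mongo collection whose documents contain nothing but a game id: enqueue inserts {_id: id}, next reads up to size ids without consuming them, and remove deletes them once the indexer has handled them. The sketch below exercises that contract against an in-memory stand-in instead of a real MongoCollection; the class and values are hypothetical.

// In-memory stand-in for the Mongo-backed Queue above, showing the intended
// enqueue / next / remove contract (names and storage are illustrative only).
object QueueContractSketch {

  case class DbGame(id: String)

  final class InMemoryQueue {
    private val ids = collection.mutable.LinkedHashSet.empty[String]

    def enqueue(game: DbGame): Unit = enqueue(game.id)
    def enqueue(id: String): Unit = ids += id

    // peek at up to `size` ids; they stay queued until explicitly removed
    def next(size: Int): List[String] = ids.take(size).toList

    def remove(removed: List[String]): Unit = ids --= removed
  }

  def main(args: Array[String]): Unit = {
    val queue = new InMemoryQueue
    queue enqueue DbGame("g1")
    queue enqueue DbGame("g2")

    val batch = queue next 2 // List(g1, g2), still queued
    // ... index the batch, then acknowledge it:
    queue remove batch
    println(queue next 2)    // List(): nothing left to index
  }
}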

@@ -1,15 +1,15 @@
 package lila
 package search

-import game.GameRepo
+import game.{ GameRepo, DbGame }
 import core.Settings
-import com.traackr.scalastic.elasticsearch.{ Indexer ⇒ EsIndexer }
-import scalaz.effects._
-import akka.dispatch.Future
+import com.traackr.scalastic.elasticsearch
+import com.mongodb.casbah.MongoCollection

 final class SearchEnv(
     settings: Settings,
+    mongodb: String ⇒ MongoCollection,
     gameRepo: GameRepo) {

   import settings._
@@ -18,13 +18,19 @@ final class SearchEnv(

   lazy val indexer = new Indexer(
     es = esIndexer,
-    gameRepo = gameRepo)
+    gameRepo = gameRepo,
+    queue = queue)

   lazy val paginator = new PaginatorBuilder(
     indexer = indexer,
     maxPerPage = SearchPaginatorMaxPerPage)

-  private lazy val esIndexer = EsIndexer.transport(
+  def indexGame(game: DbGame) = queue enqueue game
+
+  private lazy val queue = new Queue(
+    collection = mongodb(SearchCollectionQueue))
+
+  private lazy val esIndexer = elasticsearch.Indexer.transport(
     settings = Map(
       "cluster.name" -> SearchESCluster
     ),
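
With this change SearchEnv receives a String ⇒ MongoCollection factory from CoreEnv, builds its queue lazily from the collection named by the new search.collection.queue setting, and exposes indexGame as nothing more than "enqueue the id"; the heavy ElasticSearch work stays in the cron-driven indexQueue. A hedged sketch of that wiring shape follows, with simplified stand-ins for the collection, the queue and the settings value (none of these are the real lila types).

// Stand-in types to illustrate the wiring; the real SearchEnv uses casbah's
// MongoCollection and scalaz's IO instead of these simplified versions.
object SearchEnvWiringSketch {

  case class DbGame(id: String)

  // Poor man's "collection": just a named bag of ids.
  final class FakeCollection(val name: String) {
    val ids = collection.mutable.Buffer.empty[String]
  }

  final class QueueSketch(coll: FakeCollection) {
    def enqueue(game: DbGame): Unit = coll.ids += game.id
  }

  // stand-in for the collection name read from search.collection.queue
  val SearchCollectionQueue = "search_queue"

  final class SearchEnvSketch(mongodb: String => FakeCollection) {
    // resolved lazily from the configured collection name, as in the diff
    private lazy val queue = new QueueSketch(mongodb(SearchCollectionQueue))
    // indexGame is just "enqueue"; the cron task does the actual ES work later
    def indexGame(game: DbGame): Unit = queue enqueue game
  }

  def main(args: Array[String]): Unit = {
    val collections = collection.mutable.Map.empty[String, FakeCollection]
    val env = new SearchEnvSketch(name => collections.getOrElseUpdate(name, new FakeCollection(name)))
    env.indexGame(DbGame("abcd1234"))
    println(collections(SearchCollectionQueue).ids) // ArrayBuffer(abcd1234)
  }
}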