diff --git a/app/core/CoreEnv.scala b/app/core/CoreEnv.scala index d8f92330ae..eada923fe0 100644 --- a/app/core/CoreEnv.scala +++ b/app/core/CoreEnv.scala @@ -94,7 +94,8 @@ final class CoreEnv private (application: Application, val settings: Settings) { i18nKeys = i18n.keys, ai = ai.ai, countMove = monitor.mpsProvider.countRequest, - flood = security.flood) + flood = security.flood, + indexGame = search.indexGame) lazy val analyse = new lila.analyse.AnalyseEnv( settings = settings, @@ -122,6 +123,7 @@ final class CoreEnv private (application: Application, val settings: Settings) { lazy val search = new lila.search.SearchEnv( settings = settings, + mongodb = mongodb.apply _, gameRepo = game.gameRepo) lazy val metaHub = new lila.socket.MetaHub( diff --git a/app/core/Cron.scala b/app/core/Cron.scala index a34bb73735..1e04f4cc16 100644 --- a/app/core/Cron.scala +++ b/app/core/Cron.scala @@ -72,6 +72,10 @@ object Cron { env.ai.clientDiagnose } + effect(15 seconds, "index finished games") { + env.search.indexer.indexQueue + } + def message(freq: Duration)(to: (ActorRef, Any)) { Akka.system.scheduler.schedule(freq, freq.randomize(), to._1, to._2) } diff --git a/app/core/Settings.scala b/app/core/Settings.scala index ea1f3ca8e8..715c255d1b 100644 --- a/app/core/Settings.scala +++ b/app/core/Settings.scala @@ -29,6 +29,7 @@ final class Settings(config: Config) { val SearchESPort = getInt("search.elasticsearch.port") val SearchESCluster = getString("search.elasticsearch.cluster") val SearchPaginatorMaxPerPage = getInt("search.paginator.max_per_page") + val SearchCollectionQueue = getString("search.collection.queue") val RoundMessageLifetime = millis("round.message.lifetime") val RoundUidTimeout = millis("round.uid.timeout") diff --git a/app/round/Finisher.scala b/app/round/Finisher.scala index aa2b583b8f..55166b55a8 100644 --- a/app/round/Finisher.scala +++ b/app/round/Finisher.scala @@ -17,7 +17,8 @@ final class Finisher( messenger: Messenger, eloUpdater: EloUpdater, eloCalculator: EloCalculator, - finisherLock: FinisherLock) { + finisherLock: FinisherLock, + indexGame: DbGame ⇒ IO[Unit]) { type ValidIOEvents = Valid[IO[List[Event]]] @@ -85,6 +86,7 @@ final class Finisher( _ ← updateElo(g) _ ← incNbGames(g, White) _ ← incNbGames(g, Black) + _ ← indexGame(g) } yield p2.events) private def incNbGames(game: DbGame, color: Color): IO[Unit] = diff --git a/app/round/RoundEnv.scala b/app/round/RoundEnv.scala index bf925a4840..b31f710c7b 100644 --- a/app/round/RoundEnv.scala +++ b/app/round/RoundEnv.scala @@ -25,7 +25,8 @@ final class RoundEnv( i18nKeys: I18nKeys, ai: () ⇒ Ai, countMove: () ⇒ Unit, - flood: Flood) { + flood: Flood, + indexGame: DbGame ⇒ IO[Unit]) { implicit val ctx = app import settings._ @@ -68,7 +69,8 @@ final class RoundEnv( messenger = messenger, eloUpdater = eloUpdater, eloCalculator = eloCalculator, - finisherLock = finisherLock) + finisherLock = finisherLock, + indexGame = indexGame) lazy val eloCalculator = new chess.EloCalculator(true) diff --git a/app/search/Indexer.scala b/app/search/Indexer.scala index d617f73b10..f5b219723a 100644 --- a/app/search/Indexer.scala +++ b/app/search/Indexer.scala @@ -9,15 +9,19 @@ import com.codahale.jerkson.Json import org.elasticsearch.action.search.SearchResponse import com.traackr.scalastic.elasticsearch.{ Indexer ⇒ EsIndexer } +import com.mongodb.casbah.query.Imports._ -final class Indexer(es: EsIndexer, gameRepo: GameRepo) { +final class Indexer( + es: EsIndexer, + gameRepo: GameRepo, + queue: Queue) { val indexName = "lila" val typeName = "game" def rebuildAll: IO[Unit] = for { _ ← clear - nb ← indexAll + nb ← indexQuery(DBObject()) _ ← io { es.waitTillCountAtLeast(Seq(indexName), typeName, nb) } _ ← optimize } yield () @@ -37,6 +41,18 @@ final class Indexer(es: EsIndexer, gameRepo: GameRepo) { putStrLn("Search index: fail to produce a document from game " + game.id) ) + def indexQueue: IO[Unit] = for { + ids ← queue next 1000 + _ ← ids.toNel.fold( + neIds ⇒ for { + _ ← putStrLn("Search indexing %d games" format neIds.list.size) + _ ← indexQuery("_id" $in neIds.list) + _ ← queue remove neIds.list + } yield (), + io() + ) + } yield () + private def clear: IO[Unit] = io { es.deleteIndex(Seq(indexName)) es.createIndex(indexName, settings = Map()) @@ -45,11 +61,11 @@ final class Indexer(es: EsIndexer, gameRepo: GameRepo) { es.refresh() } - private def indexAll: IO[Int] = io { - val cursor = gameRepo find GameQuery.finished sort GameQuery.sortCreated //limit 3000 + private def indexQuery(query: DBObject): IO[Int] = io { + val cursor = gameRepo find (GameQuery.finished ++ query) sort GameQuery.sortCreated //limit 3000 var nb = 0 for (games ← cursor grouped 5000) { - println("Indexing " + nb) + println("Indexing from %d to %d".format(nb, nb + games.size)) val actions = games map (_.decode flatMap Game.from) collect { case Some((id, doc)) ⇒ es.index_prepare(indexName, typeName, id, Json generate doc).request diff --git a/app/search/Queue.scala b/app/search/Queue.scala new file mode 100644 index 0000000000..0c2768daa3 --- /dev/null +++ b/app/search/Queue.scala @@ -0,0 +1,25 @@ +package lila +package search + +import game.DbGame + +import com.mongodb.casbah.MongoCollection +import com.mongodb.casbah.query.Imports._ +import scalaz.effects._ + +final class Queue(collection: MongoCollection) { + + def enqueue(game: DbGame): IO[Unit] = enqueue(game.id) + + def enqueue(id: String): IO[Unit] = io { + collection += DBObject("_id" -> id) + } + + def next(size: Int): IO[List[String]] = io { + collection.find().limit(size).toList.map(_.getAs[String]("_id")).flatten + } + + def remove(ids: List[String]): IO[Unit] = io { + collection.remove("_id" $in ids) + } +} diff --git a/app/search/SearchEnv.scala b/app/search/SearchEnv.scala index c989c94c3b..bd09c9dd5d 100644 --- a/app/search/SearchEnv.scala +++ b/app/search/SearchEnv.scala @@ -1,15 +1,15 @@ package lila package search -import game.GameRepo +import game.{ GameRepo, DbGame } import core.Settings -import com.traackr.scalastic.elasticsearch.{ Indexer ⇒ EsIndexer } -import scalaz.effects._ -import akka.dispatch.Future +import com.traackr.scalastic.elasticsearch +import com.mongodb.casbah.MongoCollection final class SearchEnv( settings: Settings, + mongodb: String ⇒ MongoCollection, gameRepo: GameRepo) { import settings._ @@ -18,13 +18,19 @@ final class SearchEnv( lazy val indexer = new Indexer( es = esIndexer, - gameRepo = gameRepo) + gameRepo = gameRepo, + queue = queue) lazy val paginator = new PaginatorBuilder( indexer = indexer, maxPerPage = SearchPaginatorMaxPerPage) - private lazy val esIndexer = EsIndexer.transport( + def indexGame(game: DbGame) = queue enqueue game + + private lazy val queue = new Queue( + collection = mongodb(SearchCollectionQueue)) + + private lazy val esIndexer = elasticsearch.Indexer.transport( settings = Map( "cluster.name" -> SearchESCluster ),