asynchrous indexation

pull/867/merge
Thibault Duplessis 2015-09-01 20:28:20 +02:00
parent acdf4c52c0
commit b7940846ec
2 changed files with 5 additions and 5 deletions

View File

@ -58,14 +58,15 @@ final class GameSearchApi(client: ESClient) extends SearchReadApi[Game, Query] {
for {
size <- $count($select.all)
batchSize = 1000
limit = 20 * 1000
limit = Int.MaxValue
_ <- $enumerate.bulk[Option[Game]]($query.all, batchSize, limit) { gameOptions =>
val games = gameOptions.flatten filter storable
val nbGames = games.size
(GameRepo filterAnalysed games.map(_.id).toSeq flatMap { analysedIds =>
client.storeBulk(games map { g =>
Id(g.id) -> toDoc(g, analysedIds(g.id))
})
}).logFailure("game bulk")
funit // async!
}) >>- {
nb = nb + nbGames
nbSkipped = nbSkipped + gameOptions.size - nbGames

View File

@ -28,7 +28,7 @@ final class ESClientHttp(endpoint: String, index: Index) extends ESClient {
def storeBulk(docs: Seq[(Id, JsObject)]) =
HTTP(s"store/bulk/${index.name}", JsObject(docs map {
case (Id(id), doc) => id -> doc
case (Id(id), doc) => id -> JsString(Json.stringify(doc))
}))
def search[Q: Writes](query: Q, from: From, size: Size) =
@ -48,8 +48,7 @@ final class ESClientHttp(endpoint: String, index: Index) extends ESClient {
private def HTTP[D: Writes, R](url: String, data: D, read: String => R): Fu[R] =
WS.url(s"$endpoint/$url").post(Json toJson data) flatMap {
case res if res.status == 200 => fuccess(read(res.body))
case res if res.status == 500 => fufail(s"$url ${res.status}")
case res => fufail(s"$url ${res.status} ${res.body}")
case res => fufail(s"$url ${res.status}")
}
private def HTTP(url: String, data: JsObject): Funit = HTTP(url, data, _ => ())