stream mass-sending of blog messages
parent
99de25867a
commit
ca69fa0966
|
@ -94,7 +94,7 @@ blog {
|
|||
delay = 20 minutes
|
||||
sender = "lichess-blog"
|
||||
}
|
||||
last_post_cache.ttl = 10 minutes
|
||||
last_post_cache.ttl = 5 minutes
|
||||
rss {
|
||||
email = "lichess.contact@gmail.com"
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ final class Env(
|
|||
private lazy val notifier = new Notifier(
|
||||
blogApi = api,
|
||||
messageApi = messageApi,
|
||||
lastPostCache = lastPostCache,
|
||||
lichessUserId = NotifySender)
|
||||
|
||||
def cli = new lila.common.Cli {
|
||||
|
|
|
@ -1,14 +1,17 @@
|
|||
package lila.blog
|
||||
|
||||
import io.prismic.Document
|
||||
import org.joda.time.DateTime
|
||||
import play.api.libs.iteratee._
|
||||
import reactivemongo.api._
|
||||
import reactivemongo.bson._
|
||||
|
||||
import lila.message.{ ThreadRepo, Api => MessageApi }
|
||||
import lila.user.UserRepo
|
||||
import org.joda.time.DateTime
|
||||
|
||||
private[blog] final class Notifier(
|
||||
blogApi: BlogApi,
|
||||
messageApi: MessageApi,
|
||||
lastPostCache: LastPostCache,
|
||||
lichessUserId: String) {
|
||||
|
||||
def sendMessages(prismicId: String): Funit =
|
||||
|
@ -23,14 +26,19 @@ private[blog] final class Notifier(
|
|||
}
|
||||
|
||||
private def doSend(post: Document): Funit =
|
||||
UserRepo recentlySeenNotKidIds DateTime.now.minusWeeks(1) flatMap { userIds =>
|
||||
(ThreadRepo reallyDeleteByCreatorId lichessUserId) >> {
|
||||
val thread = makeThread(post)
|
||||
val futures = userIds.toStream map { userId =>
|
||||
messageApi.lichessThread(thread.copy(to = userId))
|
||||
}
|
||||
lila.common.Future.lazyFold(futures)(())((_, _) => ()) >>- lastPostCache.clear
|
||||
}
|
||||
ThreadRepo.reallyDeleteByCreatorId(lichessUserId) >> {
|
||||
val thread = makeThread(post)
|
||||
UserRepo.recentlySeenNotKidIdsCursor(DateTime.now minusWeeks 1)
|
||||
.enumerate(500 * 1000, stopOnError = true) &>
|
||||
Enumeratee.map {
|
||||
_.getAs[String]("_id") err "User without an id"
|
||||
} |>>>
|
||||
Iteratee.foldM[String, Int](0) {
|
||||
case (count, userId) =>
|
||||
messageApi.lichessThread(thread.copy(to = userId)) inject (count + 1)
|
||||
} addEffect { count =>
|
||||
logger.info(s"Sent $count messages")
|
||||
} void
|
||||
}
|
||||
|
||||
private def makeThread(doc: Document) =
|
||||
|
|
|
@ -307,13 +307,13 @@ object UserRepo {
|
|||
$update.fieldUnchecked(id, "seenAt", $date(DateTime.now))
|
||||
}
|
||||
|
||||
def recentlySeenNotKidIds(since: DateTime) =
|
||||
coll.distinct("_id", BSONDocument(
|
||||
def recentlySeenNotKidIdsCursor(since: DateTime) =
|
||||
coll.find(BSONDocument(
|
||||
F.enabled -> true,
|
||||
"seenAt" -> BSONDocument("$gt" -> since),
|
||||
"count.game" -> BSONDocument("$gt" -> 9),
|
||||
"kid" -> BSONDocument("$ne" -> true)
|
||||
).some) map lila.db.BSON.asStrings
|
||||
), BSONDocument("_id" -> true)).cursor[BSONDocument]()
|
||||
|
||||
def setLang(id: ID, lang: String) = $update.field(id, "lang", lang)
|
||||
|
||||
|
|
Loading…
Reference in New Issue