fully streamed data export - closes #8602
parent
823bd99128
commit
55d98bf70f
|
@ -441,15 +441,15 @@ final class Account(
|
|||
val userId = get("user")
|
||||
.map(lila.user.User.normalize)
|
||||
.filter(id => me.id == id || isGranted(_.Impersonate)) | me.id
|
||||
env.user.repo byId userId flatMap {
|
||||
env.user.repo byId userId map {
|
||||
_ ?? { user =>
|
||||
env.api.personalDataExport(user) map { raw =>
|
||||
if (getBool("text"))
|
||||
Ok(raw)
|
||||
.withHeaders(CONTENT_DISPOSITION -> s"attachment; filename=lichess_${user.username}.txt")
|
||||
.as(TEXT)
|
||||
else Ok(html.account.bits.data(user, raw))
|
||||
}
|
||||
if (getBool("text"))
|
||||
Ok.chunked(env.api.personalDataExport(user))
|
||||
.withHeaders(
|
||||
noProxyBufferHeader,
|
||||
CONTENT_DISPOSITION -> s"attachment; filename=lichess_${user.username}.txt"
|
||||
)
|
||||
else Ok(html.account.bits.data(user))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,16 +10,15 @@ import controllers.routes
|
|||
|
||||
object bits {
|
||||
|
||||
def data(u: User, raw: String)(implicit ctx: Context) =
|
||||
def data(u: User)(implicit ctx: Context) =
|
||||
account.layout(title = s"${u.username} - personal data", active = "security") {
|
||||
div(cls := "account security personal-data box box-pad")(
|
||||
h1("My personal data"),
|
||||
div(cls := "personal-data__header")(
|
||||
p("Here is all personal information Lichess has about ", userLink(u)),
|
||||
a(cls := "button", href := s"${routes.Account.data}?user=${u.id}&text=1")(trans.downloadRaw())
|
||||
),
|
||||
st.section(
|
||||
pre(cls := "raw-text")(raw)
|
||||
a(cls := "button", href := s"${routes.Account.data}?user=${u.id}&text=1", downloadAttr)(
|
||||
trans.downloadRaw()
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
|
|
@ -21,88 +21,83 @@ final class PersonalDataExport(
|
|||
gameEnv: lila.game.Env,
|
||||
chatEnv: lila.chat.Env,
|
||||
relationEnv: lila.relation.Env,
|
||||
userRepo: lila.user.UserRepo,
|
||||
mongoCacheApi: lila.memo.MongoCache.Api
|
||||
)(implicit ec: ExecutionContext, mat: Materializer) {
|
||||
|
||||
def apply(user: User) = cache get user.id
|
||||
private val lightPerSecond = 60
|
||||
private val heavyPerSecond = 30
|
||||
|
||||
private val cache = mongoCacheApi[User.ID, String](64, "personal-data-export", 1 day, identity) { loader =>
|
||||
_.expireAfterAccess(1 minute)
|
||||
.buildAsyncFuture {
|
||||
loader(fetch)
|
||||
def apply(user: User): Source[String, _] = {
|
||||
|
||||
val intro =
|
||||
Source.futureSource {
|
||||
userRepo.currentOrPrevEmail(user.id) map { email =>
|
||||
Source(
|
||||
List(
|
||||
textTitle(s"Personal data export for ${user.username}"),
|
||||
"All dates are UTC",
|
||||
bigSep,
|
||||
s"Signup date: ${textDate(user.createdAt)}",
|
||||
s"Last seen: ${user.seenAt ?? textDate}",
|
||||
s"Public profile: ${user.profile.??(_.toString)}",
|
||||
s"Email: ${email.??(_.value)}"
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def fetch(userId: User.ID) =
|
||||
for {
|
||||
sessions <- securityEnv.store.allSessions(userId)
|
||||
posts <- forumEnv.postApi.allByUser(userId)
|
||||
msgs <- msgEnv.api.allMessagesOf(userId)
|
||||
following <- relationEnv.api.fetchFollowing(userId)
|
||||
chats <-
|
||||
gameEnv.gameRepo.coll
|
||||
.find($doc(Game.BSONFields.playerUids -> userId), $id(true).some)
|
||||
.cursor[Bdoc](ReadPreference.secondaryPreferred)
|
||||
.documentSource(90_000)
|
||||
.mapConcat { doc =>
|
||||
doc string "_id" toList
|
||||
val connections =
|
||||
Source(List(textTitle("Connections"))) concat
|
||||
securityEnv.store.allSessions(user.id).documentSource().throttle(lightPerSecond, 1 second).map { s =>
|
||||
s"${s.date.??(textDate)} ${s.ip} ${s.ua}"
|
||||
}
|
||||
|
||||
val followedUsers =
|
||||
Source.futureSource {
|
||||
relationEnv.api.fetchFollowing(user.id) map { userIds =>
|
||||
Source(List(textTitle("Followed players")) ++ userIds)
|
||||
}
|
||||
}
|
||||
|
||||
val forumPosts =
|
||||
Source(List(textTitle("Forum posts"))) concat
|
||||
forumEnv.postApi.allByUser(user.id).documentSource().throttle(heavyPerSecond, 1 second).map { p =>
|
||||
s"${textDate(p.createdAt)}\n${p.text}$bigSep"
|
||||
}
|
||||
|
||||
val privateMessages =
|
||||
Source(List(textTitle("Direct messages"))) concat
|
||||
msgEnv.api
|
||||
.allMessagesOf(user.id)
|
||||
.throttle(heavyPerSecond, 1 second)
|
||||
.map { case (text, date) =>
|
||||
s"${textDate(date)}\n$text$bigSep"
|
||||
}
|
||||
|
||||
val gameChats =
|
||||
Source(List(textTitle("Game chat messages"))) concat
|
||||
gameEnv.gameRepo.coll
|
||||
.find($doc(Game.BSONFields.playerUids -> user.id), $id(true).some)
|
||||
.cursor[Bdoc](ReadPreference.secondaryPreferred)
|
||||
.documentSource()
|
||||
.map { doc => ~doc.string("_id") }
|
||||
.throttle(heavyPerSecond, 1 second)
|
||||
.mapAsyncUnordered(8) { id =>
|
||||
chatEnv.api.userChat.findLinesBy(Chat.Id(id), userId)
|
||||
chatEnv.api.userChat.findLinesBy(Chat.Id(id), user.id)
|
||||
}
|
||||
.mapConcat(identity)
|
||||
.runWith(Sink.seq)
|
||||
} yield List(
|
||||
render.connections(sessions),
|
||||
render.followedUsers(following),
|
||||
render.forumPosts(posts),
|
||||
render.privateMessages(msgs),
|
||||
render.gameChats(chats)
|
||||
).flatten mkString "\n\n"
|
||||
|
||||
private object render {
|
||||
val outro = Source(List(textTitle("That's all we got.")))
|
||||
|
||||
def connections(sessions: List[lila.security.UserSession]) =
|
||||
List(
|
||||
textTitle(s"${sessions.size} Connections"),
|
||||
sessions.map { s =>
|
||||
s"${s.ip} ${s.date.??(textDate)}\n${s.ua}"
|
||||
} mkString "\n\n"
|
||||
)
|
||||
|
||||
def forumPosts(posts: List[lila.forum.Post]) =
|
||||
List(
|
||||
textTitle(s"${posts.size} Forum posts"),
|
||||
posts.map { p =>
|
||||
s"${textDate(p.createdAt)}\n${p.text}"
|
||||
} mkString bigSep
|
||||
)
|
||||
|
||||
def privateMessages(msgs: Seq[(String, DateTime)]) =
|
||||
List(
|
||||
textTitle(s"${msgs.size} Direct messages"),
|
||||
msgs.map { case (text, date) =>
|
||||
s"${textDate(date)}\n$text"
|
||||
} mkString bigSep
|
||||
)
|
||||
|
||||
def gameChats(lines: Seq[String]) =
|
||||
List(
|
||||
textTitle(s"${lines.size} Game chat messages"),
|
||||
lines mkString bigSep
|
||||
)
|
||||
|
||||
def followedUsers(userIds: Iterable[User.ID]) =
|
||||
List(
|
||||
textTitle(s"${userIds.size} Followed players"),
|
||||
userIds mkString "\n"
|
||||
)
|
||||
|
||||
private val bigSep = "\n\n------------------------------------------\n\n"
|
||||
|
||||
private def textTitle(t: String) = s"\n\n${"=" * t.length}\n$t\n${"=" * t.length}\n\n\n"
|
||||
|
||||
private val englishDateTimeFormatter = DateTimeFormat forStyle "MS"
|
||||
private def textDate(date: DateTime) = englishDateTimeFormatter print date
|
||||
List(intro, connections, followedUsers, forumPosts, privateMessages, gameChats, outro)
|
||||
.foldLeft(Source.empty[String])(_ concat _)
|
||||
}
|
||||
|
||||
private val bigSep = "\n------------------------------------------\n"
|
||||
|
||||
private def textTitle(t: String) = s"\n${"=" * t.length}\n$t\n${"=" * t.length}\n"
|
||||
|
||||
private val englishDateTimeFormatter = DateTimeFormat forStyle "MS"
|
||||
private def textDate(date: DateTime) = englishDateTimeFormatter print date
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package lila.forum
|
|||
|
||||
import actorApi._
|
||||
import org.joda.time.DateTime
|
||||
import reactivemongo.akkastream.{ cursorProducer, AkkaStreamCursor }
|
||||
import reactivemongo.api.ReadPreference
|
||||
import scala.util.chaining._
|
||||
|
||||
|
@ -230,12 +231,11 @@ final class PostApi(
|
|||
|
||||
def nbByUser(userId: String) = env.postRepo.coll.countSel($doc("userId" -> userId))
|
||||
|
||||
def allByUser(userId: User.ID) =
|
||||
def allByUser(userId: User.ID): AkkaStreamCursor[Post] =
|
||||
env.postRepo.coll
|
||||
.find($doc("userId" -> userId))
|
||||
.sort($doc("createdAt" -> -1))
|
||||
.cursor[Post](ReadPreference.secondaryPreferred)
|
||||
.list(2000)
|
||||
|
||||
private def recentUserIds(topic: Topic, newPostNumber: Int) =
|
||||
env.postRepo.coll
|
||||
|
|
|
@ -2,6 +2,7 @@ package lila.msg
|
|||
|
||||
import akka.stream.scaladsl._
|
||||
import org.joda.time.DateTime
|
||||
import reactivemongo.akkastream.{ cursorProducer, AkkaStreamCursor }
|
||||
import reactivemongo.api.ReadPreference
|
||||
import scala.util.Try
|
||||
|
||||
|
@ -9,8 +10,8 @@ import lila.common.config.MaxPerPage
|
|||
import lila.common.LilaStream
|
||||
import lila.common.{ Bus, LightUser }
|
||||
import lila.db.dsl._
|
||||
import lila.user.{ User, UserRepo }
|
||||
import lila.user.Holder
|
||||
import lila.user.{ User, UserRepo }
|
||||
|
||||
final class MsgApi(
|
||||
colls: MsgColls,
|
||||
|
@ -228,7 +229,7 @@ final class MsgApi(
|
|||
(res.nModified > 0) ?? notifier.onRead(threadId, me.id, contactId)
|
||||
}
|
||||
|
||||
def allMessagesOf(userId: User.ID): Fu[Vector[(String, DateTime)]] =
|
||||
def allMessagesOf(userId: User.ID): Source[(String, DateTime), _] =
|
||||
colls.thread
|
||||
.aggregateWith[Bdoc](
|
||||
readPreference = ReadPreference.secondaryPreferred
|
||||
|
@ -248,7 +249,15 @@ final class MsgApi(
|
|||
"$expr" -> $doc(
|
||||
"$and" -> $arr(
|
||||
$doc("$eq" -> $arr("$user", userId)),
|
||||
$doc("$eq" -> $arr("$tid", "$$t"))
|
||||
$doc("$eq" -> $arr("$tid", "$$t")),
|
||||
$doc(
|
||||
"$not" -> $doc(
|
||||
"$regexMatch" -> $doc(
|
||||
"input" -> "$text",
|
||||
"regex" -> "You received this because you are subscribed to messages of the team"
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -262,14 +271,13 @@ final class MsgApi(
|
|||
Project($doc("_id" -> false, "msg.text" -> true, "msg.date" -> true))
|
||||
)
|
||||
}
|
||||
.collect[Vector](maxDocs = 9000)
|
||||
.map { docs =>
|
||||
for {
|
||||
doc <- docs
|
||||
.documentSource()
|
||||
.mapConcat { doc =>
|
||||
(for {
|
||||
msg <- doc child "msg"
|
||||
text <- msg string "text"
|
||||
date <- msg.getAsOpt[DateTime]("date")
|
||||
} yield (text, date)
|
||||
} yield (text, date)).toList
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@ package lila.security
|
|||
|
||||
import org.joda.time.DateTime
|
||||
import play.api.mvc.RequestHeader
|
||||
import reactivemongo.akkastream.{ cursorProducer, AkkaStreamCursor }
|
||||
import reactivemongo.api.bson.BSONNull
|
||||
import reactivemongo.api.bson.{ BSONHandler, Macros }
|
||||
import reactivemongo.api.CursorProducer
|
||||
import reactivemongo.api.ReadPreference
|
||||
|
@ -11,7 +13,6 @@ import scala.concurrent.duration._
|
|||
import lila.common.{ ApiVersion, HTTPRequest, IpAddress }
|
||||
import lila.db.dsl._
|
||||
import lila.user.User
|
||||
import reactivemongo.api.bson.BSONNull
|
||||
|
||||
final class Store(val coll: Coll, cacheApi: lila.memo.CacheApi)(implicit
|
||||
ec: scala.concurrent.ExecutionContext
|
||||
|
@ -119,12 +120,11 @@ final class Store(val coll: Coll, cacheApi: lila.memo.CacheApi)(implicit
|
|||
.cursor[UserSession]()
|
||||
.gather[List](nb)
|
||||
|
||||
def allSessions(userId: User.ID): Fu[List[UserSession]] =
|
||||
def allSessions(userId: User.ID): AkkaStreamCursor[UserSession] =
|
||||
coll
|
||||
.find($doc("user" -> userId))
|
||||
.sort($doc("date" -> -1))
|
||||
.cursor[UserSession](ReadPreference.secondaryPreferred)
|
||||
.gather[List](200)
|
||||
|
||||
def setFingerPrint(id: String, fp: FingerPrint): Fu[FingerHash] =
|
||||
FingerHash(fp) match {
|
||||
|
|
Loading…
Reference in New Issue