Work on realtime message notification

This commit is contained in:
Thibault Duplessis 2012-05-28 13:17:01 +02:00
parent 9a120752e7
commit 43ea087785
8 changed files with 70 additions and 13 deletions

View file

@ -24,7 +24,9 @@ final class ThreadRepo(
find(visibleByUserQuery(user)).sort(sortQuery).toList
}
def userNbUnread(user: User): IO[Int] = io {
def userNbUnread(user: User): IO[Int] = userNbUnread(user.id)
def userNbUnread(userId: ObjectId): IO[Int] = io {
val result = collection.mapReduce(
mapFunction = """function() {
var thread = this, nb = 0;
@ -36,14 +38,14 @@ final class ThreadRepo(
}
});
if (nb > 0) emit("n", nb);
}""" format user.idString,
}""" format userId.toString,
reduceFunction = """function(key, values) {
var sum = 0;
for(var i in values) { sum += values[i]; }
return sum;
}""",
output = MapReduceInlineOutput,
query = visibleByUserQuery(user).some)
query = visibleByUserQuery(userId).some)
(for {
row result.hasNext option result.next
sum row.getAs[Double]("value")
@ -76,7 +78,11 @@ final class ThreadRepo(
def userQuery(user: User) = DBObject("userIds" -> user.id)
def visibleByUserQuery(user: User) = DBObject("visibleByUserIds" -> user.id)
def visibleByUserQuery(user: User): DBObject =
visibleByUserQuery(user.id)
def visibleByUserQuery(userId: ObjectId): DBObject =
DBObject("visibleByUserIds" -> userId)
def selectId(id: String) = DBObject("_id" -> id)

View file

@ -10,8 +10,10 @@ final class UnreadCache(threadRepo: ThreadRepo) {
// username, nb unread
val cache = mutable.Map[String, Int]()
def get(user: User) =
cache.getOrElseUpdate(user.usernameCanonical, {
def get(user: User): Int = get(user.id)
def get(userId: ObjectId): Int =
cache.getOrElseUpdate(username.toLowerCase, {
(threadRepo userNbUnread user).unsafePerformIO
})

View file

@ -1,8 +1,9 @@
package lila
package socket
import akka.actor._
import core.Global.env // fuck. need it for message unread cache
import akka.actor._
import play.api.libs.json._
abstract class HubActor[M <: SocketMember](uidTimeout: Int) extends Actor {
@ -45,7 +46,11 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Int) extends Actor {
def ping(uid: String) {
setAlive(uid)
member(uid) foreach (_.channel push pong)
member(uid) foreach { m
m.channel push m.username.fold(
u pong ++ JsObject(Seq("m" -> env.message.unreadCache.get(u))),
pong)
}
}
def broom() {
@ -75,7 +80,7 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Int) extends Actor {
def uids = members.keys
def member(uid: String) = members get uid
def member(uid: String): Option[M] = members get uid
def usernames: Iterable[String] = members.values.map(_.username).flatten
}

View file

@ -2,14 +2,25 @@ package lila
package user
import scala.collection.mutable
import com.mongodb.casbah.Imports.ObjectId
final class Cached(userRepo: UserRepo) {
// idString => username|Anonymous
val usernameCache = mutable.Map[String, String]()
// username => Option[ObjectId]
val idCache = mutable.Map[String, Option[ObjectId]]()
def username(userId: String) =
usernameCache.getOrElseUpdate(
userId,
(userRepo username userId).unsafePerformIO | "Anonymous"
)
def id(username: String): Option[ObjectId] =
idCache.getOrElseUpdate(
username.toLowerCase,
(userRepo id username).unsafePerformIO
)
}

View file

@ -19,6 +19,8 @@ class UserRepo(
private def byUsernameQuery(username: String) =
DBObject("usernameCanonical" -> username.toLowerCase)
private def byIdQuery(id: String) = DBObject("_id" -> new ObjectId(id))
def byId(userId: String): IO[Option[User]] = byId(new ObjectId(userId))
def byId(userId: ObjectId): IO[Option[User]] = io {
@ -26,9 +28,11 @@ class UserRepo(
}
def username(userId: String): IO[Option[String]] = io {
primitiveProjection[String](
DBObject("_id" -> new ObjectId(userId)),
"username")
primitiveProjection[String](byIdQuery(userId), "username")
}
def id(username: String): IO[Option[ObjectId]] = io {
primitiveProjection[ObjectId](byUsernameQuery(username), "_id")
}
def byUsername(username: String): IO[Option[User]] = io {

View file

@ -9,7 +9,7 @@ $.websocket = function(url, version, settings) {
},
options: {
name: "unnamed",
debug: false,
debug: true,
offlineDelay: 5000,
offlineTag: false,
pingData: JSON.stringify({t: "p"}),
@ -58,6 +58,7 @@ $.websocket.prototype = {
var m = JSON.parse(e.originalEvent.data);
if (m.t == "n") {
self.keepAlive();
self._debug(m);
} else {
self._debug(m);
}

1
todo
View file

@ -21,6 +21,7 @@ use play-navigator router case class MyRegexStr(value: String); implicit val MyR
http://codetunes.com/2012/05/09/scala-dsl-tutorial-writing-web-framework-router
use POST instead of GET where it makes sense
endgame sound http://en.lichess.org/forum/lichess-feedback/checkmate-sound-feature?page=1#1
cached username option app/user/Cached.scala
next deploy:
mongo lichess mongo_migration_user.js

27
unread.js Normal file
View file

@ -0,0 +1,27 @@
var userId = ObjectId("4cea2e4030c353af2b000000"); // thibault
var op = db.runCommand({
mapreduce: "m_thread",
query: {visibleByUserIds: userId},
out: {inline:1},
map: function() {
var thread = this, nb = 0;
thread.posts.forEach(function(p) {
if (!p.isRead) {
if (thread.creatorId.equals(ObjectId("4cea2e4030c353af2b000000"))) {
if (!p.isByCreator) nb++;
} else {
if (p.isByCreator) nb++;
}
}
});
if (nb > 0) emit("n", nb);
},
reduce: function(key, values) {
var sum = 0;
for(var i in values) { sum += values[i]; }
return sum;
}
});
printjson(op);