more fighting with akka streams

play25-ws
Thibault Duplessis 2016-04-06 09:48:34 +07:00
parent 6f1dc19e8a
commit 4975f6c9ca
7 changed files with 37 additions and 34 deletions

View File

@ -14,12 +14,7 @@ object WorldMap extends LilaController {
}
def stream = Action.async {
import akka.pattern.ask
import makeTimeout.short
import lila.worldMap.Stream
import akka.stream.scaladsl.Source
Env.worldMap.stream ? Stream.GetPublisher mapTo manifest[Stream.PublisherType] map { publisher =>
val source = Source fromPublisher publisher
Env.worldMap.getSource map { source =>
Ok.chunked(source via EventSource.flow).as("text/event-stream")
}
}

View File

@ -1,7 +1,6 @@
@()
<!DOCTYPE html>
<html>
<head>
<title>Lichess World Map</title>

View File

@ -9,7 +9,7 @@ object AkkaStream {
def actorPublisher[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy)(implicit materializer: Materializer): (ActorRef, Publisher[T]) = {
overflowStrategy: OverflowStrategy = OverflowStrategy.dropHead)(implicit materializer: Materializer): (ActorRef, Publisher[T]) = {
Source.actorRef[T](20, OverflowStrategy.dropHead)
.toMat(Sink asPublisher false)(Keep.both).run()
}

View File

@ -174,6 +174,8 @@ trait WithPlay { self: PackageObject =>
implicit final class LilaPimpedActorSystem(self: akka.actor.ActorSystem) {
def lilaBus = lila.common.Bus(self)
def lilaMat = materializer(self)
}
object makeTimeout {

View File

@ -17,7 +17,16 @@ final class Env(
Props(new Stream(
geoIp = MaxMindIpGeo(GeoIPFile, 0),
geoIpCacheTtl = GeoIPCacheTtl)))
system.lilaBus.subscribe(stream, 'changeFeaturedGame, 'streams, 'nbMembers, 'nbRounds)
system.lilaBus.subscribe(stream, 'roundDoor)
import akka.pattern.ask
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
import play.api.libs.json.JsValue
implicit val timeout = akka.util.Timeout(5 seconds)
def getSource: Fu[Stream.SourceType] =
stream ? Stream.GetSource mapTo manifest[Stream.SourceType]
}
object Env {

View File

@ -15,60 +15,58 @@ private final class Stream(
geoIp: MaxMindIpGeo,
geoIpCacheTtl: Duration) extends Actor {
import Stream.game2json
import Stream._
implicit val mat = context.system.lilaMat
val games = scala.collection.mutable.Map.empty[String, Stream.Game]
val games = scala.collection.mutable.Map.empty[String, Game]
val (out, realtime) =
lila.common.AkkaStream.actorSource(20, OverflowStrategy.dropHead)(materializer(context.system))
val (out, publisher) = lila.common.AkkaStream.actorPublisher[Event](100)
def preload = Source[JsValue](games.values.map(game2json(makeMd5)).toList)
val preloadComplete = Source[JsValue](List(Json.obj("loadComplete" -> true): JsValue))
def preload = Source[JsValue](games.values.pp.map(game2json(makeMd5)).toList.pp)
val preloadComplete = Source[JsValue](List(Json.obj("loadComplete" -> true)))
val transformer = Flow.fromFunction[Stream.Event, JsValue] {
case Stream.Event.Add(game) => game2json(makeMd5)(game)
case Stream.Event.Remove(id) => Json.obj("id" -> id)
val transformer = Flow.fromFunction[Event, JsValue] {
case Event.Add(game) => game2json(makeMd5)(game)
case Event.Remove(id) => Json.obj("id" -> id)
}
def realtime = Source.fromPublisher[Event](publisher)
def makeSource =
Source.combine(preload, preloadComplete, realtime via transformer)(_ => Concat())
def makePublisher =
makeSource.toMat(Sink asPublisher false)(Keep.both).run()
def receive = {
case SocketEvent.OwnerJoin(id, color, ip) =>
ipCache get ip foreach { point =>
(ipCache get "58.8.28.97").pp foreach { point =>
val game = games get id match {
case Some(game) => game withPoint point
case None => Stream.Game(id, List(point))
case None => Game(id, List(point))
}
games += (id -> game)
out ! Stream.Event.Add(game)
out ! Event.Add(game).pp
}
case SocketEvent.Stop(id) =>
games -= id
out ! Stream.Event.Remove(id)
out ! Event.Remove(id)
case Stream.GetPublisher => sender ! makePublisher
case GetSource =>
println(games)
sender ! makeSource
}
implicit val mat = materializer(context.system)
def makeMd5 = MessageDigest getInstance "MD5"
val ipCache = lila.memo.Builder.cache(geoIpCacheTtl, ipToPoint)
def ipToPoint(ip: String): Option[Stream.Point] =
geoIp getLocation ip flatMap Stream.toPoint
def ipToPoint(ip: String): Option[Point] =
geoIp getLocation ip flatMap toPoint
}
object Stream {
case object GetPublisher
case object GetSource
import org.reactivestreams.Publisher
type PublisherType = Publisher[JsValue]
type SourceType = Source[JsValue, akka.NotUsed]
case class Game(id: String, points: List[Point]) {

View File

@ -39,7 +39,7 @@ $(function() {
point[1] + Math.random() - 0.5);
};
var source = new EventSource("http://en.lichess.org/network/stream");
var source = new EventSource("http://" + location.hostname + "/network/stream");
var removeFunctions = {};