kamon traces WIP
parent
acb657ed45
commit
5e0aaaffc1
|
@ -0,0 +1,14 @@
|
|||
package lila.common
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
/**
|
||||
* For small code blocks that don't need to be run on a separate thread.
|
||||
*/
|
||||
object SameThread extends ExecutionContext {
|
||||
|
||||
val logger = lila.log.sameThread
|
||||
|
||||
override def execute(runnable: Runnable): Unit = runnable.run
|
||||
override def reportFailure(t: Throwable): Unit = logger.error(t.getMessage, t)
|
||||
}
|
|
@ -6,6 +6,8 @@ object log {
|
|||
|
||||
val boot = apply("boot")
|
||||
|
||||
val sameThread = apply("same-thread")
|
||||
|
||||
final class Logger(name: String) extends play.api.LoggerLike {
|
||||
|
||||
val logger = org.slf4j.LoggerFactory getLogger name
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package lila
|
||||
|
||||
import kamon.Kamon.metrics
|
||||
import scala.concurrent.Future
|
||||
|
||||
import kamon.Kamon.{ metrics, tracer }
|
||||
import kamon.trace.TraceContext
|
||||
import kamon.util.RelativeNanoTimestamp
|
||||
|
||||
object mon {
|
||||
|
||||
|
@ -63,14 +67,10 @@ object mon {
|
|||
}
|
||||
object move {
|
||||
object full {
|
||||
val time = rec("round.move.full")
|
||||
val count = inc("round.move.full")
|
||||
}
|
||||
object segment {
|
||||
val queue = rec("round.move.segment.queue")
|
||||
val fetch = rec("round.move.segment.fetch")
|
||||
val logic = rec("round.move.segment.logic")
|
||||
val save = rec("round.move.segment.save")
|
||||
object trace {
|
||||
def create = makeTrace("round.move.trace")
|
||||
}
|
||||
val networkLag = rec("round.move.network_lag")
|
||||
}
|
||||
|
@ -328,6 +328,40 @@ object mon {
|
|||
}
|
||||
}
|
||||
|
||||
trait Trace {
|
||||
|
||||
def segment[A](name: String, categ: String)(f: => Future[A]): Future[A]
|
||||
|
||||
def segmentSync[A](name: String, categ: String)(f: => A): A
|
||||
|
||||
def finish(): Unit
|
||||
}
|
||||
|
||||
private final class KamonTrace(context: TraceContext) extends Trace {
|
||||
|
||||
def segment[A](name: String, categ: String)(f: => Future[A]): Future[A] = {
|
||||
val seg = context.startSegment(name, categ, "mon")
|
||||
f.onComplete(_ => seg.finish())(common.SameThread)
|
||||
f
|
||||
}
|
||||
|
||||
def segmentSync[A](name: String, categ: String)(f: => A): A = {
|
||||
val seg = context.startSegment(name, categ, "mon")
|
||||
val res = f
|
||||
seg.finish()
|
||||
res
|
||||
}
|
||||
|
||||
def finish() = context.finish()
|
||||
}
|
||||
|
||||
private def makeTrace(name: String): Trace = new KamonTrace(tracer.newContext(
|
||||
name = name,
|
||||
token = None,
|
||||
timestamp = RelativeNanoTimestamp.now,
|
||||
isOpen = true,
|
||||
isLocal = false))
|
||||
|
||||
private def nodots(s: String) = s.replace(".", "_")
|
||||
|
||||
private val logger = lila.log("monitor")
|
||||
|
|
|
@ -55,8 +55,6 @@ final class Env(
|
|||
|
||||
private val moveTimeChannel = system.actorOf(Props(classOf[lila.socket.Channel]), name = ChannelMoveTime)
|
||||
|
||||
private val moveMonitor = new MoveMonitor(system, moveTimeChannel)
|
||||
|
||||
lazy val eventHistory = History(db(CollectionHistory)) _
|
||||
|
||||
val roundMap = system.actorOf(Props(new lila.hub.ActorMap {
|
||||
|
@ -70,7 +68,6 @@ final class Env(
|
|||
drawer = drawer,
|
||||
forecastApi = forecastApi,
|
||||
socketHub = socketHub,
|
||||
monitorMove = moveMonitor.record,
|
||||
moretimeDuration = Moretime,
|
||||
activeTtl = ActiveTtl)
|
||||
def receive: Receive = ({
|
||||
|
@ -182,6 +179,8 @@ final class Env(
|
|||
|
||||
lazy val noteApi = new NoteApi(db(CollectionNote))
|
||||
|
||||
new MoveMonitor(system, moveTimeChannel)
|
||||
|
||||
scheduler.message(2.1 seconds)(roundMap -> actorApi.GetNbRounds)
|
||||
|
||||
system.actorOf(
|
||||
|
|
|
@ -8,20 +8,30 @@ private final class MoveMonitor(
|
|||
system: ActorSystem,
|
||||
channel: ActorRef) {
|
||||
|
||||
def record(nanos: Option[Long]) = {
|
||||
nanos foreach lila.mon.round.move.full.time
|
||||
lila.mon.round.move.full.count()
|
||||
}
|
||||
|
||||
Kamon.metrics.subscribe("histogram", "round.move.full", system.actorOf(Props(new Actor {
|
||||
Kamon.metrics.subscribe("trace", "**", system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case tick: TickMetricSnapshot => tick.metrics.collectFirst {
|
||||
case (entity, snapshot) if entity.category == "histogram" => snapshot
|
||||
} flatMap (_ histogram "histogram") foreach { h =>
|
||||
if (!h.isEmpty) channel ! lila.socket.Channel.Publish(
|
||||
lila.socket.Socket.makeMessage("mlat", (h.sum / h.numberOfMeasurements / 1000000).toInt)
|
||||
)
|
||||
case tick: TickMetricSnapshot => tick.metrics foreach {
|
||||
case (entity, snapshot) =>
|
||||
println("-------- " + entity)
|
||||
println(snapshot)
|
||||
println(snapshot.histogram("histogram"))
|
||||
println(snapshot.histogram("histogram").foreach { h =>
|
||||
println(h.sum)
|
||||
println(h.numberOfMeasurements)
|
||||
})
|
||||
}
|
||||
}
|
||||
})))
|
||||
|
||||
// Kamon.metrics.subscribe("histogram", "round.move.trace.elapsed-time", system.actorOf(Props(new Actor {
|
||||
// def receive = {
|
||||
// case tick: TickMetricSnapshot => tick.pp.metrics.pp.collectFirst {
|
||||
// case (entity, snapshot) if entity.category == "histogram" => snapshot
|
||||
// } flatMap (_ histogram "histogram") foreach { h =>
|
||||
// if (!h.pp.isEmpty.pp) channel ! lila.socket.Channel.Publish(
|
||||
// lila.socket.Socket.makeMessage("mlat", (h.sum / h.numberOfMeasurements / 1000000).toInt)
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
// })))
|
||||
}
|
||||
|
|
|
@ -20,12 +20,12 @@ private[round] final class Player(
|
|||
uciMemo: UciMemo) {
|
||||
|
||||
def human(play: HumanPlay, round: ActorRef)(pov: Pov): Fu[Events] = play match {
|
||||
case HumanPlay(playerId, uci, blur, lag, promiseOption) => pov match {
|
||||
case p@HumanPlay(playerId, uci, blur, lag, promiseOption) => pov match {
|
||||
case Pov(game, color) if game playableBy color =>
|
||||
lila.mon.measure(_.round.move.segment.logic)(applyUci(game, uci, blur, lag)).prefixFailuresWith(s"$pov ")
|
||||
p.trace.segmentSync("applyUci", "logic")(applyUci(game, uci, blur, lag)).prefixFailuresWith(s"$pov ")
|
||||
.fold(errs => fufail(ClientError(errs.shows)), fuccess).flatMap {
|
||||
case (progress, moveOrDrop) =>
|
||||
(GameRepo save progress).mon(_.round.move.segment.save) >>-
|
||||
p.trace.segment("save", "db")(GameRepo save progress) >>-
|
||||
(pov.game.hasAi ! uciMemo.add(pov.game, moveOrDrop)) >>-
|
||||
notifyMove(moveOrDrop, progress.game) >>
|
||||
progress.game.finished.fold(
|
||||
|
|
|
@ -25,7 +25,6 @@ private[round] final class Round(
|
|||
drawer: Drawer,
|
||||
forecastApi: ForecastApi,
|
||||
socketHub: ActorRef,
|
||||
monitorMove: Option[Long] => Unit,
|
||||
moretimeDuration: Duration,
|
||||
activeTtl: Duration) extends SequentialActor {
|
||||
|
||||
|
@ -56,19 +55,21 @@ private[round] final class Round(
|
|||
}
|
||||
|
||||
case p: HumanPlay =>
|
||||
lila.mon.since(_.round.move.segment.queue)(p.atNanos)
|
||||
handleHumanPlay(p.playerId) { pov =>
|
||||
handleHumanPlay(p) { pov =>
|
||||
if (pov.game outoftime lags.get) outOfTime(pov.game)
|
||||
else {
|
||||
lags.set(pov.color, p.lag.toMillis.toInt)
|
||||
reportNetworkLag(pov)
|
||||
player.human(p, self)(pov)
|
||||
}
|
||||
} >>- monitorMove((nowNanos - p.atNanos).some)
|
||||
} >>- {
|
||||
p.trace.finish()
|
||||
lila.mon.round.move.full.count()
|
||||
}
|
||||
|
||||
case FishnetPlay(uci, currentFen) => handle { game =>
|
||||
player.fishnet(game, uci, currentFen)
|
||||
} >>- monitorMove(none)
|
||||
} >>- lila.mon.round.move.full.count()
|
||||
|
||||
case Abort(playerId) => handle(playerId) { pov =>
|
||||
pov.game.abortable ?? finisher.abort(pov)
|
||||
|
@ -215,8 +216,12 @@ private[round] final class Round(
|
|||
protected def handle(playerId: String)(op: Pov => Fu[Events]): Funit =
|
||||
handlePov((GameRepo pov PlayerRef(gameId, playerId)))(op)
|
||||
|
||||
protected def handleHumanPlay(playerId: String)(op: Pov => Fu[Events]): Funit =
|
||||
handlePov((GameRepo pov PlayerRef(gameId, playerId)).mon(_.round.move.segment.fetch))(op)
|
||||
protected def handleHumanPlay(p: HumanPlay)(op: Pov => Fu[Events]): Funit =
|
||||
handlePov {
|
||||
p.trace.segment("fetch", "db") {
|
||||
GameRepo pov PlayerRef(gameId, p.playerId)
|
||||
}
|
||||
}(op)
|
||||
|
||||
protected def handle(color: Color)(op: Pov => Fu[Events]): Funit =
|
||||
handlePov(GameRepo pov PovRef(gameId, color))(op)
|
||||
|
|
|
@ -50,9 +50,7 @@ private[round] final class SocketHandler(
|
|||
promise.future onFailure {
|
||||
case _: Exception => socket ! Resync(uid)
|
||||
}
|
||||
send(HumanPlay(
|
||||
playerId, move, blur, lag.millis, promise.some
|
||||
))
|
||||
send(HumanPlay(playerId, move, blur, lag.millis, promise.some))
|
||||
member push ackEvent
|
||||
}
|
||||
case ("drop", o) => parseDrop(o) foreach {
|
||||
|
@ -62,9 +60,7 @@ private[round] final class SocketHandler(
|
|||
promise.future onFailure {
|
||||
case _: Exception => socket ! Resync(uid)
|
||||
}
|
||||
send(HumanPlay(
|
||||
playerId, drop, blur, lag.millis, promise.some
|
||||
))
|
||||
send(HumanPlay(playerId, drop, blur, lag.millis, promise.some))
|
||||
}
|
||||
case ("rematch-yes", _) => send(RematchYes(playerId))
|
||||
case ("rematch-no", _) => send(RematchNo(playerId))
|
||||
|
|
|
@ -97,7 +97,7 @@ case class HumanPlay(
|
|||
lag: FiniteDuration,
|
||||
promise: Option[Promise[Unit]] = None) {
|
||||
|
||||
val atNanos = nowNanos
|
||||
val trace = lila.mon.round.move.trace.create
|
||||
}
|
||||
|
||||
case class PlayResult(events: Events, fen: String, lastMove: Option[String])
|
||||
|
|
Loading…
Reference in New Issue