scala - 如何以惯用的方式将错误日志记录添加到我的 Akka 流中?
问题描述
我目前正在运行类似于以下内容的 Akka 流设置:
┌───────────────┐
┌─────────────┐ │┌─────────────┐│
│REST endpoint│──▶│Queue source ││
└─────────────┘ │└──────╷──────┘│
│┌──────▼──────┐│
││ Flow[T] ││
│└──────╷──────┘│
│┌──────▼──────┐│ ┌─────────────┐
││ KafkaSink │├─▶│ Kafka topic │
│└─────────────┘│ └─────────────┘
└───────────────┘
虽然这个工作很好,但我想对生产系统有一些了解,即是否存在错误以及什么样的错误。例如,我将 包装KafkaSink
成 aRestartSink.withBackoff
并将以下属性应用于包装的接收器:
private val decider: Supervision.Decider = {
case x =>
log.error("KafkaSink encountered an error and will stop", x)
Supervision.Stop
}
Flow[...]
.log("KafkaSink")
.to(Producer.plainSink(...))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.addAttributes(
ActorAttributes.logLevels(
onElement = Logging.DebugLevel,
onFinish = Logging.WarningLevel,
onFailure = Logging.ErrorLevel
)
)
这确实为我提供了一些见解,例如,我将收到一条下游已关闭的日志消息,以及通过supervisionStrategy
我添加的 发生的异常。
然而,这个解决方案感觉有点像一种变通方法(例如,将异常记录到监督策略中),而且它也没有提供任何对RestartWithBackoffSink
. 当然,我可以DEBUG
为该类启用级别日志记录,但同样,这感觉就像在生产中做的一种解决方法。
长话短说:
- 我试图深入了解 Akka 流中发生的错误的方式是否有任何明显的缺点
- 是否有更好/更惯用的方式在生产中向 Akka 流添加日志记录
解决方案
我想你快到了!!
实际上,它是文档中描述的方式。使用log()
方法可以让您更细粒度地控制流经流的元素的日志记录级别、流的完成和失败。虽然,我不喜欢在主管策略中添加日志消息。如果您确实想显示该特定异常,则创建一个自定义异常,在主管策略中捕获它并让 Akka 为您记录该消息。您可以debug-logging
在 Akka 流配置中启用,该配置默认off
用于在 DEBUG 日志级别进行额外的故障排除日志记录。除此之外,您还可以在参与者级别启用日志记录。(请参阅此文档)。
我认为在生产中,可能有两种记录错误的方法:
1) 在恢复阶段记录或重新抛出异常。这样上游的所有异常都将被捕获并记录:
object AkkaStreamRecap extends App {
implicit val system = ActorSystem("AkkaStreamsRecap")
implicit val materialiser = ActorMaterializer()
import system.dispatcher
val source = Source(-5 to 5)
val sink = Sink.foreach[Int](println)
val flow = Flow[Int].map(x => 1 / x)
val runnableGraph = source.
via(flow).
recover {
case e => throw e
}.
to(sink)
runnableGraph.run()
}
输出:
0
0
0
0
-1
[ERROR] [03/06/2020 16:27:58.703] [AkkaStreamsRecap-akka.actor.default-dispatcher-2] [akka://AkkaStreamsRecap/system/StreamSupervisor-0/flow-0-0-ignoreSink] Error in stage [Recover(<function1>)]: / by zero
java.lang.ArithmeticException: / by zero
at com.personal.akka.http.chapter1.AkkaStreamRecap$.$anonfun$flow$1(AkkaStreamRecap.scala:41)
at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:480)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:576)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:682)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:731)
at akka.actor.Actor.aroundPreStart(Actor.scala:550)
at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:671)
at akka.actor.ActorCell.create(ActorCell.scala:676)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
at akka.dispatch.Mailbox.run(Mailbox.scala:228)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2)定义自定义监督策略并在流属性或物化器设置中使用它:
object AkkaStreamRecap extends App {
implicit val system = ActorSystem("AkkaStreamsRecap")
private val decider: Supervision.Decider = {
case e: ArithmeticException =>
println("Arithmetic exception: Divide by Zero")
Supervision.Stop
}
implicit val materialiser = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
import system.dispatcher
val source = Source(-5 to 5)
val sink = Sink.foreach[Int](println)
val flow = Flow[Int].map(x => 1 / x)
val runnableGraph = source.via(flow).log("divide by zero").to(sink)
runnableGraph.run()
}
输出:
0
0
0
0
-1
Arithmetic exception: Divide by Zero
[ERROR] [03/06/2020 16:37:00.740] [AkkaStreamsRecap-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://AkkaStreamsRecap/system/StreamSupervisor-0)] [divide by zero] Upstream failed.
java.lang.ArithmeticException: / by zero
at com.personal.akka.http.chapter1.AkkaStreamRecap$.$anonfun$flow$1(AkkaStreamRecap.scala:26)
at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:480)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:576)
at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:682)
at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:731)
at akka.actor.Actor.aroundPreStart(Actor.scala:550)
at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:671)
at akka.actor.ActorCell.create(ActorCell.scala:676)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
at akka.dispatch.Mailbox.run(Mailbox.scala:228)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
让我知道它是否有帮助!
PS ..我在官方文档中找不到关于记录错误的其他方式的任何来源或方式。
推荐阅读
- java - 如何使用多个键和值在表中搜索?
- javascript - JS Module export import
- azure - Azure Kubernetes - Prometheus 自动服务发现?
- linux - 如何在 rapsbian 启动时自动执行命令
- selenium - 自动化测试 UI 窗口 Javascript
- python - 无法重新安装 Python
- spring-cloud - “DedupeResponseHeader”不适用于 Greenwich.SR3
- python - 时间脉冲 tkinter python
- mapbox-gl-js - Mapbox GL JS 中的圆形样式
- python - 键值列表列表到列表字典