scala - 我从 alpakka kafka 消费时收到了死信
问题描述
[信息] [06/01/2020 05:05:53.947] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] 开始。StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] SLF4J:未找到 SLF4J 提供程序。SLF4J:默认为无操作 (NOP) 记录器实现 SLF4J:参见http://www.slf4j.org/codes.html#noProviders了解更多详情。[INFO] [06/01/2020 05:06:05.058] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] 完成 [INFO] [akkaDeadLetter][06 /01/2020 05:06:35.165] [default-akka.actor.default-dispatcher-24] [akka://default/system/kafka-consumer-1] 消息 [akka.kafka.internal.KafkaConsumerActor$Internal$从 Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] 到 Actor[akka://default/system/kafka-consumer-1#76265671] 的 StopFromStage] 未交付。[1] 遇到死信。如果这不是预期行为,则 Actor[akka://default/system/kafka-consumer-1#76265671] 可能已意外终止。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录。
这是我面临的问题,但仅当我使用带有 Kafka 源的流时。
这是我的代码
val bookData: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = KafkaSource.createSource(TOPIC)
val parseBook: Flow[ConsumerRecord[Array[Byte], String], Book, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>
Json.parse(message.value).as[Book]
}
val flow: Flow[Book, Book, NotUsed] = Flow[Book].mapAsync(4)(book => toInstant(book))
def toInstant(book: Book): Future[Book] = Future{
val array = book.publicationDate.split("-")
val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
val timeTillNow = Time.timeSince(instant)
val royalty = if (timeTillNow.toDays > 1000) {
.10 * book.price * book.copiesSold
} else {
.15 * book.price * book.copiesSold
}
book.copy(royalty = Some(royalty))
}
val print: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))
在运行以下时,我收到了死信,
bookData.via(parseBook).via(flow).runWith(print)
但在运行时没有死信:bookData.via(parseBook).runWith(print)
。即使在运行后一个时,我也得到了预期的输出。
有人可以帮忙吗?
解决方案
这很可能发生,因为流程正在引发异常。通常,我希望看到更多详细信息的日志记录,但这些可能会丢失,因为没有配置 SLF4J 实现。有关如何配置日志记录的详细信息,请参阅Akka 文档中的“SLF4J 后端” 。
查看您的代码,我认为错误最有可能出现在这一行:
val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
请注意,它array(2).toInt
同时用于日和月。我的猜测是这个月应该是Month.of(array(1).toInt)
. 如果array(2)
确实是一个月中的某一天,那么任何高于 12 的值都将导致Month.of
抛出 a DateTimeException
,这将终止流。
推荐阅读
- c# - 如何将 GML 字符串转换为 XML?
- java - 使用 Gson 反序列化 LocalDateTime 不起作用
- docker - 路由广播 UDP 进/出 Docker for Mac 容器
- python - Django - 扩展父级并仅更改子类字段名的 Pythonic 方式?
- database - 如何以编程方式从 InDesign 中制作的预设计模板创建 PDF
- ios - 桌面软件如何在 iPhone 上欺骗 GPS 位置
- firebase - RNFetchBlob 无法上传图片
- bash - 在 bash 函数中使用别名
- javascript - 如何使用 lodash 比较两个数组(顺序很重要)
- json - 使用 ceil 运算符的 LogicApp 液体变换