首页 > 解决方案 > 我从 alpakka kafka 消费时收到了死信

问题描述

[信息] [06/01/2020 05:05:53.947] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] 开始。StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] SLF4J:未找到 SLF4J 提供程序。SLF4J:默认为无操作 (NOP) 记录器实现 SLF4J:参见http://www.slf4j.org/codes.html#noProviders了解更多详情。[INFO] [06/01/2020 05:06:05.058] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] 完成 [INFO] [akkaDeadLetter][06 /01/2020 05:06:35.165] [default-akka.actor.default-dispatcher-24] [akka://default/system/kafka-consumer-1] 消息 [akka.kafka.internal.KafkaConsumerActor$Internal$从 Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] 到 Actor[akka://default/system/kafka-consumer-1#76265671] 的 StopFromStage] 未交付。[1] 遇到死信。如果这不是预期行为,则 Actor[akka://default/system/kafka-consumer-1#76265671] 可能已意外终止。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录。

这是我面临的问题,但仅当我使用带有 Kafka 源的流时。

这是我的代码

val bookData: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = KafkaSource.createSource(TOPIC)
  val parseBook: Flow[ConsumerRecord[Array[Byte], String], Book, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>
    Json.parse(message.value).as[Book]
  }
  val flow: Flow[Book, Book, NotUsed] = Flow[Book].mapAsync(4)(book => toInstant(book))

  def toInstant(book: Book): Future[Book] = Future{
    val array = book.publicationDate.split("-")
    val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
    val timeTillNow = Time.timeSince(instant)
    val royalty = if (timeTillNow.toDays > 1000) {
      .10 * book.price * book.copiesSold
    } else {
      .15 * book.price * book.copiesSold
    }
    book.copy(royalty = Some(royalty))
  }
 val print: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))

在运行以下时,我收到了死信, bookData.via(parseBook).via(flow).runWith(print) 但在运行时没有死信:bookData.via(parseBook).runWith(print)。即使在运行后一个时,我也得到了预期的输出。

有人可以帮忙吗?

标签: scalaapache-kafkaakka

解决方案


这很可能发生,因为流程正在引发异常。通常,我希望看到更多详细信息的日志记录,但这些可能会丢失,因为没有配置 SLF4J 实现。有关如何配置日志记录的详细信息,请参阅Akka 文档中的“SLF4J 后端” 。

查看您的代码,我认为错误最有可能出现在这一行:

val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)

请注意,它array(2).toInt同时用于日和月。我的猜测是这个月应该是Month.of(array(1).toInt). 如果array(2)确实是一个月中的某一天,那么任何高于 12 的值都将导致Month.of抛出 a DateTimeException,这将终止流。


推荐阅读