首页 > 解决方案 > 强制 alpakka kafka 消费者在反序列化错误时显示错误消息

问题描述

Alpakka kafka 消费者处理记录,直到遇到无法反序列化并静默死亡而不留下错误消息的记录。如何强制它报告错误信息?

标签: scalaapache-kafkakafka-consumer-apialpakka

解决方案


根据文件

当流失败时,库内部将处理所有底层资源。

(反)序列化

如果从 Kafka 读取失败是由其他原因引起的,例如反序列化问题,那么该阶段将立即失败。如果您预计会出现这种情况,请考虑使用原始字节数组并在随后的映射阶段进行反序列化,您可以在其中使用监督来跳过失败的元素。

建议使用字节数组作为值(反)序列化消息,并在 Akka Stream 的映射操作中进行(反)序列化,而不是直接在 Kafka(反)序列化器中实现它。

Spray JSON示例(此示例使用恢复对无法正确解析的数据做出反应并忽略错误元素):

import spray.json._

final case class SampleData(name: String, value: Int)

object SampleDataSprayProtocol extends DefaultJsonProtocol {
  implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}

import SampleDataSprayProtocol._

    val resumeOnParsingException = ActorAttributes.withSupervisionStrategy {
      new akka.japi.function.Function[Throwable, Supervision.Directive] {
        override def apply(t: Throwable): Supervision.Directive = t match {
          case _: spray.json.JsonParser.ParsingException => Supervision.Resume
          case _ => Supervision.stop
        }
      }
    }

    val consumer = Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic))
      .map { consumerRecord =>
        val value = consumerRecord.value()
        val sampleData = value.parseJson.convertTo[SampleData]
        sampleData
      }
      .withAttributes(resumeOnParsingException)
      .toMat(Sink.seq)(Keep.both)
      .mapMaterializedValue(DrainingControl.apply)
      .run()

推荐阅读