scala - 强制 alpakka kafka 消费者在反序列化错误时显示错误消息
问题描述
Alpakka kafka 消费者处理记录,直到遇到无法反序列化并静默死亡而不留下错误消息的记录。如何强制它报告错误信息?
解决方案
根据文件,
当流失败时,库内部将处理所有底层资源。
(反)序列化
如果从 Kafka 读取失败是由其他原因引起的,例如反序列化问题,那么该阶段将立即失败。如果您预计会出现这种情况,请考虑使用原始字节数组并在随后的映射阶段进行反序列化,您可以在其中使用监督来跳过失败的元素。
建议使用字节数组作为值(反)序列化消息,并在 Akka Stream 的映射操作中进行(反)序列化,而不是直接在 Kafka(反)序列化器中实现它。
Spray JSON示例(此示例使用恢复对无法正确解析的数据做出反应并忽略错误元素):
import spray.json._
final case class SampleData(name: String, value: Int)
object SampleDataSprayProtocol extends DefaultJsonProtocol {
implicit val sampleDataProtocol: RootJsonFormat[SampleData] = jsonFormat2(SampleData)
}
import SampleDataSprayProtocol._
val resumeOnParsingException = ActorAttributes.withSupervisionStrategy {
new akka.japi.function.Function[Throwable, Supervision.Directive] {
override def apply(t: Throwable): Supervision.Directive = t match {
case _: spray.json.JsonParser.ParsingException => Supervision.Resume
case _ => Supervision.stop
}
}
}
val consumer = Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
.map { consumerRecord =>
val value = consumerRecord.value()
val sampleData = value.parseJson.convertTo[SampleData]
sampleData
}
.withAttributes(resumeOnParsingException)
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
推荐阅读
- javascript - 从 MUI 中删除 LoadingButton?
- python - 对 1 或 0 python 的数据进行相关性分析的最佳方法
- android - 将 AIDL 升级到计费 4.0 会导致:此版本的应用程序未配置为通过 Google Play 计费
- elixir - 如何取回用于启动 DynamicSupervisor 孩子的初始参数?
- javascript - 设置状态为异步时如何立即将变量的值存储在状态中
- django - Django 丢失会话数据
- javascript - 将日期分配给日期选择器
- python - 通过两个回调更新 Dash DataTable:按钮单击和间隔
- javascript - 使用数组在字符串中查找特定单词 (JavaScript)
- c++ - 如何制作toggle child?