Handling JSON parse errors without crashing the Kafka Streams application

Problem description

I have a Kafka Streams application that maps/transforms JSON messages and streams the output to a topic.

KStream<String, String> logMessageStream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
logMessageStream.map((k, v) -> { // Map each record
                try { // Map record to (requestId, message)
                    // readValue throws IOException, JsonParseException, JsonMappingException
                    LogMessage logMessage = objectMapper.readValue(v, LogMessage.class);
                    return new KeyValue<>(logMessage.requestId(), logMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return null; // <== RETURNS null due to caught exception
}).to(outputTopic);

Now, if an input record contains invalid JSON syntax, I get a parse error and the streams application crashes:

java.lang.NullPointerException
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
    ....

I want to handle this error during the map step and continue processing the remaining messages. Is there a handler I can set to consume the exception? Looking for suggestions.

Thanks.

Tags: java, apache-kafka, apache-kafka-streams

Solution


You can also take advantage of the StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG property, which tells Kafka Streams how to react when a record cannot be deserialized. See https://docs.confluent.io/current/streams/faq.html#handling-corrupted-records-and-deserialization-errors-poison-pill-records for details:

Properties streamsSettings = new Properties();
streamsSettings.put(
  StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  LogAndContinueExceptionHandler.class.getName()
);
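
If LogAndContinueExceptionHandler's behaviour (log a warning and skip the record) is not enough, you can plug in your own handler. Below is a minimal sketch of a custom DeserializationExceptionHandler that logs the offending record and keeps processing; the class name LogAndSkipHandler is made up for illustration, and it assumes a Kafka Streams version where the ProcessorContext-based handle signature is still available.

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical custom handler: log the poison-pill record and keep the application running.
public class LogAndSkipHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // Log enough context to locate the bad record later, then skip it.
        System.err.println("Skipping corrupted record at " + record.topic()
                + "-" + record.partition() + "@" + record.offset()
                + ": " + exception.getMessage());
        return DeserializationHandlerResponse.CONTINUE; // FAIL would stop the application
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no extra configuration needed for this sketch
    }
}

Register it the same way as above, passing LogAndSkipHandler.class.getName() for the property. Note that this handler only fires when the failure happens during deserialization, so you would deserialize straight into LogMessage with a JSON Serde (for example builder.stream(inputTopic, Consumed.with(Serdes.String(), logMessageSerde)), where logMessageSerde is a JSON serde you supply) instead of calling objectMapper.readValue inside map(); with a plain String serde the JsonParseException is thrown in your own code and never reaches the handler.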
