apache-kafka-streams - 双重警告记录来自 DeserializationExceptionHandler 和 RecordDeserializer 的反序列化失败
问题描述
我有一个用例覆盖LogAndContinueExceptionHandler。
@Override
public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record,
Exception exception) {
.....
log.error(....);
}
在控制台上的日志中,我看到2 个WARN 日志,一个来自LogAndContinueExceptionHandler,另一个来自RecordDeserializer,在我的情况下会导致双重消息。
class RecordDeserializer {
....
ConsumerRecord<Object, Object> deserialize(ProcessorContext processorContext,
ConsumerRecord<byte[], byte[]> rawRecord) {
try {
return new ConsumerRecord(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
rawRecord.timestamp(), TimestampType.CREATE_TIME, rawRecord.checksum(),
rawRecord.serializedKeySize(), rawRecord.serializedValueSize(),
this.sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), (byte[]) rawRecord.key()),
this.sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(),
(byte[]) rawRecord.value()),
rawRecord.headers());
} catch (Exception var7) {
Exception deserializationException = var7;
DeserializationHandlerResponse response;
try {
response = this.deserializationExceptionHandler.handle(processorContext, rawRecord,
deserializationException);
} catch (Exception var6) {
this.log.error("Deserialization error callback failed after deserialization error for record {}",
rawRecord, var7);
throw new StreamsException("Fatal user code error in deserialization error callback", var6);
}
if (response == DeserializationHandlerResponse.FAIL) {
throw new StreamsException(
"Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.",
var7);
} else {
如果 DeserializationHandlerResponse 已经在记录它,为什么还需要在这里再次记录它!
new Object[]{rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), var7});
this.droppedRecordsSensor.record();
return null;
}
}
}
以上导致相同的消息从 DeserializationExceptionHandler 进入日志,然后再次从 RecordDeserializer 进入日志。这不能避免吗?
解决方案
推荐阅读
- django-views - 将 html 参数传递给我的视图,然后在我的 object.get() 中使用它
- google-chrome - 为什么在 Azure CDN 的 https 下提供 mp3 文件时会跳过,但在 Chrome 中从 http 提供时却不会?
- powerbi - 在 PowerBI 中将持续时间从天转换为时间
- javascript - 想用昨天日期的过去日期更新日期列中的日期
- android - 无法使用 android 中的 facebook 登录将 facebook 电子邮件的值发送到服务器
- internet-explorer - 当我们尝试使用 wix 构建 .msi 时如何省略组件
- java - 无法从 JAVA 中的重定向 url 检索 PayPal 成功/取消付款数据
- r - R - 用 ggplot2 绘制数据框的通用函数(多列)
- python - 如何在同时包含 C++ 和 Python 的项目中使用 doxygen(多编程语言项目)
- amazon-web-services - 我应该如何架构 aws lambda 以支持批处理模型中的并行进程?