Duplicate warning logs for a deserialization failure, from both DeserializationExceptionHandler and RecordDeserializer

Problem description

I have a use case in which I override LogAndContinueExceptionHandler:

@Override
public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record,
            Exception exception) {
.....
log.error(....);
}

In the console logs I see two WARN entries, one from LogAndContinueExceptionHandler and another from RecordDeserializer, so in my case the same failure is reported twice.

class RecordDeserializer {
....
ConsumerRecord<Object, Object> deserialize(ProcessorContext processorContext,
            ConsumerRecord<byte[], byte[]> rawRecord) {
        try {
            return new ConsumerRecord(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
                    rawRecord.timestamp(), TimestampType.CREATE_TIME, rawRecord.checksum(),
                    rawRecord.serializedKeySize(), rawRecord.serializedValueSize(),
                    this.sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), (byte[]) rawRecord.key()),
                    this.sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(),
                            (byte[]) rawRecord.value()),
                    rawRecord.headers());
        } catch (Exception var7) {
            Exception deserializationException = var7;

            DeserializationHandlerResponse response;
            try {
                response = this.deserializationExceptionHandler.handle(processorContext, rawRecord,
                        deserializationException);
            } catch (Exception var6) {
                this.log.error("Deserialization error callback failed after deserialization error for record {}",
                        rawRecord, var7);
                throw new StreamsException("Fatal user code error in deserialization error callback", var6);
            }

            if (response == DeserializationHandlerResponse.FAIL) {
                throw new StreamsException(
                        "Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.",
                        var7);
            } else {

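                // If the deserialization exception handler has already logged the failure,
                // why does it need to be logged again here?
                this.log.warn(
                        "Skipping record due to deserialization error. topic=[{}] partition=[{}] offset=[{}]",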

                        new Object[]{rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), var7});
                this.droppedRecordsSensor.record();
                return null;
            }
        }
}

As a result, the same message is written to the log once by the DeserializationExceptionHandler and then again by RecordDeserializer. Can this be avoided?
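The control flow behind the two log entries can be reproduced in isolation. The sketch below is not Kafka Streams code: `DoubleLogSketch`, `Handler`, `Response`, `parse` and `run` are hypothetical stand-ins, and log output is collected in a list instead of going through a logger. It assumes, as the decompiled snippet above suggests, that the deserializer logs its own warning whenever the handler returns anything other than FAIL.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these types are Kafka Streams classes.
public class DoubleLogSketch {

    enum Response { CONTINUE, FAIL }

    // Mirrors the shape of a deserialization exception handler.
    interface Handler {
        Response handle(String rawRecord, Exception error, List<String> log);
    }

    // Mirrors the catch branch of the decompiled deserialize(): call the handler,
    // then add a WARN entry whenever the handler does not ask to fail.
    static String deserialize(String rawRecord, Handler handler, List<String> log) {
        try {
            return parse(rawRecord);
        } catch (Exception e) {
            Response response = handler.handle(rawRecord, e, log);
            if (response == Response.FAIL) {
                throw new IllegalStateException("handler asked to fail", e);
            }
            log.add("WARN deserializer: skipping record " + rawRecord);
            return null;
        }
    }

    // Toy "deserializer": anything starting with "bad" is rejected.
    static String parse(String raw) {
        if (raw.startsWith("bad")) {
            throw new IllegalArgumentException("cannot parse " + raw);
        }
        return raw.toUpperCase();
    }

    // Runs one failing record through the flow and returns the collected log lines.
    static List<String> run(boolean handlerLogs) {
        List<String> log = new ArrayList<>();
        Handler handler = (raw, error, out) -> {
            if (handlerLogs) {
                out.add("ERROR handler: " + error.getMessage());
            }
            return Response.CONTINUE;
        };
        deserialize("bad-record", handler, log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // handler logs, then the deserializer logs
        System.out.println(run(false));  // only the deserializer logs
    }
}
```

In this model the handler's own logging is the only part user code controls. The deserializer's entry is produced regardless of what the handler did, which matches the behavior described in the question.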

Tags: apache-kafka-streams

Solution

