Spring-Kafka: sending a custom record instead of the failed record to the DLT with DeadLetterPublishingRecoverer

Problem description

I am using DeadLetterPublishingRecoverer to automatically send failed records to the DLT. I would like to send a custom record to the DLT instead of the failed record. Is this possible? Please help me with the configuration. My DeadLetterPublishingRecoverer configuration is below.

@Bean
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, byte[]> byteArrayTemplate) {
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(byte[].class, byteArrayTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

Tags: spring-boot, spring-kafka

Solution


Create a subclass of DeadLetterPublishingRecoverer and override the createProducerRecord() method.

/**
 * Subclasses can override this method to customize the producer record to send to the
 * DLQ. The default implementation simply copies the key and value from the consumer
 * record and adds the headers. The timestamp is not set (the original timestamp is in
 * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
 * less than 0, it must be set to null in the {@link ProducerRecord}.
 * @param record the failed record
 * @param topicPartition the {@link TopicPartition} returned by the destination
 * resolver.
 * @param headers the headers - original record headers plus DLT headers.
 * @param data the value to use instead of the consumer record value.
 * @param isKey true if key deserialization failed.
 * @return the producer record to send.
 * @see KafkaHeaders
 */
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
        TopicPartition topicPartition, Headers headers, @Nullable byte[] data, boolean isKey) {
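A minimal sketch of such a subclass, using the signature above. The class name `CustomDltRecoverer` and the helper `buildCustomPayload()` are illustrative names, not part of Spring Kafka; the constructor simply delegates to the templates-map constructor used in the question.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.lang.Nullable;

public class CustomDltRecoverer extends DeadLetterPublishingRecoverer {

    public CustomDltRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
        super(templates);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] data, boolean isKey) {

        // Build whatever payload you want published to the DLT instead of the
        // failed record's value. (If isKey is true, "data" is a replacement key
        // from a key deserialization failure; this sketch ignores that case.)
        byte[] customValue = buildCustomPayload(record);

        // Per the javadoc: a negative partition must become null in the ProducerRecord.
        Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();

        return new ProducerRecord<>(topicPartition.topic(), partition,
                record.key(), customValue, headers);
    }

    // Hypothetical helper: derive a custom DLT payload from the failed record.
    private byte[] buildCustomPayload(ConsumerRecord<?, ?> record) {
        return ("failed at offset " + record.offset()).getBytes();
    }
}
```

Register this subclass as the bean in place of the plain DeadLetterPublishingRecoverer.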

In the upcoming 2.7 release, this changes to

/**
 * Subclasses can override this method to customize the producer record to send to the
 * DLQ. The default implementation simply copies the key and value from the consumer
 * record and adds the headers. The timestamp is not set (the original timestamp is in
 * one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
 * less than 0, it must be set to null in the {@link ProducerRecord}.
 * @param record the failed record
 * @param topicPartition the {@link TopicPartition} returned by the destination
 * resolver.
 * @param headers the headers - original record headers plus DLT headers.
 * @param key the key to use instead of the consumer record key.
 * @param value the value to use instead of the consumer record value.
 * @return the producer record to send.
 * @see KafkaHeaders
 */
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
        TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {
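With the 2.7 signature, an override would receive the key and value replacements as separate parameters rather than a single `data`/`isKey` pair; a sketch of just the overridden method (the `buildCustomPayload()` helper is a hypothetical name):

```java
@Override
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
        TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

    // Substitute a custom value; keep the original key and the DLT headers.
    byte[] customValue = buildCustomPayload(record); // hypothetical helper

    // Per the javadoc: a negative partition must become null in the ProducerRecord.
    Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();

    return new ProducerRecord<>(topicPartition.topic(), partition,
            key != null ? key : record.key(), customValue, headers);
}
```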
