spring-boot - Spring-Kafka: send a custom record instead of the failed record to the DLT with DeadLetterPublishingRecoverer
Problem Description
I am using a DeadLetterPublishingRecoverer to automatically send failed records to the DLT. I would like to send a custom record to the DLT instead of the failed record. Is it possible to do this? Please help me with the configuration. My DeadLetterPublishingRecoverer is configured as follows.
@Bean
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, byte[]> byteArrayTemplate) {
    // Map the record value type to the template used to publish to the DLT
    return new DeadLetterPublishingRecoverer([(byte[].class): byteArrayTemplate])
}
Solution
Create a subclass of DeadLetterPublishingRecoverer and override its createProducerRecord() method.
/**
* Subclasses can override this method to customize the producer record to send to the
* DLQ. The default implementation simply copies the key and value from the consumer
* record and adds the headers. The timestamp is not set (the original timestamp is in
* one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
* less than 0, it must be set to null in the {@link ProducerRecord}.
* @param record the failed record
* @param topicPartition the {@link TopicPartition} returned by the destination
* resolver.
* @param headers the headers - original record headers plus DLT headers.
* @param data the value to use instead of the consumer record value.
* @param isKey true if key deserialization failed.
* @return the producer record to send.
* @see KafkaHeaders
*/
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
TopicPartition topicPartition, Headers headers, @Nullable byte[] data, boolean isKey) {
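Below is a minimal sketch of such a subclass against this 2.6-style signature. The class name CustomRecordRecoverer and the buildCustomValue() helper are hypothetical; the constructor and override follow the Spring Kafka API shown above. Note that the default implementation uses data/isKey to handle deserialization failures; this sketch ignores them and always substitutes a custom value.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.lang.Nullable;

public class CustomRecordRecoverer extends DeadLetterPublishingRecoverer {

    public CustomRecordRecoverer(Map<Class<?>, KafkaOperations<?, ?>> templates) {
        super(templates);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] data, boolean isKey) {

        // Replace the failed value with a custom payload; buildCustomValue() is a
        // hypothetical helper - put whatever you want on the DLT here.
        byte[] customValue = buildCustomValue(record);

        // Per the javadoc: a partition < 0 means "let Kafka pick", so pass null.
        Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();

        return new ProducerRecord<>(topicPartition.topic(), partition,
                record.key(), customValue, headers);
    }

    private byte[] buildCustomValue(ConsumerRecord<?, ?> record) {
        return ("custom DLT payload for offset " + record.offset())
                .getBytes(StandardCharsets.UTF_8);
    }
}

Your @Bean method can then return CustomRecordRecoverer instead of the base class, keeping the same template map.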
In the upcoming 2.7 release, this changes to:
/**
* Subclasses can override this method to customize the producer record to send to the
* DLQ. The default implementation simply copies the key and value from the consumer
* record and adds the headers. The timestamp is not set (the original timestamp is in
* one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
* less than 0, it must be set to null in the {@link ProducerRecord}.
* @param record the failed record
* @param topicPartition the {@link TopicPartition} returned by the destination
* resolver.
* @param headers the headers - original record headers plus DLT headers.
* @param key the key to use instead of the consumer record key.
* @param value the value to use instead of the consumer record value.
* @return the producer record to send.
* @see KafkaHeaders
*/
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {
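With the 2.7 signature, the override is essentially the same; a minimal sketch, again assuming the hypothetical buildCustomValue() helper from the subclass above:

@Override
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
        TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

    // key/value are set only when the corresponding deserialization failed;
    // this sketch keeps the key but unconditionally substitutes a custom value.
    Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();

    return new ProducerRecord<>(topicPartition.topic(), partition,
            key != null ? key : record.key(), buildCustomValue(record), headers);
}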