java - Kafka Streams 中的重试和 DLT 实现
问题描述
我正在将 Kafka Streams 集成到 Spring Boot 应用程序中。即使在提供的重试次数之后,如果处理器出现问题,我也想将数据发送到 DLT。
目前,我正在通过以下方式执行此操作:
Topology topology = builder.build();
stream = new KafkaStreams(topology, streamsConfiguration);
// Setup Error Handling
final MaxFailuresUncaughtExceptionHandler exceptionHandler =
new MaxFailuresUncaughtExceptionHandler(Integer.parseInt(maxFailures), Long.parseLong(maxTimeInterval));
stream.setUncaughtExceptionHandler(exceptionHandler);
public class MaxFailuresUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
private int currentFailureCount;
@Override
public StreamThreadExceptionResponse handle(final Throwable throwable) {
currentFailureCount++;
if (currentFailureCount >= maxFailures) {
// Send record in DLT
}
return REPLACE_THREAD;
}
}
我想实现但无法做到的几件事:
- 无法获取当前处理器的主题导致失败
- 记录导致失败的原因
我想知道是否还有其他更好的方法来处理这个问题,这样我们就不需要手动将记录发送到 DLT。类似于我们可以从下面提到的普通 Kafka Consumer API 中实现的功能:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(consumerConfigVars.getConcurrency());
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0L, 3L))); // dead-letter after 3 tries
return factory;
}
解决方案
推荐阅读
- sql - SQL Server:在左连接查询的执行计划中插入隐藏的“排序”
- mysql - 我们如何使用 GROUP BY 加入结果
- javascript - 为什么我在 React 应用程序的 HTML 输出中看到传递的道具?
- typescript - 如何在打字稿中使用 localStorage?
- css - Safari 位置的解决方法:粘性 (-webkit-sticky) 错误
- asp.net-mvc - 在 MVC 中为一个视图使用两种模型,一种用于获取数据,另一种用于向控制器发送数据
- json - Json解析没有得到第二个花括号内的值
- ubuntu - 在 ubuntu 中将 .p12 转换为 .pem
- django - 使用 django 和 nginx 将数据库从 sqlite 更改为 postgres 时出现 500 内部服务器错误
- php - 当我加载网站时如何通过此错误消息