Retry and DLT Implementation in Kafka Streams

Problem Description

I am integrating Kafka Streams into a Spring Boot application. If the processor still fails after the configured number of retries, I want to send the record to a DLT (dead-letter topic).

Currently, I am doing this as follows:

Topology topology = builder.build();
stream = new KafkaStreams(topology, streamsConfiguration);

// Setup Error Handling
final MaxFailuresUncaughtExceptionHandler exceptionHandler =
        new MaxFailuresUncaughtExceptionHandler(Integer.parseInt(maxFailures), Long.parseLong(maxTimeInterval));
stream.setUncaughtExceptionHandler(exceptionHandler);
public class MaxFailuresUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
  private final int maxFailures;
  private final long maxTimeIntervalMillis;
  private int currentFailureCount;

  public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
    this.maxFailures = maxFailures;
    this.maxTimeIntervalMillis = maxTimeIntervalMillis;
  }

  @Override
  public StreamThreadExceptionResponse handle(final Throwable throwable) {
      currentFailureCount++;

      if (currentFailureCount >= maxFailures) {
        // Send record to DLT
      }
      return REPLACE_THREAD;
  }
}
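One subtlety with a counter-based handler like the one above is that the failure count never resets, so unrelated errors spread over days still add up. A minimal, Kafka-free sketch of a time-windowed failure counter (the class `FailureWindow` and its method names are illustrative, not part of any Kafka API) could look like this:

```java
// Sketch: time-windowed failure counting, independent of any Kafka classes.
// The count resets whenever two errors are farther apart than the window.
public class FailureWindow {
    private final int maxFailures;          // give up after this many failures...
    private final long maxTimeIntervalMs;   // ...occurring within this time window
    private long previousErrorTimeMs = -1;
    private int currentFailureCount = 0;

    public FailureWindow(int maxFailures, long maxTimeIntervalMs) {
        this.maxFailures = maxFailures;
        this.maxTimeIntervalMs = maxTimeIntervalMs;
    }

    /** Returns true while retrying is still allowed, false once the budget is exhausted. */
    public boolean recordFailure(long nowMs) {
        if (previousErrorTimeMs >= 0 && nowMs - previousErrorTimeMs > maxTimeIntervalMs) {
            currentFailureCount = 0; // errors are far apart: start a fresh window
        }
        currentFailureCount++;
        previousErrorTimeMs = nowMs;
        return currentFailureCount < maxFailures;
    }
}
```

The same logic would slot into `handle(Throwable)`: return `REPLACE_THREAD` while `recordFailure(...)` is true, and take the DLT/shutdown path once it returns false.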

A few things I want to achieve but have not been able to:

I would like to know whether there is a better way to handle this so that we don't need to manually send records to the DLT, similar to what we can achieve with the plain Kafka Consumer API shown below:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setConcurrency(consumerConfigVars.getConcurrency());

  factory.setErrorHandler(new SeekToCurrentErrorHandler(
      new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0L, 3L))); // dead-letter after 3 tries

  return factory;
}

Tags: java, spring-boot, apache-kafka, apache-kafka-streams

Solution
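Kafka Streams has no built-in equivalent of Spring's `DeadLetterPublishingRecoverer`, and the `StreamsUncaughtExceptionHandler` is a poor place to publish to a DLT: it runs after the stream thread has already died and does not receive the failed record. A common alternative is to catch exceptions inside the topology itself, wrap each record in a success/failure marker, and route failures to a DLT topic with `.to()`. Below is a minimal sketch under that approach; the topic names (`input-topic`, `output-topic`, `my-app.DLT`), the `Result` holder, and the `transform` business logic are all illustrative placeholders:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class DltTopologySketch {

  // Simple holder for "either a processed value or the failed input". Not a Kafka type.
  record Result(String value, boolean failed) {}

  public static void build(StreamsBuilder builder) {
    KStream<String, Result> processed = builder.<String, String>stream("input-topic")
        .mapValues(v -> {
          try {
            return new Result(transform(v), false);   // happy path
          } catch (RuntimeException e) {
            return new Result(v, true);               // keep the original value for the DLT
          }
        });

    // Successes continue downstream.
    processed.filter((k, r) -> !r.failed())
             .mapValues(Result::value)
             .to("output-topic");

    // Failures go to the dead-letter topic instead of killing the stream thread.
    processed.filter((k, r) -> r.failed())
             .mapValues(Result::value)
             .to("my-app.DLT");
  }

  private static String transform(String v) { /* business logic goes here */ return v.toUpperCase(); }
}
```

With this pattern, per-record retries (e.g. looping a few times around `transform` before giving up) can be added inside the `mapValues` lambda, and the uncaught-exception handler is kept only as a last resort for truly unexpected failures.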
