Handling exceptions in Spring Kafka

Question

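I am using spring-kafka 2.2.6 with SeekToCurrentErrorHandler and ErrorHandlingDeserializer2. The SeekToCurrentErrorHandler is currently configured to log the message after three retries. Is there a way to skip the retries for validation errors (caught by a Spring Validator implementation) and for message conversion errors? All errors are intercepted by the container error handler, i.e. SeekToCurrentErrorHandler. Should I override the handle method of SeekToCurrentErrorHandler?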

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(this.kafkaConfigProperties.getConsumerConcurrency());
    factory.setAutoStartup(false);
    factory.setErrorHandler(new SeekToCurrentErrorHandler((c, e) -> {
        LOG.info(e.getMessage());
    }, this.kafkaConfigProperties.getRetryCount()));
    return factory;
}

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> map = new HashMap<>();
    Properties consumerProperties = getConsumerProperties();
    consumerProperties.forEach((key, value) -> {
        map.put((String) key, value);
    });
    KafkaSoapMessageConverter kafkaSoapMessageConverter = new KafkaSoapMessageConverter();
    Map<String, Object> configMap = new HashMap<>(1);
    configMap.put(KafkaSoapMessageConverter.CLASS_TO_DESERIALIZE, MyClass.class);
    kafkaSoapMessageConverter.configure(configMap, false);
    ErrorHandlingDeserializer2<Object> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
            kafkaSoapMessageConverter);
    DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(map);
    consumerFactory.setValueDeserializer(errorHandlingDeserializer);
    return consumerFactory;
}

Edit

I used the following code in the overridden handle method:

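// Skip retries for deserialization and validation failures; delegate everything else.
// instanceof is null-safe, so an exception without a cause no longer throws a NullPointerException.
if (e instanceof DeserializationException
        || e.getCause() instanceof MethodArgumentNotValidException) {
    // The skipper always returns true, so the failed record is skipped rather than redelivered.
    // Its parameters must not be named e, which would clash with the enclosing variable.
    SeekUtils.doSeeks(records, consumer, e, true, (rec, ex) -> true, LOG);
} else {
    super.handle(e, records, consumer, container);
}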

Tags: java, spring-boot, apache-kafka, spring-kafka

Answer


Version 2.3 (currently 2.3.5) added the ability to configure which exceptions are retryable:

/**
 * Set an exception classifications to determine whether the exception should cause a retry
 * (until exhaustion) or not. If not, we go straight to the recoverer. By default,
 * the following exceptions will not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried.
 * When calling this method, the defaults will not be applied.
 * @param classifications the classifications.
 * @param defaultValue whether or not to retry non-matching exceptions.
 * @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
 */
public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {

This is how the defaults are set up:

    Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
    classified.put(DeserializationException.class, false);
    classified.put(MessageConversionException.class, false);
    classified.put(MethodArgumentResolutionException.class, false);
    classified.put(NoSuchMethodException.class, false);
    classified.put(ClassCastException.class, false);
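
To make the role of such a map concrete, here is a minimal stand-alone sketch of how a classification map and a default value could decide whether an exception is retried. It is not the spring-kafka or spring-retry implementation. The class name RetryClassificationSketch is hypothetical, and both the cause-chain traversal and the superclass lookup are assumptions made for illustration. Only JDK exception types are used so that the example runs without Spring on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not the spring-kafka or spring-retry implementation.
final class RetryClassificationSketch {

    private final Map<Class<? extends Throwable>, Boolean> classified;
    private final boolean defaultValue;

    RetryClassificationSketch(Map<Class<? extends Throwable>, Boolean> classified, boolean defaultValue) {
        this.classified = new HashMap<>(classified);
        this.defaultValue = defaultValue;
    }

    // Checks the exception and then each of its causes (assumption: causes are traversed).
    // For each throwable, the most specific registered class in its hierarchy decides.
    boolean isRetryable(Throwable thrown) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            for (Class<?> type = t.getClass(); type != null; type = type.getSuperclass()) {
                Boolean retryable = classified.get(type);
                if (retryable != null) {
                    return retryable;
                }
            }
        }
        // Nothing matched: fall back to the default passed alongside the map.
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
        classified.put(NoSuchMethodException.class, false);
        classified.put(ClassCastException.class, false);
        RetryClassificationSketch classifier = new RetryClassificationSketch(classified, true);

        // Directly classified as not retryable.
        System.out.println(classifier.isRetryable(new ClassCastException("bad payload")));
        // Wrapped in another exception: found through the cause chain.
        System.out.println(classifier.isRetryable(new RuntimeException(new ClassCastException("wrapped"))));
        // Not in the map: the default value (true) applies.
        System.out.println(classifier.isRetryable(new IllegalStateException("temporary failure")));
    }
}
```

In this sketch, passing false as the default value inverts the policy: only exception types explicitly mapped to true would be retried.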

Also, you can add exception types to the defaults:

/**
 * Add an exception type to the default list; if and only if an external classifier
 * has not been provided. By default, the following exceptions will not be retried:
 * <ul>
 * <li>{@link DeserializationException}</li>
 * <li>{@link MessageConversionException}</li>
 * <li>{@link MethodArgumentResolutionException}</li>
 * <li>{@link NoSuchMethodException}</li>
 * <li>{@link ClassCastException}</li>
 * </ul>
 * All others will be retried.
 * @param exceptionType the exception type.
 * @see #removeNotRetryableException(Class)
 * @see #setClassifications(Map, boolean)
 */
public void addNotRetryableException(Class<? extends Exception> exceptionType) {
    Assert.isTrue(this.classifier instanceof ExtendedBinaryExceptionClassifier,
            "Cannot add exception types to a supplied classifier");
    ((ExtendedBinaryExceptionClassifier) this.classifier).getClassified().put(exceptionType, false);
}
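
The practical effect of marking a type as not retryable is that a failure of that type goes straight to the recoverer, while other failures are retried until the attempts are exhausted. The following plain-Java sketch illustrates that decision only. The names RetryOrRecoverSketch and deliver are hypothetical, and the in-memory loop is a simplification: the real error handler re-seeks the consumer so the record is redelivered on the next poll.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of the retry-or-recover decision; not how the listener container is implemented.
final class RetryOrRecoverSketch {

    // Calls the listener up to maxAttempts times and returns how many attempts were made.
    static int deliver(Runnable listener, int maxAttempts,
            Predicate<Throwable> retryable, Consumer<RuntimeException> recoverer) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.run();
                return attempts;
            } catch (RuntimeException ex) {
                // Not retryable, or attempts exhausted: hand the failure to the recoverer.
                if (!retryable.test(ex) || attempts >= maxAttempts) {
                    recoverer.accept(ex);
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        // Assumption for illustration: IllegalArgumentException stands in for a validation error.
        Predicate<Throwable> retryable = ex -> !(ex instanceof IllegalArgumentException);

        int validationAttempts = deliver(
                () -> { throw new IllegalArgumentException("invalid payload"); },
                3, retryable, ex -> System.out.println("recovered: " + ex.getMessage()));
        int transientAttempts = deliver(
                () -> { throw new IllegalStateException("temporary failure"); },
                3, retryable, ex -> System.out.println("recovered: " + ex.getMessage()));

        // The validation failure is recovered after one attempt, the other after three.
        System.out.println(validationAttempts + " " + transientAttempts);
    }
}
```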
