首页 > 解决方案 > Kafka 主题不断添加到 DLT 主题中

问题描述

我希望在重试 3 次之后，将失败的消息发送到 .DLT 主题，并发送一次通知。

但实际情况是，它会不断地发送通知，并不断地把消息添加到 .DLT 主题。到目前为止，已经有超过 45 条记录被添加到 DLT，并触发了相应的邮件通知。只有当我删除 DLT 主题中的所有消息后，它才会停止。

我得到以下异常:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.test.sendPayload(java.lang.String) throws java.lang.Exception' threw exception; nested exception is java.lang.NullPointerException
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$f33cc4d.onMessage(<generated>)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:386)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1147)
    at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1137)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1115)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException: null
    at com.test.AbstractStateService.getState(AbstractStateService.java:177)
    at com.test.AbstractStateService.getStateFromDatabase(AbstractStateService.java:120)
    at com.test.AbstractStateService.getStateForm(AbstractStateService.java:97)
    at com.test.AsyncListener.sendPayloadRequest(FormAsyncListener.java:51)
    at com.test.AsyncListener.sendPayload(FormAsyncListener.java:39)
    at com.test.AsyncListener$$FastClassBySpringCGLIB$$114d56fd.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.test.AsyncListener$$EnhancerBySpringCGLIB$$1ea3676f.process(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
    ... 30 common frames omitted
/**
 * Spring configuration for the consuming side: retry template, listener container
 * factories, consumer properties and the dead-letter recoverer bean.
 *
 * <p>NOTE(review): {@code concurrency}, {@code pollTimeout}, {@code kafkaServers} and
 * {@code groupId} are referenced below but not declared in the visible part of the class;
 * presumably they are injected fields that were left out of the snippet - confirm.
 *
 * <p>NOTE(review): this class shares its simple name with Kafka's client
 * {@code KafkaConsumer}, which {@link #kafkaConsumerStr()} appears to intend; inside this
 * class the simple name refers to the configuration class itself. Consider renaming the
 * class or fully qualifying the client type.
 */
@Configuration
@EnableKafka
public class KafkaConsumer {

    /** Value used for the consumer's {@code auto.offset.reset} property. */
    private String offset = "earliest";

    @Autowired
    KafkaTemplate<Object, Object> template;

    /**
     * Builds the retry template shared by the listener container factories:
     * at most 3 attempts with a fixed back-off of 300000 ms between attempts.
     *
     * <p>NOTE(review): a 5-minute back-off executed on the consumer thread may exceed
     * the consumer's {@code max.poll.interval.ms}, which would trigger a rebalance and a
     * redelivery of the same record - verify against the consumer settings.
     *
     * @return the configured retry template
     */
    @Bean
    public RetryTemplate retryTemplate() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(3);

        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(300000);

        RetryTemplate retryTemplate = new RetryTemplate();
        // Apply the policies configured above. The original called retryPolicy() and
        // backOffPolicy(), which are not defined in this class, so the configured
        // attempts/back-off were never wired into the template.
        retryTemplate.setRetryPolicy(simpleRetryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        return retryTemplate;
    }

    /**
     * Container factory variant that combines the retry template with a
     * {@code SeekToCurrentErrorHandler} publishing through a
     * {@code DeadLetterPublishingRecoverer}.
     *
     * <p>NOTE(review): a second method with the identical name and parameter list follows;
     * the two look like alternative attempts, and only one of them can remain in a
     * compilable class.
     *
     * <p>NOTE(review): a transaction manager is configured here; check in the Spring Kafka
     * reference whether the container error handler is consulted at all for transactional
     * containers, or whether an after-rollback processor is required instead.
     *
     * @param chainedTM transaction manager set on the container properties
     * @return the configured listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, String> chainedTM) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(1000);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 1));
        return factory;
    }

    /**
     * Container factory variant that enables stateful retry and hands the failed record
     * to the {@code DeadLetterCustomHandler} from a recovery callback.
     *
     * <p>NOTE(review): duplicates the signature of the method above - see the note there.
     *
     * @param chainedTM transaction manager set on the container properties
     * @return the configured listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, String> chainedTM) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setStatefulRetry(true);
        DeadLetterCustomHandler recoverer = recoverer(template);
        factory.setRecoveryCallback(context -> {
            // The failed record is read from the retry context attribute "record" and
            // passed to the recoverer together with the last thrown exception.
            recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                    (Exception) context.getLastThrowable());
            return null;
        });
        return factory;
    }

    /**
     * @return a consumer factory built from {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Consumer properties: String key/value deserializers, the configured group id and
     * bootstrap servers, {@code read_committed} isolation and the {@code offset} reset policy.
     *
     * <p>NOTE(review): {@code enable.auto.commit} is set to {@code true} although the
     * container factories configure a transaction manager and sync commits; confirm that
     * auto-commit is really intended here.
     *
     * @return the consumer configuration map
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return props;
    }

    /**
     * Exposes a consumer created from the consumer factory's configuration properties.
     *
     * <p>NOTE(review): see the class-name clash described on the class; as written,
     * {@code KafkaConsumer} here resolves to this configuration class.
     */
    @Bean
    public KafkaConsumer<String, String> kafkaConsumerStr() {
        return new KafkaConsumer<>(consumerFactory().getConfigurationProperties());
    }

    /**
     * @param template template handed to the handler's constructor
     * @return the custom dead-letter handler used by the recovery callback
     */
    @Bean
    public DeadLetterCustomHandler recoverer(KafkaTemplate<Object, Object> template) {
        return new DeadLetterCustomHandler(template);
    }
}

/**
 * Listener for {@code TOPIC_FORM}: logs the incoming payload and then unconditionally
 * fails with a {@link NullPointerException}.
 *
 * @param message payload of the consumed record
 * @throws Exception declared by the signature; the body always throws a NullPointerException
 */
@KafkaListener(id = TOPIC_FORM, topics = TOPIC_FORM)
@Transactional(readOnly = false)
public void sendPayload(@Payload String message) throws Exception {
    final String logLine = "message " + message;
    log.info(logLine);

    final NullPointerException failure = new NullPointerException();
    throw failure;
}

标签: java, apache-kafka, kafka-consumer-api, spring-kafka

解决方案


推荐阅读