java - Kafka 主题不断添加到 DLT 主题中
问题描述
3 次重试后,我想将主题发送到 .DLT 主题并发送通知。
但它会继续发送通知并继续添加到.DLT。到目前为止,我已将超过 45 条记录添加到 DLT 和邮件中。一旦我删除了 DLT 主题中的所有消息,它就会停止。
我得到以下异常:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.test.sendPayload(java.lang.String) throws java.lang.Exception' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$FastClassBySpringCGLIB$$a98718f8.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.cloud.sleuth.instrument.messaging.MessageListenerMethodInterceptor.invoke(TraceMessagingAutoConfiguration.java:283)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter$$EnhancerBySpringCGLIB$$f33cc4d.onMessage(<generated>)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1700(KafkaMessageListenerContainer.java:386)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$4.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1147)
at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1137)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1115)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException: null
at com.test.AbstractStateService.getState(AbstractStateService.java:177)
at com.test.AbstractStateService.getStateFromDatabase(AbstractStateService.java:120)
at com.test.AbstractStateService.getStateForm(AbstractStateService.java:97)
at com.test.AsyncListener.sendPayloadRequest(FormAsyncListener.java:51)
at com.test.AsyncListener.sendPayload(FormAsyncListener.java:39)
at com.test.AsyncListener$$FastClassBySpringCGLIB$$114d56fd.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at com.test.AsyncListener$$EnhancerBySpringCGLIB$$1ea3676f.process(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
... 30 common frames omitted
@Configuration
@EnableKafka
public class KafkaConsumer {
private String offset = "earliest";
@Autowired
KafkaTemplate<Object, Object> template;
@Bean
public RetryTemplate retryTemplate(){
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(3);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(300000);
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, String> chainedTM) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(1000);
factory.getContainerProperties().setSyncCommits(true);
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setTransactionManager(chainedTM);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 1));
return factory;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, String> chainedTM) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setTransactionManager(chainedTM);
factory.setStatefulRetry(true);
DeadLetterCustomHandler recoverer = recoverer(template);
factory.setRecoveryCallback(context -> {
recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
(Exception) context.getLastThrowable());
return null;
});
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new ConcurrentHashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return props;
}
@Bean
public KafkaConsumer<String, String> kafkaConsumerStr() {
return new KafkaConsumer<>(consumerFactory().getConfigurationProperties());
}
@Bean
public DeadLetterCustomHandler recoverer(KafkaTemplate<Object, Object> template) {
return new DeadLetterCustomHandler(template);
}
}
@KafkaListener(id = TOPIC_FORM, topics = TOPIC_FORM)
@Transactional(readOnly = false)
public void sendPayload(@Payload String message) throws Exception {
log.info("message " + message);
throw new NullPointerException();
}
解决方案
推荐阅读
- reactjs - 在组件中记住获取的结果
- c# - 当我从另一个用户运行我的 ASP.NET Core MVC 解决方案时,它没有连接到 SQL Server 数据库
- javascript - 无法连接用户
- google-cloud-platform - GCP:有 rtmp 并想取回 hls m3u8
- windows - 是否可以在服务器和多个客户端之间的命名管道中广播消息?
- html - z-index 因溢出而失败
- ruby-on-rails - Rails:外键 - 数据库表是什么样的?
- python - 为什么即使元素存在硒也会抛出元素未找到错误?
- javascript - 如何使用 JavaScript 以纳秒为单位获取 unix 时间戳?
- oracle-cloud-infrastructure - 使用 python SDK 将文件上传到对象存储