spring-boot - 具有事务、多线程的 Spring-Kafka
问题描述
我遇到了一个应用程序消费消息并产生消息作为对消费消息的响应的案例。这是使用 kafka 事务完成的,但该应用程序还有一个定期发送 Kafka 消息的计划作业(也使用事务,因为它发送到两个主题)。
当计划的作业开始发送时,我收到此异常:
org.apache.kafka.common.KafkaException: TransactionalId aura-transaction-1: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
有谁知道可能是什么原因?我正在考虑尝试使用不同的 kafkaTemplates(+生产者工厂)来查看是否可以解决问题。从那时起,我可以为计划的作业分配一个新的事务 ID 前缀。目前他们有相同的。
消费者使用一个基本的@KafkaListener,它已经在来自 KafkaMessageListenerContainer 的事务中注册。然后它使用 KafkaTemplate.send(Object) 生成一条消息。
计划的作业使用 KafkaTemplate.executeInTransaction 功能并发送到两个主题。
版本:Spring Boot 2.1.1 Spring Kafka:2.2.2
堆栈跟踪:
org.apache.kafka.common.KafkaException: TransactionalId person-identhendelse-lager-1.privat-person-fregIdenthendelse-v1.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758)
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216)
at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:459)
at org.springframework.kafka.core.KafkaTemplate.executeInTransaction(KafkaTemplate.java:278)
at no.nav.person.identhendelse.lager.app.aggregat.AggregatIdenthendelsePublisher.sendForPerson(AggregatIdenthendelsePublisher.java:52)
at no.nav.person.identhendelse.lager.app.aggregat.AggregatScheduledTask.aggregate(AggregatScheduledTask.java:54)
at no.nav.person.identhendelse.lager.app.aggregat.AggregatScheduledTask$$FastClassBySpringCGLIB$$7f682c33.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88)
at io.micrometer.core.aop.TimedAspect.timedMethod(TimedAspect.java:77)
at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at no.nav.person.utils.precondition.feature.annotation.PreconditionMethodInterceptor.invoke(PreconditionMethodInterceptor.java:22)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at no.nav.person.identhendelse.lager.app.aggregat.AggregatScheduledTask$$EnhancerBySpringCGLIB$$e0b597f7.aggregate(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
添加示例代码: https ://github.com/Lg87/kafka-transaction-example 查看 readme.md 和 FIND KafkaException 以查看发生的异常。
解决方案
提出此类问题时,请始终提供版本信息。
显示您的代码和完整的堆栈跟踪。
您提到
transactionTemplate
-不要使用模板以及executeInTransaction
-它们是多余的,因为它们都启动了事务。我们最近修复了此类“嵌套”事务被破坏的问题。
编辑
我发现了问题;使用时producerPerConsumerPartition
(默认为 true),容器使用的生产者不应添加到缓存中以供任意KafkaTemplate
操作使用。
作为一种变通方法,DefaultKafkaProducerFactory
对独立模板操作使用不同的。
推荐阅读
- python - 如何制作 python API 来 ping 一个 IP 地址?
- android - 在 AndroidManifest.xml 展示裸工作流配置更改
- reactjs - 使用 jsPdf 和 html-to-image 生成的 React pdf 无法正常工作
- laravel - 传递给 Common\Auth\Controllers\LoginController::authenticated() 的参数 2 必须是 App\User 的实例,给定 null,在 C:\Users\dell 中调用
- php - 更新后 Docker 应用程序无法连接到数据库
- powerbi - Power BI 循环关系
- console - 如何解决 composer.json 验证的 name 属性中的这个错误?
- python - 为具有多个模块的程序创建 docker 文件
- gcc - gcc 链接器,如何声明 HEAP 和 STACK 之间的区域,将变量放在那里以检测溢出(Cortex M3 上的硬故障问题)?
- azure - 允许 IP 地址通过 SQL Server 防火墙