spring-boot - ChainedTransactionManager 在不同的线程中调用 AfterCompletion
问题描述
我正在使用ChainedTransactionManager
withKafkaTransactionManager
和JpaTransactionManager
。我正在使用一个线程局部变量来保存一个 id。
我有这样的流程:
HTTP 调用 => HTTP 过滤器(设置 TL var)=> 发送 Spring 事件的逻辑 => TransactionalEventListener(phase = BeforeCommit) 接收它并将其发送到 kafka(使用拦截器将 TL 添加为事件标头并清理 TL)
这就是应该发生的事情,但这就是正在发生的事情。
... => 发送 Spring 事件的逻辑 =>另一个事件占用线程,完成,清理 TL => 返回事件侦听器,接收事件,但此时 TL 已清除,kafka 发布失败.
所有这些都发生在同一个线程中。尽管 Spring Events 应该是同步的(除了您使用@Async
,我没有使用),但是TransactionalEventListener
在另一个线程中调用了 。
我所说的“另一个事件占用线程”的意思是当前消费者线程完成(尽管它并没有真正完成,因为仍然缺少侦听器代码),并且另一个消费者使用同一个线程来消费另一个事件。当这最后一个事件被消费时,该线程返回到 TxnEventListener 并清除 TL。
这是异常堆栈跟踪:
at org.apache.kafka.clients.producer.internals.ProducerInterceptors.onSend(ProducerInterceptors.java:61)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:855)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:569)
at io.opentracing.contrib.kafka.TracingKafkaProducer.send(TracingKafkaProducer.java:116)
at io.opentracing.contrib.kafka.TracingKafkaProducer.send(TracingKafkaProducer.java:97)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:404)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:222)
at org.springframework.kafka.core.KafkaTemplate.sendDefault(KafkaTemplate.java:200)
at revisions.messaging.publisher.implementation.RevisionsKafkaPublisher.sendDefault(RevisionsKafkaPublisher.kt:8)
at revisions.messaging.publisher.implementation.RevisionsEventsPublisher.handle(RevisionsEventsPublisher.kt:17)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:300)
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:190)
at org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter$TransactionSynchronizationEventAdapter.processEvent(ApplicationListenerMethodTransactionalAdapter.java:129)
at org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter$TransactionSynchronizationEventAdapter.afterCompletion(ApplicationListenerMethodTransactionalAdapter.java:118)
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:171)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:990)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:965)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:786)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:712)
at org.springframework.data.transaction.MultiTransactionStatus.commit(MultiTransactionStatus.java:74)
at org.springframework.data.transaction.ChainedTransactionManager.commit(ChainedTransactionManager.java:150)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:152)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1418)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1398)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
有没有办法避免AfterCompletion
在另一个线程上调用?事件处理是同步的,没有@Async
或类似的。我不需要多个线程来消费请求/事件。
我认为事务同步和 ChainedTransactionManager 有问题,但我不知道是什么。
谢谢!
解决方案
推荐阅读
- javascript - 如何选中和取消选中复选框 onload 功能?
- python - 在 Python 中绘制 RidgeClassifier 的 ROC 曲线
- curl - 使用 ansible-playbook 安装 Gradel 5.4.1
- python - 我是编码本身的新手,所以我想知道如何解决这个问题
- flutter - 按下退格键时如何删除TextFormField中的连续删除文本?
- ubuntu-18.04 - 安装 timescaledb 失败
- xamarin.android - 带有 Transactor 的华为 HMS MLLocalTextAnalyzer 不从相机流返回任何结果(Xamarin)
- python - 如何在龙卷风集成中禁用 INFO jaeger 日志记录条目?
- mongodb - Meteor:按全名查找(多字段)
- c++ - C++ 链表插入节点,前一个节点初始化