spring - 为什么 KafkaTransactionManager 没有应用于这个 Spring Cloud Stream Kafka Producer?
问题描述
我已经配置了一个 Spring Cloud Stream Kafka 应用程序来使用事务(Github 上提供了完整的源代码):
spring:
application:
name: message-relay-service
cloud:
stream:
kafka:
binder:
transaction:
transaction-id-prefix: message-relay-tx-
producer:
configuration:
retries: 1
acks: all
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
bindings:
output:
destination: transfer
contentType: application/*+avro
schema-registry-client:
endpoint: http://localhost:8081
schema:
avro:
subjectNamingStrategy: org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
datasource:
url: jdbc:h2:tcp://localhost:9090/mem:mydb
driver-class-name: org.h2.Driver
username: sa
password:
jpa:
hibernate:
ddl-auto: create
database-platform: org.hibernate.dialect.H2Dialect
server:
port: 8085
这个应用程序有一个计划任务,它定期检查记录以使用任务发送到数据库中@Scheduled
。此方法带有注释,@Transactional
主类定义@EnableTransactionManagement
.
但是,在调试代码时,我意识到 KafkaTransactionManager 没有被执行,也就是说,没有 Kafka 事务到位。有什么问题?
@EnableTransactionManagement
@EnableBinding(Source::class)
@EnableScheduling
@SpringBootApplication
class MessageRelayServiceApplication
fun main(args: Array<String>) {
runApplication<MessageRelayServiceApplication>(*args)
}
---
@Component
class MessageRelay(private val outboxService: OutboxService,
private val source: Source) {
@Transactional
@Scheduled(fixedDelay = 10000)
fun checkOutbox() {
val pending = outboxService.getPending()
pending.forEach {
val message = MessageBuilder.withPayload(it.payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
.setHeader(MessageHeaders.CONTENT_TYPE, it.contentType)
.build()
source.output().send(message)
outboxService.markAsProcessed(it.id)
}
}
}
解决方案
我没有看到@EnableTransactionManagement
,account-service
只有在message-relay-service
。
无论如何,目前不支持您的方案;事务绑定器是为消费者启动事务的处理器设计的,在消费者线程上发送的任何记录都参与该事务,消费者将偏移量发送到事务然后提交事务。
它不是为仅生产者绑定而设计的;请针对活页夹打开一个 GitHub 问题,因为它应该受到支持。
我不确定为什么您没有看到交易开始,但即使有,问题是@Transactional
它将使用 Boot 的自动配置的 KTM(和生产者工厂)并且绑定使用不同的生产者工厂(您的配置中的那个) .
即使交易正在进行中,生产者也不会参与其中。
推荐阅读
- android - 如何使用firebase MLkit检测文本识别中的多种语言?
- azure - 将 API 管理设置为 VNET 内部的外部时,我可以在内部路由流量吗?
- python - 有没有更有效的方法来制作这个程序?
- javascript - 如何在服务人员中缓存动态 url?它与预缓存有什么关系吗?我正在使用 ReactJS 默认 serviceWorker
- ruby-on-rails - 未定义的局部变量或方法“sub_task”
- mongodb - 使用地图从字符串更改日期
- sql - postgres 无法按时间和日期获取查询过滤器,即
- visual-studio - 获取最新版本 - TFS 中缺少文件但在本地文件夹中可用
- regex - 正则表达式在特定字符之前将文本复制到另一个
- python - django,按一个字段分组,只取每组的latest/max,取回ORM对象