首页 > 解决方案 > 为什么 KafkaTransactionManager 没有应用于这个 Spring Cloud Stream Kafka Producer?

问题描述

我已经配置了一个 Spring Cloud Stream Kafka 应用程序来使用事务(Github 上提供了完整的源代码):

spring:
  application:
    name: message-relay-service
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: message-relay-tx-
            producer:
              configuration:
                retries: 1
                acks: all
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

      bindings:
        output:
          destination: transfer
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081
      schema:
        avro:
          subjectNamingStrategy: org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
  datasource:
    url: jdbc:h2:tcp://localhost:9090/mem:mydb
    driver-class-name: org.h2.Driver
    username: sa
    password:
  jpa:
    hibernate:
      ddl-auto: create
    database-platform: org.hibernate.dialect.H2Dialect

server:
  port: 8085

这个应用程序有一个计划任务,它定期检查记录以使用任务发送到数据库中@Scheduled。此方法带有注释,@Transactional主类定义@EnableTransactionManagement.

但是,在调试代码时,我意识到 KafkaTransactionManager 没有被执行,也就是说,没有 Kafka 事务到位。有什么问题?

@EnableTransactionManagement
@EnableBinding(Source::class)
@EnableScheduling
@SpringBootApplication
class MessageRelayServiceApplication

fun main(args: Array<String>) {
    runApplication<MessageRelayServiceApplication>(*args)
}

---

@Component
class MessageRelay(private val outboxService: OutboxService,
                   private val source: Source) {

    @Transactional
    @Scheduled(fixedDelay = 10000)
    fun checkOutbox() {
        val pending = outboxService.getPending()
        pending.forEach {
            val message = MessageBuilder.withPayload(it.payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
                    .setHeader(MessageHeaders.CONTENT_TYPE, it.contentType)
                    .build()
            source.output().send(message)
            outboxService.markAsProcessed(it.id)
        }
    }

}

标签: springapache-kafkaspring-cloudspring-cloud-stream

解决方案


我没有看到@EnableTransactionManagementaccount-service只有在message-relay-service

无论如何,目前不支持您的方案;事务绑定器是为消费者启动事务的处理器设计的,在消费者线程上发送的任何记录都参与该事务,消费者将偏移量发送到事务然后提交事务。

它不是为仅生产者绑定而设计的;请针对活页夹打开一个 GitHub 问题,因为它应该受到支持。

我不确定为什么您没有看到交易开始,但即使有,问题是@Transactional它将使用 Boot 的自动配置的 KTM(和生产者工厂)并且绑定使用不同的生产者工厂(您的配置中的那个) .

即使交易正在进行中,生产者也不会参与其中。


推荐阅读