RabbitTransactionManager does not roll back in a ChainedTransactionManager when an error occurs

Problem description

I am trying to use a single transaction manager (ChainedTransactionManager) for Rabbit and Kafka, chaining a RabbitTransactionManager and a KafkaTransactionManager. The intent is to implement best-effort one-phase commit.

To test it, the transactional method throws an exception after performing two operations (sending a message to a Rabbit exchange and publishing an event to Kafka). When the test runs, the logs indicate that a rollback is initiated, but the message still ends up in Rabbit.

This is the method where the problem occurs:

@Transactional
public void processMessageAndEvent() {
    Message<String> message = MessageBuilder
            .withPayload("Message to RabbitMQ")
            .build();
    outputToRabbitMQExchange.output().send(message);

    outputToKafkaTopic.output().send(MessageBuilder
            .withPayload("Message to Kafka")
            .setHeader(KafkaHeaders.MESSAGE_KEY, "Kafka message key")
            .build()
    );

    throw new RuntimeException("We want the previous changes to rollback");
}
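For context, the test calls this method from another bean so that the call goes through Spring's transactional proxy (a self-invocation would bypass the @Transactional advice). A minimal sketch of such a caller, assuming the method above lives in a bean named MessageService (a hypothetical name), declared inside a @Configuration class:

@Bean
public ApplicationRunner testRunner(MessageService messageService) {
    // MessageService is an assumed name for the bean declaring processMessageAndEvent();
    // the call must go through the proxy for @Transactional to start (and roll back)
    // the chained Rabbit/Kafka transaction.
    return args -> {
        try {
            messageService.processMessageAndEvent();
        }
        catch (RuntimeException expected) {
            // expected: both sends should now be rolled back
        }
    };
}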

This is the main Spring Boot application configuration:

@SpringBootApplication
@EnableTransactionManagement
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

This is the TransactionManager configuration:

@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
}

@Bean(name = "transactionManager")
@Primary
public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager<>(ktm, rtm);
}

Finally, here is the relevant configuration from the application.yml file:

spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          output_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        bindings:
          sink_outputToKafkaTopic:
            producer:
              transacted: true
        binder:
          brokers: ${...kafka.hostname}
          transaction:
            transaction-id-prefix: ${CF_INSTANCE_INDEX}.${spring.application.name}.T
      default-binder: rabbit

  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: ${...kafka.hostname}

When we execute the method, we can see that the message is still in Rabbit, even though the logs show the transaction being rolled back.

Is there anything we might be missing or misunderstanding?

Tags: apache-kafka, rabbitmq, spring-kafka, spring-cloud-stream, spring-rabbit

Solution


@EnableBinding is deprecated in favor of the newer functional programming model (see the StreamBridge-based equivalent under EDIT below).

That said, I copied your code/configuration pretty much as-is (transacted is not a Kafka producer binding property) and it works fine for me (Boot 2.4.5, Cloud 2020.0.2)...

@SpringBootApplication
@EnableTransactionManagement
@EnableBinding(Bindings.class)
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
                MessageChannel.class))
                        .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

interface Bindings {

    @Output("source_outputToRabbitMQExchange")
    MessageChannel rabbitOut();

    @Output("sink_outputToKafkaTopic")
    MessageChannel kafkaOut();

}

@Component
class Foo {

    @Autowired
    Bindings bindings;

    @Transactional
    public void send(String in) {
        bindings.rabbitOut().send(MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bindings.kafkaOut().send(MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}

spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          source_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: foo.${spring.application.name}.T
      default-binder: rabbit

  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: localhost:9092

logging:
  level:
    org.springframework.transaction: debug
    org.springframework.kafka: debug
    org.springframework.amqp.rabbit: debug

2021-04-28 09:35:32.488 DEBUG 53253 --- [           main] o.s.a.r.t.RabbitTransactionManager       : Initiating transaction rollback
2021-04-28 09:35:32.489 DEBUG 53253 --- [           main] o.s.a.r.connection.RabbitResourceHolder  : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@3c770db4 Shared Rabbit Connection: SimpleConnection@1f736d00 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 63439]
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.a.r.t.RabbitTransactionManager       : Resuming suspended transaction after completion of inner transaction
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38e83838] abortTransaction()

There is no message in the queue that I bound to the exchange with the routing key.
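For completeness, a minimal sketch of how that check can be done programmatically with RabbitTemplate. The queue name "test.queue" is an assumption standing in for whatever queue is bound to the destination exchange with routing key "test" (the value the sample above puts in the myKey header):

@Bean
public ApplicationRunner verifyRollback(RabbitTemplate rabbitTemplate) {
    // Assumed queue "test.queue" bound to the exchange with routing key "test":
    // after the rolled-back send, receive() should return null, i.e. the queue stays empty.
    return args -> {
        org.springframework.amqp.core.Message msg = rabbitTemplate.receive("test.queue");
        System.out.println("Message in queue after rollback: " + msg); // expected: null
    };
}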

What versions are you using?

EDIT

Here is the equivalent application with the deprecations removed, using the functional model and StreamBridge (same yaml):

@SpringBootApplication
@EnableTransactionManagement
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
                MessageChannel.class))
                        .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

@Component
class Foo {

    @Autowired
    StreamBridge bridge;

    @Transactional
    public void send(String in) {
        bridge.send("source_outputToRabbitMQExchange", MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bridge.send("sink_outputToKafkaTopic", MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}
