apache-kafka - RabbitTransactionManager does not roll back in a ChainedTransactionManager when an error occurs
Question
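I am trying to use a single transaction manager (ChainedTransactionManager) for Rabbit and Kafka by chaining a RabbitTransactionManager and a KafkaTransactionManager. The goal is a best-effort one-phase commit.
To test it, the transactional method throws an exception after two operations: sending a message to a Rabbit exchange and publishing an event to Kafka. When the test runs, the logs indicate that a rollback is initiated, but the message still ends up in Rabbit.
Notes:
- We use QPid (version 7.1.12) to emulate RabbitMQ in memory for tests
- We use in-memory Kafka for tests (spring-kafka-test)
- Other relevant frameworks/libraries: spring-cloud-stream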
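To make the intended behaviour concrete, here is a minimal plain-Java sketch of a best-effort one-phase commit. It is not Spring code: the Resource class and the run method are hypothetical stand-ins for the two brokers. It assumes what the ChainedTransactionManager documentation describes, namely that transactions are started in the order the managers are given and completed in reverse order.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a best-effort one-phase commit across two resources. */
final class ChainedTxSketch {

    /** Hypothetical stand-in for a transactional resource such as a broker channel. */
    static final class Resource {
        private final String name;
        private final List<String> log;
        private final List<String> pending = new ArrayList<>();
        private final List<String> published = new ArrayList<>();

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void begin() {
            log.add("begin " + name);
        }

        // Messages are only buffered until the transaction completes.
        void send(String message) {
            pending.add(message);
        }

        void commit() {
            published.addAll(pending);
            pending.clear();
            log.add("commit " + name);
        }

        void rollback() {
            pending.clear();
            log.add("rollback " + name);
        }

        int publishedCount() {
            return published.size();
        }
    }

    /** Begins in chain order, then commits or rolls back in reverse order. */
    static List<String> run(boolean fail) {
        List<String> log = new ArrayList<>();
        Resource kafka = new Resource("kafka", log);
        Resource rabbit = new Resource("rabbit", log);
        List<Resource> chain = List.of(kafka, rabbit);

        chain.forEach(Resource::begin);

        boolean failed = false;
        try {
            rabbit.send("Message to RabbitMQ");
            kafka.send("Message to Kafka");
            if (fail) {
                throw new RuntimeException("We want the previous changes to rollback");
            }
        } catch (RuntimeException e) {
            failed = true;
        }

        // Complete the last-started resource first.
        for (int i = chain.size() - 1; i >= 0; i--) {
            if (failed) {
                chain.get(i).rollback();
            } else {
                chain.get(i).commit();
            }
        }
        log.add("published rabbit=" + rabbit.publishedCount() + " kafka=" + kafka.publishedCount());
        return log;
    }

    public static void main(String[] args) {
        run(true).forEach(System.out::println);
        run(false).forEach(System.out::println);
    }
}
```

In this model a failure in the business code publishes nothing to either resource, which is the outcome the test expects. The sketch does not cover the remaining window of a one-phase commit, where the second commit fails after the first has already succeeded.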
This is the method where the problem occurs:
@Transactional
public void processMessageAndEvent() {
    Message<String> message = MessageBuilder
            .withPayload("Message to RabbitMQ")
            .build();
    outputToRabbitMQExchange.output().send(message);
    outputToKafkaTopic.output().send(
            withPayload("Message to Kafka")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "Kafka message key")
                    .build()
    );
    throw new RuntimeException("We want the previous changes to rollback");
}
This is the main Spring Boot application configuration:
@SpringBootApplication
@EnableTransactionManagement
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}
This is the TransactionManager configuration:
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
}

@Bean(name = "transactionManager")
@Primary
public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager<>(ktm, rtm);
}
Finally, this is the relevant configuration from the application.yml file:
spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          output_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        bindings:
          sink_outputToKafkaTopic:
            producer:
              transacted: true
        binder:
          brokers: ${...kafka.hostname}
          transaction:
            transaction-id-prefix: ${CF_INSTANCE_INDEX}.${spring.application.name}.T
      default-binder: rabbit
  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
      retries: 1
      acks: all
    bootstrap-servers: ${...kafka.hostname}
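When we execute the method, the message is still in Rabbit even though the logs show that the transaction is being rolled back.
Is there anything we might be missing or misunderstanding?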
Answer
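@EnableBinding is deprecated in favor of the newer functional programming model.
That said, I copied your code/config almost as-is (transacted is not a Kafka producer binding property) and it works fine for me (Boot 2.4.5, Cloud 2020.0.2)...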
@SpringBootApplication
@EnableTransactionManagement
@EnableBinding(Bindings.class)
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
                MessageChannel.class))
                        .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

interface Bindings {

    @Output("source_outputToRabbitMQExchange")
    MessageChannel rabbitOut();

    @Output("sink_outputToKafkaTopic")
    MessageChannel kafkaOut();

}

@Component
class Foo {

    @Autowired
    Bindings bindings;

    @Transactional
    public void send(String in) {
        bindings.rabbitOut().send(MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bindings.kafkaOut().send(MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}
spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          source_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: foo.${spring.application.name}.T
      default-binder: rabbit
  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
      retries: 1
      acks: all
    bootstrap-servers: localhost:9092
logging:
  level:
    org.springframework.transaction: debug
    org.springframework.kafka: debug
    org.springframework.amqp.rabbit: debug
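One difference from the configuration in the question is worth noting. There, the Rabbit producer properties are keyed by output_outputToRabbitMQExchange, while the binding itself is named source_outputToRabbitMQExchange; here both use the same name. Binder-specific properties are looked up by binding name, so a mismatched key would leave transacted: true unapplied to the binding. Whether this is what caused the reported behaviour is not confirmed in this thread. A minimal plain-Java sketch of such a name check follows; the unmatched helper is hypothetical and is not part of Spring Cloud Stream.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Flags binder-specific binding keys that match no declared binding name. */
final class BindingNameCheck {

    /**
     * Hypothetical helper: returns the keys found under a binder-specific
     * "bindings" section that do not appear under spring.cloud.stream.bindings.
     */
    static Set<String> unmatched(List<String> declaredBindings, List<String> binderSpecificBindings) {
        Set<String> result = new LinkedHashSet<>(binderSpecificBindings);
        result.removeAll(declaredBindings);
        return result;
    }

    public static void main(String[] args) {
        List<String> declared = List.of("source_outputToRabbitMQExchange", "sink_outputToKafkaTopic");

        // Key used under spring.cloud.stream.rabbit.bindings in the question.
        System.out.println(unmatched(declared, List.of("output_outputToRabbitMQExchange")));

        // Key used under spring.cloud.stream.rabbit.bindings in this answer.
        System.out.println(unmatched(declared, List.of("source_outputToRabbitMQExchange")));
    }
}
```

A non-empty result for the first call and an empty one for the second would point at the key in the question's configuration as the place to look first.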
2021-04-28 09:35:32.488 DEBUG 53253 --- [ main] o.s.a.r.t.RabbitTransactionManager : Initiating transaction rollback
2021-04-28 09:35:32.489 DEBUG 53253 --- [ main] o.s.a.r.connection.RabbitResourceHolder : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2), conn: Proxy@3c770db4 Shared Rabbit Connection: SimpleConnection@1f736d00 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 63439]
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.a.r.t.RabbitTransactionManager : Resuming suspended transaction after completion of inner transaction
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38e83838] abortTransaction()
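There are no messages in the queue that I bound to the exchange with routing key #.
What version are you using?
EDIT
Here is the equivalent application with the deprecations removed, using the functional model and StreamBridge (same YAML):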
@SpringBootApplication
@EnableTransactionManagement
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
                MessageChannel.class))
                        .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

@Component
class Foo {

    @Autowired
    StreamBridge bridge;

    @Transactional
    public void send(String in) {
        bridge.send("source_outputToRabbitMQExchange", MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bridge.send("sink_outputToKafkaTopic", MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}