java - 无法使用 Spingboot 应用程序配置带有 Camel-Kafka 的事务
问题描述
我目前正在创建一个 Springboot 应用程序,该应用程序将使用来自 Kafka 主题的消息,并在处理后写回另一个主题。我正在使用 CAMEL 进行集成。
My Route looks like this :
onException(IllegalArgumentException.class).maximumRedeliveries(4);
from("kafka:CDC-GEXPUAT-CUSTOMER")
.id("CamelRouteCustomer_1")
.**transacted**()
.choice()
.when(simple("${body} contains 'GEXPUAT.CUSTOMER'" ))
.unmarshal().json(JsonLibrary.Jackson, CustomerWrapper.class)
.process(customerProcessor)
.otherwise()
.log("${body}")
.end()
.to("seda:aggregate_1");
当我在路由中使用 .transacted() 时,出现以下错误: org.apache.camel.NoSuchBeanException: No bean can be found in the registry of type: PlatformTransactionManager
所以我现在尝试创建一个配置类来定义 TransactionManager
@Configuration
public class CommonBean {
@Bean
SpringTransactionPolicy springTransactionPolicy() throws Exception {
SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
txRequired.setTransactionManager(transactionManager());
txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return txRequired;
}
@Bean
public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<byte[], byte[]>(
kafkaConfigs());
// enable transaction manager
defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
return defaultKafkaProducerFactory;
}
@Bean
@Primary
public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
return new ChainedKafkaTransactionManager<>(kafkaTransactionManager());
}
@Bean
public PlatformTransactionManager kafkaTransactionManager() {
KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager = new KafkaTransactionManager<>(producerFactory);
kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
kafkaTransactionManager.setRollbackOnCommitFailure(true);
return kafkaTransactionManager;
}
}
但是现在我收到编译错误并且找不到类:
- SpringTransactionPolicy
- 默认KafkaProducerFactory
- 链式KafkaTransactionManager
- KafkaTransactionManager
我不确定需要在 pom.xml 中添加什么依赖项,以便我可以在 Camel Spingboot 项目中配置我的 KafkaTransactionManager
目前 POM.XML 看起来像这样
我已经评论了 xml 文件的某些部分
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<!-- <version>2.3.3.RELEASE</version> -->
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.surajit.camel</groupId>
<artifactId>camel-microservice-a</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>CamelProject</name>
<description>Camel project for Spring Boot</description>
<properties>
<java.version>8</java.version>
<camel.version>3.7.0</camel.version>
<spring-boot.version>2.3.3.RELEASE</spring-boot.version>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-activemq-starter</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-kafka-starter</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-jackson-starter</artifactId>
<version>${camel.version}</version>
</dependency>
<!-- <dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.2.1</version>
</dependency> -->
<!-- <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency> -->
<!-- <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency> -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
解决方案
我可以通过在 pom.xml 中添加以下内容来解决问题
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!-- <version>2.2.14.RELEASE</version> -->
<version>2.7.0</version>
</dependency>
然后我删除了CommonBean 自定义事务 Bean 类
Added the following in the application.properties
spring.kafka.producer.transaction-id-prefix="producer"
spring.kafka.producer.bootstrap-servers=xxx.xxx.xxx.xxx:9092
spring.kafka.jaas.enabled=false
现在按路线正确开始
推荐阅读
- java - 如何使用 Jackson 将 JSON 数组中的嵌套值解析为列表
- telegram-bot - Telegram-BOT 用户 ID 只是一个私人聊天 ID?
- python - 如何在 python 中修复 datetime TypeError
- android - Android Room:样板代码
- python - Postgresql:使用时间条件插入新记录时更新旧记录
- mysql - 计算二年级复数结果时的问题
- javascript - 使用 Array.Sort 而不使用数组中的两个值
- android - FileProvider.getUriForFile:检查使用的权限
- c# - 在 XML 反序列化中将 XML 节点的值设置为 C# 模型上的字段
- kubernetes - 如何在 gke 中将 kube-controller-manager 中的 --horizontal-pod-autoscaler-sync-period 字段更改为 5sec