Unable to configure transactions with Camel-Kafka in a Spring Boot application

Problem description

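I am building a Spring Boot application that consumes messages from a Kafka topic and, after processing them, writes the results back to another topic. I am using Apache Camel for the integration.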

My route looks like this:
    onException(IllegalArgumentException.class).maximumRedeliveries(4);

    from("kafka:CDC-GEXPUAT-CUSTOMER")
        .id("CamelRouteCustomer_1")
        .transacted()
        .choice()
            .when(simple("${body} contains 'GEXPUAT.CUSTOMER'"))
                .unmarshal().json(JsonLibrary.Jackson, CustomerWrapper.class)
                .process(customerProcessor)
            .otherwise()
                .log("${body}")
        .end()
        .to("seda:aggregate_1");

When I use .transacted() in the route, I get the following error: org.apache.camel.NoSuchBeanException: No bean can be found in the registry of type: PlatformTransactionManager

So I then tried to create a configuration class that defines the transaction manager:

@Configuration
public class CommonBean {

    @Bean
    SpringTransactionPolicy springTransactionPolicy() throws Exception {
        SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
        txRequired.setTransactionManager(transactionManager());
        txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return txRequired;
    }

    @Bean
    public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
        // kafkaConfigs() and transactionIdPrefix are not shown in the question
        DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(kafkaConfigs());
        // setting a transaction id prefix enables transactions on the producer factory
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }

    @Bean
    @Primary
    public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager());
    }

    @Bean
    public PlatformTransactionManager kafkaTransactionManager() {
        // call the producerFactory() bean method rather than referencing an undeclared field
        KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager =
                new KafkaTransactionManager<>(producerFactory());
        kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        return kafkaTransactionManager;
    }

}

But now I get compilation errors because the following classes cannot be found:

  1. SpringTransactionPolicy
  2. DefaultKafkaProducerFactory
  3. ChainedKafkaTransactionManager
  4. KafkaTransactionManager

I am not sure which dependencies I need to add to pom.xml so that I can configure a KafkaTransactionManager in my Camel Spring Boot project.

My pom.xml currently looks like this (I have commented out some parts of the file):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
        <!-- <version>2.3.3.RELEASE</version> -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.surajit.camel</groupId>
    <artifactId>camel-microservice-a</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>CamelProject</name>
    <description>Camel project for Spring Boot</description>
    <properties>
        <java.version>8</java.version>
        <camel.version>3.7.0</camel.version>
        <spring-boot.version>2.3.3.RELEASE</spring-boot.version>
        <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    </properties>
    <dependencies>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-starter</artifactId>
            <version>${camel.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-activemq-starter</artifactId>
            <version>${camel.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-kafka-starter</artifactId>
            <version>${camel.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-jackson-starter</artifactId>
            <version>${camel.version}</version>
        </dependency>
        
    <!--    <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.0.2</version>
        </dependency>
        
        <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>java-driver-core</artifactId>
            <version>4.2.1</version>
        </dependency> -->

    <!--    <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-dependencies</artifactId>
          <version>${spring-boot.version}</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency> -->
        
    <!--     <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency> -->
        

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Tags: java, spring-boot, apache-kafka, apache-camel

Solution


I was able to solve the problem by adding the following to pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- <version>2.2.14.RELEASE</version> -->
    <version>2.7.0</version>
</dependency>
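
One way to confirm that a dependency change has actually put the previously missing classes on the classpath is a small check like the sketch below. The class name ClasspathCheck and the helper missing(...) are hypothetical, and the fully qualified names are my understanding of where these classes live in spring-kafka and camel-spring; they are assumptions and do not come from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ClasspathCheck {

    // Assumed fully qualified names of the classes the compiler could not find.
    static final List<String> REQUIRED = Arrays.asList(
            "org.apache.camel.spring.spi.SpringTransactionPolicy",
            "org.springframework.kafka.core.DefaultKafkaProducerFactory",
            "org.springframework.kafka.transaction.ChainedKafkaTransactionManager",
            "org.springframework.kafka.transaction.KafkaTransactionManager");

    // Returns the subset of classNames that cannot be loaded from the current classpath.
    static List<String> missing(List<String> classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize=false: only look the class up, do not run static initializers
                Class.forName(name, false, ClasspathCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("Missing classes: " + missing(REQUIRED));
    }
}
```

Run from inside the application (for example from a test), an empty list would suggest that all four classes are resolvable. Any name still listed points at an artifact that is not yet on the classpath.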

I then removed the custom CommonBean transaction configuration class.

I added the following to application.properties:
spring.kafka.producer.transaction-id-prefix="producer"
spring.kafka.producer.bootstrap-servers=xxx.xxx.xxx.xxx:9092
spring.kafka.jaas.enabled=false
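
A note on the quoted value above: in the standard .properties format, quotation marks are part of the value and are not stripped. The sketch below shows this with java.util.Properties; the class PropertiesQuoteDemo is hypothetical. I am assuming Spring Boot's own properties loader treats quotes the same way, in which case the prefix used here would literally be "producer" including the quotes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class PropertiesQuoteDemo {

    // Parses a single .properties line and returns the value stored under key.
    static String load(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "spring.kafka.producer.transaction-id-prefix";
        System.out.println(load(key + "=\"producer\"", key)); // prints "producer" (quotes included)
        System.out.println(load(key + "=producer", key));     // prints producer
    }
}
```

If the quotes are not intended to be part of the transactional id, writing the value without them avoids the ambiguity.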

The route now starts correctly.

