Spring Boot, Kafka: java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V

Problem description

I am using Spring Boot v2.2.4 and Apache Kafka in my project.

Below are the dependencies from my pom.xml file:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>1.4.200</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.github.docker-java</groupId>
            <artifactId>docker-java</artifactId>
            <version>3.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.3</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-annotations</artifactId>
            <version>3.5.6-Final</version>
        </dependency>
 -->
    </dependencies>

Below is the Kafka-related part of my code:

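import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// ServingDetailsEntity is the project's own entity class; its import is omitted here.
@Configuration
public class KafkaConfig {

    // Producer factory for messages whose value is a ServingDetailsEntity, serialized as JSON.
    @Bean
    public ProducerFactory<String, ServingDetailsEntity> producerFactoryServingDetail() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    // Producer factory for String values; the value serializer is still JsonSerializer.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, ServingDetailsEntity> kafkaTemplateItem() {
        return new KafkaTemplate<>(producerFactoryServingDetail());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}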

But when a JSON message is sent to the Kafka topic, I get the following error:

java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V
    at org.springframework.kafka.core.KafkaTemplate.closeProducer(KafkaTemplate.java:382) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:433) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:198) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:570) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:550) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[kafka-clients-0.11.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]

The JSON message does reach the topic, but I would like to understand why I get this error.

Any help is appreciated.

Tags: java, spring-boot, apache-kafka, spring-kafka, kafka-producer-api

Solution


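Yes, as Gary mentioned, if you are using Spring Boot, let it manage the compatible versions. The explicit 0.11.0.0 versions of kafka-clients and kafka_2.11 in your pom.xml override the version Spring Boot would choose, and spring-kafka 2.3.5 calls Producer.close(Duration), which that old client does not have.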
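As a rough sketch of what "let Spring Boot manage the versions" could look like for the Kafka entries of the pom.xml in the question: the fragment below drops the explicit version numbers and the separate kafka_2.11 broker artifact. It assumes the project inherits Spring Boot's dependency management (for example through spring-boot-starter-parent), which the version-less starter entries in the question suggest but do not prove.

```xml
<!-- Sketch only: no <version> elements, so Spring Boot's dependency
     management picks a spring-kafka and kafka-clients pair that match. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- kafka-clients normally arrives transitively through spring-kafka.
     Declare it only if you use it directly, and then without a version. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
```

Running `mvn dependency:tree -Dincludes=org.apache.kafka` afterwards should show which kafka-clients version actually ends up on the classpath.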
You can also find the compatibility matrix here: https://spring.io/projects/spring-kafka
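A NoSuchMethodError of this kind means the calling library was compiled against a method that the jar loaded at runtime does not contain. One way to check what the runtime classpath really offers is a small reflection probe. The sketch below is illustrative only: the class name KafkaClientCheck and the helper hasPublicMethod are hypothetical names, not part of Spring or Kafka, and the probe simply reports false when the class or the method is missing.

```java
import java.time.Duration;

public class KafkaClientCheck {

    /**
     * Returns true if the named class can be loaded and exposes a public
     * method with the given name and parameter types; false otherwise.
     */
    public static boolean hasPublicMethod(String className, String methodName,
                                          Class<?>... parameterTypes) {
        try {
            // initialize=false: load the class without running static initializers
            Class<?> type = Class.forName(className, false,
                    KafkaClientCheck.class.getClassLoader());
            type.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // Class missing, method missing, or class could not be linked
            return false;
        }
    }

    public static void main(String[] args) {
        // The signature from the error message: Producer.close(java.time.Duration)
        boolean available = hasPublicMethod(
                "org.apache.kafka.clients.producer.Producer", "close", Duration.class);
        System.out.println("Producer.close(Duration) available: " + available);
    }
}
```

Run it with the same classpath as the application. If the probe reports false while spring-kafka 2.3.5 is present, the kafka-clients jar is older than the one spring-kafka expects.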

