首页 > 解决方案 > Spring Boot - 发布者无法在 Kafka 上发布消息

问题描述

在 Windows 上,我安装了 Ubuntu 并设置了 docker,然后设置了 Apache Kafka。我使用 Spring Boot 代码开发了一个生产者和消费者。

生产者 - 属性文件

spring.kafka.producer.bootstrap-servers=localhost:9092

消费者 - 属性文件

# Spring Boot Kafka client settings: one consumer section, one producer section.
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group-id
      # Start from the oldest available record when the group has no committed offset.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      # A producer serializes outgoing records, so its properties are
      # key-serializer / value-serializer with Serializer classes
      # (the *-deserializer keys belong to the consumer section only).
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

错误 -

2021-09-26 23:23:15.809  INFO 10568 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-09-26 23:23:15.811  INFO 10568 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-09-26 23:23:15.811  INFO 10568 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1632678795806
2021-09-26 23:23:36.826  WARN 10568 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-09-26 23:23:57.929  WARN 10568 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-09-26 23:24:15.853 ERROR 10568 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='Hello Timotius 0.4861729042438717' to topic t_hello:

org.apache.kafka.common.errors.TimeoutException: Topic t_hello not present in metadata after 60000 ms.

2021-09-26 23:24:15.868  INFO 10568 --- [           main] ConditionEvaluationReportLoggingListener :

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-09-26 23:24:15.911 ERROR 10568 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute CommandLineRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:807) ~[spring-boot-2.4.2.jar:2.4.2]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:788) ~[spring-boot-2.4.2.jar:2.4.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:333) ~[spring-boot-2.4.2.jar:2.4.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1311) ~[spring-boot-2.4.2.jar:2.4.2]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1300) ~[spring-boot-2.4.2.jar:2.4.2]
    at com.course.kafkaproducer.KafkaProducerApplication.main(KafkaProducerApplication.java:16) ~[main/:na]
Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic t_hello not present in metadata after 60000 ms.
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:574) ~[spring-kafka-2.6.5.jar:2.6.5]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363) ~[spring-kafka-2.6.5.jar:2.6.5]
    at com.course.kafkaproducer.producer.HelloKafkaProducer.sendHello(HelloKafkaProducer.java:14) ~[main/:na]
    at com.course.kafkaproducer.KafkaProducerApplication.run(KafkaProducerApplication.java:21) ~[main/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:804) ~[spring-boot-2.4.2.jar:2.4.2]
    ... 5 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic t_hello not present in metadata after 60000 ms.

2021-09-26 23:24:15.917  INFO 10568 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2021-09-26 23:24:19.091  WARN 10568 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

docker-compose.yml

# Single-broker Kafka stack: one ZooKeeper container and one Kafka container
# attached to a shared bridge network.
version: "3.7"

networks:
  kafka-net:
    name: kafka-net
    driver: bridge

services:
  zookeeper:
    image: zookeeper:3.7.0
    container_name: zookeeper
    restart: always
    networks:
      - kafka-net
    ports:
      - "2181:2181"
    volumes:
      - ./docker-data/zookeeper/data:/data
      - ./docker-data/zookeeper/datalog:/datalog

  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka
    restart: always
    networks:
      - kafka-net
    ports:
      # Publishes the DOCKER_EXTERNAL listener port on the Docker host.
      - "9092:9092"
    volumes:
      # NOTE(review): the source is a path under ./docker-data, not the host's
      # /var/run/docker.sock - confirm this is intended.
      - ./docker-data/var/run/docker.sock:/var/run/docker.sock
      - ./docker-data/kafka:/kafka
    environment:
      # Two plaintext listeners: DOCKER_INTERNAL (29092) is advertised as
      # kafka:29092; DOCKER_EXTERNAL (9092) is advertised as
      # DOCKER_HOST_IP, falling back to 127.0.0.1 when that variable is unset.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      # Environment values are quoted so they are passed as strings.
      KAFKA_BROKER_ID: "1"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
    depends_on:
      - zookeeper

标签: spring-boot, apache-kafka

解决方案


The topic t_hello doesn't exist; that's why you get the error. You have two options:

  1. Configure docker-compose to create the topic ( docs ).

     environment:
       # Format is name:partitions:replicas. With a single broker the
       # replication factor cannot exceed 1.
       KAFKA_CREATE_TOPICS: "t_hello:1:1"
    
  2. Or create it programmatically from your code. I don't have an example at hand, but you'll find plenty of them using the class KafkaAdminClient.


推荐阅读