首页 > 解决方案 > 无法消费消息Kafka Spring Boot Docker Compose

问题描述

我的代码在使用 IDE 单独运行时运行良好。但是当使用 docker-compose 时,生产者会正确生成消息,我也可以使用 docker CLI composer 来消费消息,但我负责消费消息的 Spring Boot 微服务没有消费。

消费者容器(名称:进程)中没有错误显示,它仅记录以下内容...

request.timeout.ms = 30000

retry.backoff.ms = 100

sasl.client.callback.handler.class = null

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 60000

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.login.callback.handler.class = null

sasl.login.class = null

sasl.login.refresh.buffer.seconds = 300

sasl.login.refresh.min.period.seconds = 60

sasl.login.refresh.window.factor = 0.8

sasl.login.refresh.window.jitter = 0.05

sasl.mechanism = GSSAPI

security.protocol = PLAINTEXT

security.providers = null

send.buffer.bytes = 131072

session.timeout.ms = 10000

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2]

ssl.endpoint.identification.algorithm = https

ssl.engine.factory.class = null

ssl.key.password = null

ssl.keymanager.algorithm = SunX509

ssl.keystore.location = null

ssl.keystore.password = null

ssl.keystore.type = JKS

ssl.protocol = TLSv1.2

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.location = null

ssl.truststore.password = null

ssl.truststore.type = JKS

value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer


2021-04-07 14:58:07.123 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0

2021-04-07 14:58:07.126 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651

2021-04-07 14:58:07.127 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1617807487120

2021-04-07 14:58:07.138 INFO 1 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Subscribed to topic(s): test

2021-04-07 14:58:07.146 INFO 1 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService

2021-04-07 14:58:07.241 INFO 1 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8082 (http) with context path ''

2021-04-07 14:58:07.354 INFO 1 --- [ main] c.h.p.p.ProcessFcmDataServiceApplication : Started ProcessFcmDataServiceApplication in 9.916 seconds (JVM running for 11.719)

2021-04-07 14:58:08.036 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

2021-04-07 14:58:08.037 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

2021-04-07 14:58:08.165 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

2021-04-07 14:58:08.165 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

2021-04-07 14:58:08.316 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

2021-04-07 14:58:08.317 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected 

这是我的 docker-compose.yml

version: "3"
services:
  register:
    container_name: register
    build: register-FCM-token-service/
    networks:
      - push-notification
    ports:
      - "8001:8001"

  send:
    container_name: send
    build: send-FCM-notification-service/
    networks:
      - push-notification
    depends_on:
      - process
    links:
      - kafka:kafka
    environment:
      kafka.boot.server: kafka:9092
    ports:
      - "8083:8083"

  process:
    container_name: process
    build: process-FCM-data-service/
    networks:
      - push-notification
    depends_on:
      - recieve
    links:
      - kafka:kafka
    environment:
      kafka.boot.server: kafka:9092
    ports:
      - "8082:8082"
 
  recieve:
    container_name: recieve
    build: recieve-push-request-service/
    depends_on:
      - kafka
    links:
      - kafka:kafka
    ports:
      - "8080:8080"
    environment:
      kafka.boot.server: kafka:9092
    networks:
      - push-notification 

  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    restart: always
    networks:
      - push-notification 

  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    networks:
      - push-notification
    restart: always
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_AUTO_OFFSET_RESET: 'latest'

networks:
  push-notification:

“进程”容器(消费者)不工作。

尝试使用容器 CLI 时,kafka 消费者在 kafka 容器内工作正常,这是输出...

/ # kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
{"source":"salesforce","userid":"1235aa","messageTitle":"test 1 title","messageBody":"test 1 body"}

这是我的流程微服务(消费者)的 Kafka Config Java 文件...

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;
    
    @Value("${kafka.consumer.group.id}")
    private String kafkaGroupId;
    
    public ProducerFactory<String, PushMessageModel> getProducerFactory(){
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);    //use StringSerializer.class instead for simple string value 
        return new DefaultKafkaProducerFactory<>(configs);
    }
    
    @Bean
    public KafkaTemplate<String, PushMessageModel> getKafkaTemplate(){
        return new KafkaTemplate<>(getProducerFactory());
    }
    
    @Bean
    public ConsumerFactory<String, FCMMessageContainer> getConsumerFactory(){
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        
        return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new JsonDeserializer<>(FCMMessageContainer.class));
    }
    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, FCMMessageContainer>> getKafkaListener(){
        ConcurrentKafkaListenerContainerFactory<String, FCMMessageContainer> listener = new ConcurrentKafkaListenerContainerFactory<>();
        listener.setConsumerFactory(getConsumerFactory());
        listener.setErrorHandler(new KafkaErrHandler());
        return listener;
    }
}

我该如何调试这个问题?如何解决这个问题?我的代码显然没有问题,因为它可以在 IDE 上运行,那么问题出在哪里?

标签: javaspring-bootapache-kafkadocker-composemicroservices

解决方案


推荐阅读