java - 使用 Docker Compose 运行时,Spring Boot 微服务无法消费 Kafka 消息
问题描述
我的代码在 IDE 中单独运行时一切正常。但是使用 docker-compose 运行时,生产者能够正确发送消息,我也可以在容器的命令行(CLI)中用控制台消费者消费到这些消息,但负责消费消息的 Spring Boot 微服务却没有消费到任何消息。
消费者容器(名称:process)中没有显示任何错误,它只记录了以下内容...
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2021-04-07 14:58:07.123 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-04-07 14:58:07.126 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-04-07 14:58:07.127 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1617807487120
2021-04-07 14:58:07.138 INFO 1 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Subscribed to topic(s): test
2021-04-07 14:58:07.146 INFO 1 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2021-04-07 14:58:07.241 INFO 1 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8082 (http) with context path ''
2021-04-07 14:58:07.354 INFO 1 --- [ main] c.h.p.p.ProcessFcmDataServiceApplication : Started ProcessFcmDataServiceApplication in 9.916 seconds (JVM running for 11.719)
2021-04-07 14:58:08.036 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-04-07 14:58:08.037 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-04-07 14:58:08.165 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-04-07 14:58:08.165 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2021-04-07 14:58:08.316 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2021-04-07 14:58:08.317 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-testgroup-1, groupId=testgroup] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
这是我的 docker-compose.yml
# Compose stack: four locally built services plus ZooKeeper and Kafka,
# all attached to the single user-defined network "push-notification".
version: "3"
services:
  register:
    container_name: register
    build: register-FCM-token-service/
    networks:
      - push-notification
    ports:
      - "8001:8001"
  send:
    container_name: send
    build: send-FCM-notification-service/
    networks:
      - push-notification
    depends_on:
      - process
    links:
      - kafka:kafka
    environment:
      # Broker address handed to the service as the kafka.boot.server property.
      kafka.boot.server: kafka:9092
    ports:
      - "8083:8083"
  process:
    container_name: process
    build: process-FCM-data-service/
    networks:
      - push-notification
    depends_on:
      - recieve
    links:
      - kafka:kafka
    environment:
      # Broker address handed to the service as the kafka.boot.server property.
      kafka.boot.server: kafka:9092
    ports:
      - "8082:8082"
  recieve:
    container_name: recieve
    build: recieve-push-request-service/
    depends_on:
      - kafka
    links:
      - kafka:kafka
    ports:
      - "8080:8080"
    environment:
      # Broker address handed to the service as the kafka.boot.server property.
      kafka.boot.server: kafka:9092
    networks:
      - push-notification
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    restart: always
    networks:
      - push-notification
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    networks:
      - push-notification
    restart: always
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      ALLOW_PLAINTEXT_LISTENER: "yes"
      # NOTE(review): PLAINTEXT_HOST is mapped here, but no listener with that
      # name is advertised below - confirm whether it is still needed.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Clients are told to reach the broker at kafka:9092, i.e. by the service
      # name on the shared network, not by localhost.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_AUTO_OFFSET_RESET: 'latest'
networks:
  push-notification:
“process”容器(消费者)无法正常工作。
尝试使用容器 CLI 时,kafka 消费者在 kafka 容器内工作正常,这是输出...
/ # kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
{"source":"salesforce","userid":"1235aa","messageTitle":"test 1 title","messageBody":"test 1 body"}
这是我的 process 微服务(消费者)的 Kafka 配置 Java 文件...
/**
 * Kafka producer and consumer wiring for this service.
 *
 * <p>The broker address and the consumer group id are injected from the
 * {@code kafka.boot.server} and {@code kafka.consumer.group.id} properties, so
 * the same build can point at different brokers per environment.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Bootstrap server list, taken from the {@code kafka.boot.server} property. */
    @Value("${kafka.boot.server}")
    private String kafkaServer;

    /** Consumer group id, taken from the {@code kafka.consumer.group.id} property. */
    @Value("${kafka.consumer.group.id}")
    private String kafkaGroupId;

    /**
     * Builds the producer factory used by {@link #getKafkaTemplate()}.
     *
     * @return a factory producing String-keyed, JSON-serialized {@code PushMessageModel} records
     */
    public ProducerFactory<String, PushMessageModel> getProducerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); //use StringSerializer.class instead for simple string value
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * Exposes a {@code KafkaTemplate} backed by {@link #getProducerFactory()}.
     *
     * @return the template used to publish {@code PushMessageModel} messages
     */
    @Bean
    public KafkaTemplate<String, PushMessageModel> getKafkaTemplate() {
        return new KafkaTemplate<>(getProducerFactory());
    }

    /**
     * Builds the consumer factory bound to the configured broker and group id.
     *
     * <p>Deserializer instances are also passed to the factory constructor, with the
     * value deserializer targeting {@code FCMMessageContainer}.
     *
     * @return a factory consuming String-keyed {@code FCMMessageContainer} records
     */
    @Bean
    public ConsumerFactory<String, FCMMessageContainer> getConsumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(
                configs, new StringDeserializer(), new JsonDeserializer<>(FCMMessageContainer.class));
    }

    /**
     * Listener container factory that uses {@link #getConsumerFactory()} and the
     * project's {@code KafkaErrHandler}.
     *
     * <p>The bean is registered under {@code kafkaListenerContainerFactory}, the name
     * {@code @KafkaListener} looks up when no {@code containerFactory} attribute is
     * given. With only the method-derived name, such listeners would be served by a
     * different, default-configured factory and would ignore the broker address
     * configured here. The previous name {@code getKafkaListener} is kept as an alias
     * so existing {@code containerFactory = "getKafkaListener"} references still resolve.
     *
     * @return the container factory for {@code FCMMessageContainer} listeners
     */
    @Bean(name = {"kafkaListenerContainerFactory", "getKafkaListener"})
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, FCMMessageContainer>> getKafkaListener() {
        ConcurrentKafkaListenerContainerFactory<String, FCMMessageContainer> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(getConsumerFactory());
        factory.setErrorHandler(new KafkaErrHandler());
        return factory;
    }
}
我该如何调试这个问题?如何解决这个问题?我的代码显然没有问题,因为它可以在 IDE 上运行,那么问题出在哪里?
解决方案
推荐阅读
- javascript - 用空格分割字符串,除非空格在最浅层的大括号或方括号内
- python - 我如何在 python 中实现内置对象和创建的类之间的操作?
- python - 在python中将uint64转换为字节数组
- elasticsearch - 如何在 Elasticsearch 中组合多个 aggs?
- html - 如何使按钮高度相同?
- r - 循环遍历列表时出错:“`[<-.data.frame`(`*tmp*`, , i, value = c(7L, 1L, 4L, 7L, 7L, : 新列会留下漏洞... "
- vba - 使用 VBA 在 Excel 中扫描时,将扫描的值从一个单元格移动到一个范围内的另一个单元格
- c# - 使用 .NET Core MSTest 运行单元测试:“未找到以下 TestContainer...”
- angularjs - 出现错误:添加 angular-bootstrap-lightbox 依赖项时出现 $injector:modulerr 模块错误
- python - Pyinstaller 的构建提前退出而没有错误