docker - 无法产生消费消息
问题描述
我是弹簧集成卡夫卡的新手。我正在 docker 上设置 kafka,但无法生成和使用消息。这是我的 docker compose 文件 任何帮助将不胜感激。谢谢
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
- "9999:9999"
environment:
- KAFKA_ADVERTISED_HOST_NAME=20.0.201.75
- KAFKA_LISTENERS=INSIDE://:9090,OUTSIDE://:9092
- KAFKA_ADVERTISED_LISTENERS=INSIDE://:9090,OUTSIDE://20.0.201.75:9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
- JMX_PORT=9999
- KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
- KAFKA_JMX_OPTS= -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=9999
volumes:
- /var/run/docker.sock:/var/run/docker.sock
我正在向 20.0.201.75:9092 发送消息,并使用来自 20.0.201.75:9092 和 20.0.201.75 的消息是此处的 dockerhostip。生产者端代码是
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public String sendData() {
System.out.println("hewe");
for(int i=0;i<10;i++) {
ListenableFuture<SendResult<String, String>> s = kafkaTemplate.send("dms", Integer.valueOf(0),String.valueOf(i+230), "added message in dms");
try {
s.get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("sent successfully");
return "";
}
消费者端代码是:
@Bean("kafkaListenerContainerFactory")
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties("dms");
KafkaMessageListenerContainer<String,String> kmlc = new KafkaMessageListenerContainer<String,String>(consumerFactory(), properties);
// set more properties
return kmlc;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "20.0.201.75:9092");
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
解决方案
推荐阅读
- php - 用户角色 - 添加角色功能 php 片段 - Wordpress
- android - 不会停留在仪表板上并返回登录活动
- swift - 快速向泛型提供类元数据(可编码)
- .htaccess - 如何将域指向 FTP 中的文件夹(不是根目录)?
- mysql - SQL中UNION后删除重复项
- python - python - 如何在Python中的熊猫中对指定时间段的日期时间进行分组并将聚合函数应用于日期时间?
- cmake - 我如何告诉介子设置有关依赖项的位置?
- ansible - 剧本 host_vars
- database - MongoDB 的索引是 Alternative 1 还是 Alternative 2 还是 Alternative 3?
- python - 在 pygame 中使用多处理?