spring-boot - Spring Kafka 接收并转发另一个代理
问题描述
我spring-kafka
用来接收和发送消息。
我要做的是阅读一些消息X kafka broker
并进行一些增强并发送给另一个Y kafka broker
这是我的 bean 和配置。
@Bean
public ProducerFactory<String, AuditLog> forwarderKafkaProducerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, AuditLog> forwarderKafkaClient() {
return new KafkaTemplate<>(forwarderKafkaProducerFactory());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, AuditLog>> kafkaListenerContainerFactoryV2() {
ConcurrentKafkaListenerContainerFactory<String, AuditLog> factory = new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "receiver-sender");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<String, AuditLog> consumerFactory = new DefaultKafkaConsumerFactory<>(
configs, new StringDeserializer(), new JsonDeserializer<>(AuditLog.class));
factory.setConsumerFactory(consumerFactory);
return factory;
}
如您所见,一个经纪人在 9092 中运行,另一个经纪人在 9093 中运行
接收/转发逻辑就是这样。
@KafkaListener(topics = "audit_log", containerFactory = "kafkaListenerContainerFactoryV2")
public void listen(@Payload AuditLog payload) {
log.info("Audit log [{}]", payload);
if (!payload.isForwarded()) {
String key = "some-value";
String system = String.join("/", key, payload.getSystem());
payload.setSystem(system);
payload.setForwarded(true);
template.send("audit_log", key, payload);
}
}
代码片段配置上方的模板是正确的。我可以通过查看来确认((DefaultKafkaProducerFactory)template.getProducerFactory()).getConfigurationProperties()
在此配置中,我可以接收来自 9092 的消息,但无法发送到 9093。模板始终发送到 9092。
谢谢。
解决方案
问题是 Kafka 设置不正确。我在当前环境中使用wurstmeister/kafka。
Docker compose 文件是这样的。
zookeeper:
image: wurstmeister/zookeeper:3.4.6
kafka:
image: wurstmeister/kafka:2.12-2.5.0
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_HOST_NAME: "127.0.0.1"
depends_on:
- zookeeper
然后我只添加第二个卡夫卡
zookeeper2:
image: wurstmeister/zookeeper:3.4.6
kafka2:
image: wurstmeister/kafka:2.12-2.5.0
ports:
- "9093:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper2:2181"
KAFKA_ADVERTISED_HOST_NAME: "127.0.0.1"
depends_on:
- zookeeper2
但是注册表所有者警告:) 我现在在写这个答案时看到了这个警告。
修改 docker-compose.yml 中的 KAFKA_ADVERTISED_HOST_NAME 以匹配您的 docker 主机 IP(注意:如果要运行多个代理,请勿使用 localhost 或 127.0.0.1 作为主机 IP。)
我试过 bitnami zookeeper/kafka
version: "2"
services:
zookeeper1:
image: docker.io/bitnami/zookeeper:3.7
ports:
- "2001:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka1:
image: docker.io/bitnami/kafka:3
ports:
- "9201:9201"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper1:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9201
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka1:9092,EXTERNAL://localhost:9201
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper1
zookeeper2:
image: docker.io/bitnami/zookeeper:3.7
ports:
- "2002:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka2:
image: docker.io/bitnami/kafka:3
ports:
- "9202:9202"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper2:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9202
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka2:9092,EXTERNAL://localhost:9202
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper2
有问题的代码可以在此设置下正确运行。
推荐阅读
- php - php设置cookie过期时间
- r - 为什么 group_by 不能在 dplyr 中使用 max(colSums)
- c# - 是否可以禁用 DataGridView 中单个列的交替行?
- angular5 - Obervable.create 未在 angular5 中使用 ngrx 触发
- php - 将 PDF 下载到浏览器
- linux-kernel - 在 Linux 中检查进程的堆栈使用情况
- azure - 如何集成驻留在不同 Azure B2c 租户中的两个 WebApp 以获得单点登录体验?
- javascript - 将数据复制到剪贴板而不选择任何文本
- javascript - 如何复制仅保留当前道具的对象?
- karate - 如何在空手道框架中设置代理