首页 > 解决方案 > Spring Kafka 接收并转发另一个代理

问题描述

spring-kafka用来接收和发送消息。

我要做的是阅读一些消息X kafka broker并进行一些增强并发送给另一个Y kafka broker

这是我的 bean 和配置。

@Bean
public ProducerFactory<String, AuditLog> forwarderKafkaProducerFactory() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093");
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configs);
}

@Bean
public KafkaTemplate<String, AuditLog> forwarderKafkaClient() {
    return new KafkaTemplate<>(forwarderKafkaProducerFactory());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, AuditLog>> kafkaListenerContainerFactoryV2() {
    ConcurrentKafkaListenerContainerFactory<String, AuditLog> factory = new ConcurrentKafkaListenerContainerFactory<>();
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, "receiver-sender");
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<String, AuditLog> consumerFactory = new DefaultKafkaConsumerFactory<>(
            configs, new StringDeserializer(), new JsonDeserializer<>(AuditLog.class));
    factory.setConsumerFactory(consumerFactory);
    return factory;
}

如您所见,一个经纪人在 9092 中运行,另一个经纪人在 9093 中运行

接收/转发逻辑就是这样。

@KafkaListener(topics = "audit_log", containerFactory = "kafkaListenerContainerFactoryV2")
public void listen(@Payload AuditLog payload) {
    log.info("Audit log [{}]", payload);
    if (!payload.isForwarded()) {
        String key = "some-value";
        String system = String.join("/", key, payload.getSystem());
        payload.setSystem(system);
        payload.setForwarded(true);
        template.send("audit_log", key, payload);
    }
}

代码片段配置上方的模板是正确的。我可以通过查看来确认((DefaultKafkaProducerFactory)template.getProducerFactory()).getConfigurationProperties()

在此配置中,我可以接收来自 9092 的消息,但无法发送到 9093。模板始终发送到 9092。

谢谢。

标签: spring-bootapache-kafkaspring-kafka

解决方案


问题是 Kafka 设置不正确。我在当前环境中使用wurstmeister/kafka

Docker compose 文件是这样的。

zookeeper:
  image: wurstmeister/zookeeper:3.4.6
kafka:
  image: wurstmeister/kafka:2.12-2.5.0
  ports:
    - "9092:9092"
  environment:
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
    KAFKA_ADVERTISED_HOST_NAME: "127.0.0.1"
  depends_on:
    - zookeeper

然后我只添加第二个卡夫卡

zookeeper2:
  image: wurstmeister/zookeeper:3.4.6
kafka2:
  image: wurstmeister/kafka:2.12-2.5.0
  ports:
    - "9093:9092"
  environment:
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper2:2181"
    KAFKA_ADVERTISED_HOST_NAME: "127.0.0.1"
  depends_on:
    - zookeeper2

但是注册表所有者警告:) 我现在在写这个答案时看到了这个警告。

修改 docker-compose.yml 中的 KAFKA_ADVERTISED_HOST_NAME 以匹配您的 docker 主机 IP(注意:如果要运行多个代理,请勿使用 localhost 或 127.0.0.1 作为主机 IP。)

我试过 bitnami zookeeper/kafka

version: "2"

services:
  zookeeper1:
    image: docker.io/bitnami/zookeeper:3.7
    ports:
      - "2001:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka1:
    image: docker.io/bitnami/kafka:3
    ports:
      - "9201:9201"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper1:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9201
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka1:9092,EXTERNAL://localhost:9201
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper1

  zookeeper2:
    image: docker.io/bitnami/zookeeper:3.7
    ports:
      - "2002:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka2:
    image: docker.io/bitnami/kafka:3
    ports:
      - "9202:9202"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper2:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9202
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka2:9092,EXTERNAL://localhost:9202
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper2

有问题的代码可以在此设置下正确运行。


推荐阅读