Unable to produce and consume messages

Problem description

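I am new to Spring Integration Kafka. I am setting up Kafka on Docker, but I am unable to produce and consume messages. Here is my docker compose file. Any help would be appreciated. Thanks.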

zookeeper:
        image: wurstmeister/zookeeper
        ports:
         - "2181:2181"

kafka:
        image: wurstmeister/kafka
        ports:
           - "9092:9092"
           - "9999:9999"
        environment:
           - KAFKA_ADVERTISED_HOST_NAME=20.0.201.75
           - KAFKA_LISTENERS=INSIDE://:9090,OUTSIDE://:9092
           - KAFKA_ADVERTISED_LISTENERS=INSIDE://:9090,OUTSIDE://20.0.201.75:9092
           - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
           - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
           - JMX_PORT=9999
           - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
           - KAFKA_JMX_OPTS= -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=9999
        volumes:
           - /var/run/docker.sock:/var/run/docker.sock
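
To make the listener settings above easier to read, here is a minimal sketch that splits the `KAFKA_ADVERTISED_LISTENERS` value from the compose file into listener name, host, and port. The class name `ListenerParser` and its `parse` method are hypothetical helpers written for this illustration and are not part of Kafka or Spring. It uses only the JDK and assumes every entry has the form `NAME://host:port`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka or Spring.
class ListenerParser {

    /** Splits "NAME://host:port,NAME2://host2:port2" into name -> {host, port}. */
    static Map<String, String[]> parse(String listeners) {
        Map<String, String[]> result = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            // "INSIDE://:9090" -> ["INSIDE", ":9090"]
            String[] nameAndAddress = entry.trim().split("://", 2);
            String address = nameAndAddress[1];
            int colon = address.lastIndexOf(':');
            String host = address.substring(0, colon);   // may be empty
            String port = address.substring(colon + 1);
            result.put(nameAndAddress[0], new String[] {host, port});
        }
        return result;
    }

    public static void main(String[] args) {
        // Value copied from the compose file above.
        String advertised = "INSIDE://:9090,OUTSIDE://20.0.201.75:9092";
        parse(advertised).forEach((name, hostPort) ->
                System.out.println(name + " host='" + hostPort[0] + "' port=" + hostPort[1]));
    }
}
```

With the value from the compose file, the `OUTSIDE` entry carries the Docker host IP 20.0.201.75 and port 9092, while the `INSIDE` entry carries no host name at all, only port 9090.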

I am sending messages to 20.0.201.75:9092 and consuming messages from 20.0.201.75:9092, where 20.0.201.75 is the Docker host IP. The producer-side code is:

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    public String sendData() {
        System.out.println("hewe");
        for (int i = 0; i < 10; i++) {
            ListenableFuture<SendResult<String, String>> s = kafkaTemplate.send("dms", Integer.valueOf(0), String.valueOf(i + 230), "added message in dms");
            try {
                s.get();
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("sent successfully");

        return "";

    }

The consumer-side code is:

    @Bean("kafkaListenerContainerFactory")
    public KafkaMessageListenerContainer<String, String> container() throws Exception {
        ContainerProperties properties = new ContainerProperties("dms");
        KafkaMessageListenerContainer<String,String> kmlc = new KafkaMessageListenerContainer<String,String>(consumerFactory(), properties);
        // set more properties
        return kmlc;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "20.0.201.75:9092");
        // set more properties
        return new DefaultKafkaConsumerFactory<>(props);
    }
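
Both the producer and the consumer above point at 20.0.201.75:9092. As a possible first diagnostic step, the sketch below checks whether a plain TCP connection to that address can be opened from the client machine. The class name `BrokerPortCheck`, the method `canConnect`, and the 2000 ms timeout are assumptions made for this illustration; only JDK classes are used. A successful connect shows only that the port is open. A Kafka client afterwards connects to whatever address the broker advertises in its metadata, so this check alone does not prove that producing or consuming will work.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper; not part of Kafka or Spring.
class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port can be opened within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from the question; the timeout is an arbitrary choice.
        boolean reachable = canConnect("20.0.201.75", 9092, 2000);
        System.out.println("20.0.201.75:9092 reachable: " + reachable);
    }
}
```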


Tags: docker, apache-kafka, apache-zookeeper

Solution

