Naive Kafka Producer not working when I try to produce messages read from a file

Problem description

I am trying to write a naive Kafka Producer in Java. The application takes two inputs:

  1. The name of the Kafka topic to produce messages to
  2. The path to a file containing the messages to produce to Kafka

I wrote the following code. When I run it, I can see the System.out.println statements printing the expected values, but for some reason no messages are produced to Kafka. What should I change to make it work?

package com.myname.kafka.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveKafkaProducer {

    private static final Properties properties = new Properties();

    private static Producer<String, String> producer;

    private static String topic;

    private static BufferedReader br;

    static {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks", "all");
        System.out.println("Creating Kafka producer with the following properties :: " + properties);
        producer = new KafkaProducer<>(properties);
    }

    @SuppressWarnings("resource")
    public static void main(String[] args) throws IOException {
        try {
            if(args.length != 0) {
                topic = args[0];
                File file = new File(args[1]);
                br = new BufferedReader((Reader) new FileReader(file));
            }
        } catch (Exception e) {
            System.out.println("Check input arguments. Error thrown while populating arguments to local variables");
            e.printStackTrace();
        }

        String msg;
        while ((msg = br.readLine()) != null) {
            System.out.println("Message to publish : " + msg);
            System.out.println("Topic : " + topic);
            producer.send(new ProducerRecord<String, String>(topic, "", msg));
        }
        return;
    }
}

Surprisingly, the following code works (where I have hard-coded everything):

package com.myname.kafka.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveKafkaProducer {

    private static final Properties properties = new Properties();

    private static Producer<String, String> producer;

    private static String topic;

    private static BufferedReader br;

    static {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks", "all");
        System.out.println("Creating Kafka producer with the following properties :: " + properties);
        producer = new KafkaProducer<>(properties);
    }

    public static void main(String[] args) throws IOException {
            try {
                String[] msgs = new String[2];
                msgs[0] = "message 1";
                msgs[1] = "message 2";
                topic = "mytopic"
                for(String msg:msgs){
                    producer.send(new ProducerRecord<String, String>(topic, "", msg));
                }
                producer.close();
            } catch (Exception e) {
                System.out.println("Exception caught in main method while trying to produce the messages to Kafka");
                e.printStackTrace();
            }
    }
}

Tags: java, apache-kafka, kafka-producer-api

Solution

A crucial method is called in the second snippet but is missing from the first:

producer.close();

From that method's documentation:

Close this producer. This method blocks until all previously sent requests complete.

When you call the send method, it does not actually mean that the message has been produced. The method returns a Future; if you want, you can wait for each message to be produced by calling get() on the result of each send call.

