java - 当我尝试生成从文件中读取的消息时,朴素的 Kafka Producer 无法正常工作
问题描述
我正在尝试使用 Java 编写一个天真的 Kafka Producer。该应用程序接受两个输入:
- 要生成消息的 Kafka 主题名称
- 包含要生成到 Kafka 的消息的文件路径
我写了以下代码。当我运行它时,我看到System.out.println
语句打印了预期值,但由于某种原因没有向 Kafka 生成消息。我应该改变什么才能让它工作?
package com.myname.kafka.producer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NaiveKafkaProducer {
private static final Properties properties = new Properties();
private static Producer<String, String> producer;
private static String topic;
private static BufferedReader br;
static {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("request.required.acks", "all");
System.out.println("Creating Kafka producer with the following properties :: " + properties);
producer = new KafkaProducer<>(properties);
}
@SuppressWarnings("resource")
public static void main(String[] args) throws IOException {
try {
if(args.length != 0) {
topic = args[0];
File file = new File(args[1]);
br = new BufferedReader((Reader) new FileReader(file));
}
} catch (Exception e) {
System.out.println("Check input arguments. Error thrown while populating arguments to local variables");
e.printStackTrace();
}
String msg;
while ((msg = br.readLine()) != null) {
System.out.println("Message to publish : " + msg);
System.out.println("Topic : " + topic);
producer.send(new ProducerRecord<String, String>(topic, "", msg));
}
return;
}
}
令人惊讶的是,以下代码有效(其中我对所有内容都进行了硬编码):
package com.myname.kafka.producer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NaiveKafkaProducer {
private static final Properties properties = new Properties();
private static Producer<String, String> producer;
private static String topic;
private static BufferedReader br;
static {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("request.required.acks", "all");
System.out.println("Creating Kafka producer with the following properties :: " + properties);
producer = new KafkaProducer<>(properties);
}
public static void main(String[] args) throws IOException {
try {
String[] msgs = new String[2];
msgs[0] = "message 1";
msgs[1] = "message 2";
topic = "mytopic"
for(String msg:msgs){
producer.send(new ProducerRecord<String, String>(topic, "", msg));
}
producer.close();
} catch (Exception e) {
System.out.println("Exception caught in main method while trying to produce the messages to Kafka");
e.printStackTrace();
}
}
}
解决方案
在第二个片段中调用了一个关键方法,在第一个片段中缺少
producer.close();
来自该方法的文档:
关闭这个生产者。此方法会阻塞,直到所有先前发送的请求完成。
当您调用 methodproduce
时,实际上并不意味着该消息已生成。方法返回你future
。get()
您可以通过调用产生方法的每个结果来等待产生每个消息。
推荐阅读
- java - gradle 和外部 jar 的问题 - 找不到 jar
- php - 如何在 php codeigniter 中创建树视图层次结构?
- javascript - 如何根据另一个对象数组过滤一个对象数组?
- php - 如何检查不同表上的值?
- github - 这里没有 GitHub Pages 站点
- laravel - Laravel assertJson 失败
- laravel - fopen('my directory') 无法打开流:没有这样的文件或目录
- reactjs - 我应该在反应中为多步形式使用单独的路线或步骤吗?
- c# - File.Exists 在 UWP 项目中不起作用吗?
- php - 如何使用 Symfony 和 API 平台过滤子对象上的 GraphQL?