c# - 如何在按钮单击时使用最新消息并将其写入 txt 文件?
问题描述
我写了一个简单的生产者,它通过按钮单击向 Kafka 发送消息。
class Producer : IProducer
{
public void produce(string msg, string topic)
{
var config = new Dictionary<string, object>
{
{"bootstrap.servers", "localhost:9092" }
};
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
var m = producer.ProduceAsync(topic, null, msg).Result;
}
}
}
我的问题是 - 如何编写一个消费者,它会在按钮单击时引发并接收来自 Kafka 的最新发布消息并将其保存到本地磁盘上某处的 txt 文件中?这是我现在写的:
public class Consumer
{
public void Consume(string topic)
{
var config = new Dictionary<string, object>
{
{"group.id", "consumer-latest-msg" },
{"bootstrap.servers", "localhost:9092" },
{"auto.commit.interval.ms", 5000 },
{"auto.offset.reset", "latest" }
};
using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
//what to do?
}
}
}
解决方案
注意:在我开始写答案之前只是一个免责声明:我不熟悉 C#,所以我将尝试从概念上回答这个问题。
我认为您在消费空间内需要做的就是:
subscribe()
通过调用消费者的方法订阅主题- 调用
poll()
消费者的方法 --> 这将返回新写入的记录 - 循环处理所有记录,即根据需要将每条消息(键和/或值)打包到一个新文件中。
我在 Java 中快速尝试过,这里是代码片段(包含提到的 3 个步骤)供您参考:
consumer.subscribe(Collections.singleton("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for(ConsumerRecord<String, String> record : records) {
fWriter = new FileWriter("data/msg-" + i++);
pWriter = new PrintWriter(fWriter);
pWriter.print(record.value());
pWriter.close();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
consumer.close();
}
在运行此消费者客户端并从控制台生产者生成几条消息时,在指定目录(数据)中创建了 2 个文件。
此外,无限循环的原因是为了确保消费者继续运行,因为如果消费者不定期轮询,那么消费者组协调器会假定消费者已经死亡并在消费者组内触发重新平衡。
我希望这有帮助!
推荐阅读
- java - 流上的 reduce() 操作似乎正在修改数据源(列表)Stream API Java 8
- css - 样式文本(使用 CSS)以匹配 XD 中的无边框字体
- javascript - React 中的 Google 身份验证
- sql - ORACLE SQL:制作仅从特定日期获取数据的视图
- python - 即使已激活 Python 应用程序也未在 virtualenv 中运行
- android - 尝试使用 enableAutoManage() 函数时,GoogleApiClient 给出“错误的第一个参数类型”
- sorting - 将“序列号”列添加到反应虚拟化材料 UI 表?
- wordpress - 管理面板中的 WordPress 用户特定内容
- code-coverage - IAR CodeCoverage 将特定文件排除在覆盖范围之外
- javascript - 通过Wordpress在服务器上执行一个JS文件