首页 > 解决方案 > 如何在按钮单击时使用最新消息并将其写入 txt 文件?

问题描述

我写了一个简单的生产者,它通过按钮单击向 Kafka 发送消息。

class Producer : IProducer
{
    public void produce(string msg, string topic)
    {
        var config = new Dictionary<string, object>
        {
            {"bootstrap.servers", "localhost:9092" }
        };

        using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
        {
             var m = producer.ProduceAsync(topic, null, msg).Result;
        }
    }
}

我的问题是 - 如何编写一个消费者,它会在按钮单击时引发并接收来自 Kafka 的最新发布消息并将其保存到本地磁盘上某处的 txt 文件中?这是我现在写的:

public class Consumer
{
    public void Consume(string topic)
    {
        var config = new Dictionary<string, object>
        {
            {"group.id", "consumer-latest-msg" },
            {"bootstrap.servers", "localhost:9092" },
            {"auto.commit.interval.ms", 5000 },
            {"auto.offset.reset", "latest" }
        };

        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
                //what to do?
        }
    }
}

标签: c#.netapache-kafkakafka-consumer-apiconfluent-platform

解决方案


注意:在我开始写答案之前只是一个免责声明:我不熟悉 C#,所以我将尝试从概念上回答这个问题。

我认为您在消费空间内需要做的就是:

  1. subscribe()通过调用消费者的方法订阅主题
  2. 调用poll()消费者的方法 --> 这将返回新写入的记录
  3. 循环处理所有记录,即根据需要将每条消息(键和/或值)打包到一个新文件中。

我在 Java 中快速尝试过,这里是代码片段(包含提到的 3 个步骤)供您参考:

consumer.subscribe(Collections.singleton("test-topic"));

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {

                fWriter = new FileWriter("data/msg-" + i++);
                pWriter = new PrintWriter(fWriter);
                pWriter.print(record.value());
                pWriter.close();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }

在运行此消费者客户端并从控制台生产者生成几条消息时,在指定目录(数据)中创建了 2 个文件。

此外,无限循环的原因是为了确保消费者继续运行,因为如果消费者不定期轮询,那么消费者组协调器会假定消费者已经死亡并在消费者组内触发重新平衡。

我希望这有帮助!


推荐阅读