首页 > 解决方案 > 如何在Kafka中使用不同编程语言的多个消费者用于相同的组ID

问题描述

我想在 Kafka(多种编程语言)中为一个主题创建负载平衡。所以我做了以下。

  1. 创建了一个有 4 个分区的主题。
  2. 在 C# 中创建了一个生产者(每秒生成消息)
  3. 在 C# 中创建了一个消费者(consumer1)(消费者组:testConsumerGrp)
  4. 在 NodeJs 中再创建一个消费者(consumer2)(消费者组:testConsumerGrp)

我在 C# 中使用了confluent.kafka ,在NodeJs中使用了 kafkajs。

我打开生产者并保持运行。

  1. 如果我只运行 C# 消费者,它工作正常。
  2. 如果我只运行 NodeJs 消费者,它工作正常。
  3. 如果我运行多个 C# 消费者(只有 c# 并且少于 4 个实例),它工作正常。
  4. 如果我运行多个 NodeJs 消费者(只有 NodeJs 和少于 4 个实例),它工作正常。
  5. 如果我运行一个 C# 和一个 NodeJs 消费者,那么我会收到Inconsistent group protocol错误

我们不能为同一个消费者群体使用两种编程语言吗?

C# 中的生产者 - Windows 窗体

using System;
using System.Collections.Generic;
using System.Windows.Forms;
using Confluent.Kafka;

namespace KafkaProducer
{
    public partial class frmProducer : Form
    {
        const string TOPIC = "testTopic";
        private IProducer<Null, string> pBuilder;

        public frmProducer()
        {
            InitializeComponent();
        }

        private async void timer1_Tick(object sender, EventArgs e)
        {
            try
            {
                // instead of sending some value, we send current DateTime as value
                var dr = await pBuilder.ProduceAsync(TOPIC, new Message<Null, string> { Value = DateTime.Now.ToLongTimeString() });

                // once done, add the value into list box
                listBox1.Items.Add($"{dr.Value} - Sent to Partition: {dr.Partition.Value}");
                listBox1.TopIndex = listBox1.Items.Count - 1;
            }
            catch (ProduceException<Null, string> err)
            {
                MessageBox.Show($"Failed to deliver msg: {err.Error.Reason}");
            }
        }

        private void frmProducer_Load(object sender, EventArgs e)
        {
            ProducerConfig config = new ProducerConfig { BootstrapServers = "localhost:9092" };
            pBuilder = new ProducerBuilder<Null, string>(config).Build();

            timer1.Enabled = true;
        }

        private void frmProducer_FormClosing(object sender, FormClosingEventArgs e)
        {
            timer1.Enabled = false;
            pBuilder.Dispose();
        }
    }
}

C# 中的消费者 - Windows 窗体

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Confluent.Kafka;

namespace KafkaConsumer
{
    public partial class frmConsumer : Form
    {
        CancellationTokenSource cts = new CancellationTokenSource();

        public frmConsumer()
        {
            InitializeComponent();
        }

        private void StartListen()
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("testTopic");

                //TopicPartitionTimestamp tpts = new TopicPartitionTimestamp("testTopic", new Partition(), Timestamp.  )
                //c.OffsetsForTimes()

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);

                            // Adding the consumed values into the UI
                            listBox1.Invoke(new Action(() =>
                            {
                                listBox1.Items.Add($"{cr.Value} - from Partition: {cr.Partition.Value}" );
                                listBox1.TopIndex = listBox1.Items.Count - 1;
                            }));
                        }
                        catch (ConsumeException err)
                        {
                            MessageBox.Show($"Error occured: {err.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close();
                }
            }
        }

        private void Form1_FormClosing(object sender, FormClosingEventArgs e)
        {
            cts.Cancel();
        }

        private async void frmConsumer_Load(object sender, EventArgs e)
        {
            await Task.Run(() => StartListen());
        }
    }
}

NodeJs 中的消费者

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ["localhost:9092"]
});

const consumer = kafka.consumer({ groupId: "test-consumer-group" });

const run = async () => {
  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: "testTopic", fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(message.value.toString() + " - from Partition " + partition);
    }
  });
};

run().catch(console.error);

如果我同时运行 C# 和 NodeJs 消费者,则会Inconsistent group protocol出现错误。

如何在 Kafka 中使用来自不同编程语言的多个消费者?

标签: c#node.jsapache-kafkakafka-consumer-api

解决方案


简短的回答

这可能与您想象的不同语言的关系不大。这是由于 2 个消费者客户端(及其库)的协议不同而发生的。

尝试在两个消费者客户端中设置以下属性:

partition.assignment.strategy = round-robin

注意:我刚刚提供了通用属性,因此您需要查看客户的特定语言版本。您甚至可以将其设置为range但保持一致。

解释是这样的

阅读Kafka wiki上的协议以找出根本原因Inconsistent group protocol- 事实证明这是在以下情况下返回的:

  1. 有一个活跃的消费者组,其中有活跃/正在运行的消费者
  2. 并且一个新的消费者到达加入这个组的协议类型(或一组协议)与当前组的协议类型不兼容

现在,在您使用的客户端库中,可能存在各个方面,ConsumerGroupProtocolMetadata但其中一个方面似乎确实有所不同,即partition.assignment.strategy.

dotnet 客户端librdkafka的包装器,默认上述属性的值为range. 这是参考

然而

kafkajs根据文档将其默认为round-robin- 因此导致不一致。

希望这可以帮助。


推荐阅读