c# - 如何在Kafka中使用不同编程语言的多个消费者用于相同的组ID
问题描述
我想在 Kafka(多种编程语言)中为一个主题创建负载平衡。所以我做了以下。
- 创建了一个有 4 个分区的主题。
- 在 C# 中创建了一个生产者(每秒生成消息)
- 在 C# 中创建了一个消费者(consumer1)(消费者组:testConsumerGrp)
- 在 NodeJs 中再创建一个消费者(consumer2)(消费者组:testConsumerGrp)
我在 C# 中使用了confluent.kafka ,在NodeJs中使用了 kafkajs。
我打开生产者并保持运行。
- 如果我只运行 C# 消费者,它工作正常。
- 如果我只运行 NodeJs 消费者,它工作正常。
- 如果我运行多个 C# 消费者(只有 c# 并且少于 4 个实例),它工作正常。
- 如果我运行多个 NodeJs 消费者(只有 NodeJs 和少于 4 个实例),它工作正常。
- 如果我运行一个 C# 和一个 NodeJs 消费者,那么我会收到
Inconsistent group protocol
错误
我们不能为同一个消费者群体使用两种编程语言吗?
C# 中的生产者 - Windows 窗体
using System;
using System.Collections.Generic;
using System.Windows.Forms;
using Confluent.Kafka;
namespace KafkaProducer
{
public partial class frmProducer : Form
{
const string TOPIC = "testTopic";
private IProducer<Null, string> pBuilder;
public frmProducer()
{
InitializeComponent();
}
private async void timer1_Tick(object sender, EventArgs e)
{
try
{
// instead of sending some value, we send current DateTime as value
var dr = await pBuilder.ProduceAsync(TOPIC, new Message<Null, string> { Value = DateTime.Now.ToLongTimeString() });
// once done, add the value into list box
listBox1.Items.Add($"{dr.Value} - Sent to Partition: {dr.Partition.Value}");
listBox1.TopIndex = listBox1.Items.Count - 1;
}
catch (ProduceException<Null, string> err)
{
MessageBox.Show($"Failed to deliver msg: {err.Error.Reason}");
}
}
private void frmProducer_Load(object sender, EventArgs e)
{
ProducerConfig config = new ProducerConfig { BootstrapServers = "localhost:9092" };
pBuilder = new ProducerBuilder<Null, string>(config).Build();
timer1.Enabled = true;
}
private void frmProducer_FormClosing(object sender, FormClosingEventArgs e)
{
timer1.Enabled = false;
pBuilder.Dispose();
}
}
}
C# 中的消费者 - Windows 窗体
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Confluent.Kafka;
namespace KafkaConsumer
{
public partial class frmConsumer : Form
{
CancellationTokenSource cts = new CancellationTokenSource();
public frmConsumer()
{
InitializeComponent();
}
private void StartListen()
{
var conf = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("testTopic");
//TopicPartitionTimestamp tpts = new TopicPartitionTimestamp("testTopic", new Partition(), Timestamp. )
//c.OffsetsForTimes()
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
// Adding the consumed values into the UI
listBox1.Invoke(new Action(() =>
{
listBox1.Items.Add($"{cr.Value} - from Partition: {cr.Partition.Value}" );
listBox1.TopIndex = listBox1.Items.Count - 1;
}));
}
catch (ConsumeException err)
{
MessageBox.Show($"Error occured: {err.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
private void Form1_FormClosing(object sender, FormClosingEventArgs e)
{
cts.Cancel();
}
private async void frmConsumer_Load(object sender, EventArgs e)
{
await Task.Run(() => StartListen());
}
}
}
NodeJs 中的消费者
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: 'my-app',
brokers: ["localhost:9092"]
});
const consumer = kafka.consumer({ groupId: "test-consumer-group" });
const run = async () => {
// Consuming
await consumer.connect();
await consumer.subscribe({ topic: "testTopic", fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(message.value.toString() + " - from Partition " + partition);
}
});
};
run().catch(console.error);
如果我同时运行 C# 和 NodeJs 消费者,则会Inconsistent group protocol
出现错误。
如何在 Kafka 中使用来自不同编程语言的多个消费者?
解决方案
简短的回答:
这可能与您想象的不同语言的关系不大。这是由于 2 个消费者客户端(及其库)的协议不同而发生的。
尝试在两个消费者客户端中设置以下属性:
partition.assignment.strategy = round-robin
注意:我刚刚提供了通用属性,因此您需要查看客户的特定语言版本。您甚至可以将其设置为range
但保持一致。
解释是这样的:
阅读Kafka wiki上的协议以找出根本原因Inconsistent group protocol
- 事实证明这是在以下情况下返回的:
- 有一个活跃的消费者组,其中有活跃/正在运行的消费者
- 并且一个新的消费者到达加入这个组的协议类型(或一组协议)与当前组的协议类型不兼容
现在,在您使用的客户端库中,可能存在各个方面,ConsumerGroupProtocolMetadata
但其中一个方面似乎确实有所不同,即partition.assignment.strategy
.
dotnet 客户端是librdkafka的包装器,默认上述属性的值为range
. 这是参考。
然而
kafkajs根据文档将其默认为round-robin
- 因此导致不一致。
希望这可以帮助。
推荐阅读
- django - 将旧的 Django URL 更改为新路径
- haskell - 使用 Aeson 在 Haskell 中生成 JSON 的上下文
- ruby-on-rails - 用户索引测试失败 Rails 教程
- react-native - React Native - Redux - 如何从非组件调用操作?
- tensorflow - Tensorflow:在一个网络中训练两个网络
- ruby - 如何应用 if/then 语句来反驳/断言方法
- recaptcha - recaptcha v3 前端返回奇怪的、空洞的(无效的?)结果与 )]}'
- c - 利用金丝雀保护的缓冲区溢出
- machine-learning - 如何学习和创建语音识别系统?
- mysql - MySql 在主查询中使用 group by 排序子查询