.net - Apache Kafka Consumer 未按顺序接收消息
问题描述
我正在我的项目中为 Kafka 尝试 POC,并使用 Confluent.kafka 库在 .net core 2.1 中创建了两个控制台应用程序。我已经在 Ubuntu 机器上安装了 Kafka,它运行正常。当我使用生产者控制台应用程序将数千条消息推送到 Kafka 并在消息中附加一个序列号时。当我在消费者控制台应用程序中使用这些消息时,消息的顺序不正确。只有一个生产者和消费者,它们都与一个主题相关联。下面是我的制片人的代码
public class Kafta
{
private Dictionary<string, object> config;
private string topicName;
public Kafta(string topic)
{
config = new Dictionary<string, object>
{
{"bootstrap.servers","192.168.60.173:9092" }
};
topicName = topic;
}
public async void SendMessageAsync(string message)
{
using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
var msg = await producer.ProduceAsync(topicName, "userid", message);
//producer.ProduceAsync("console", null, message);
producer.Flush(500);
}
}
}
Producer的Program.cs static void main
static void Main(string[] args)
{
string topic = "tester2";
long count = 1;
Console.WriteLine("Starting to send message");
Console.WriteLine("Write the message here: ");
if(args.Length == 2)
{
topic = args[0];
count = long.Parse(args[1]);
}
try
{
Console.WriteLine("Topic name " + topic);
var message = Console.ReadLine();
var service = new Kafta(topic);
for(var i = 0; i<count;i++)
{
var msg = message + " number " + i.ToString();
Console.WriteLine("Message to Kafta: " + msg);
service.SendMessageAsync(msg);
}
}
catch (Exception ex)
{
Console.WriteLine("Exception occured " + ex.Message);
}
finally
{
Console.WriteLine("Press any key to exit");
Console.Read();
}
}
消费者代码
static void Main(string[] args)
{
var config = new Dictionary<string, object>
{
{ "group.id", "sample-consumer" },
{ "bootstrap.servers", "192.168.60.173:9092" },
{ "enable.auto.commit", "false"}
};
string topic = "tester2";
if (args.Length == 1)
topic = args[0];
using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
consumer.Subscribe(new string[] { topic });
consumer.OnMessage += (_, msg) =>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
consumer.CommitAsync(msg);
};
while (true)
{
consumer.Poll(100);
}
}
}
生产者的输出
Message to Kafta: message number 0
Message to Kafta: message number 1
Message to Kafta: message number 2
Message to Kafta: message number 3
Message to Kafta: message number 4
Message to Kafta: message number 5
Message to Kafta: message number 6
Message to Kafta: message number 7
Message to Kafta: message number 8
Message to Kafta: message number 9
消费者的输出:
message number 4
message number 7
message number 0
message number 1
message number 2
message number 3
message number 5
message number 6
message number 8
message number 9
我是 Kafka 的新手,不确定我缺少什么才能使其正常工作。根据 Kafka 文档,我的用例保证了消息的排序,所以我一定有一些愚蠢的错误,无法弄清楚。
我可以使用 Kafka 的其他替代品吗?
谢谢
解决方案
根据 Kafka 文档,消息的顺序得到保证
仅按分区。从您的问题中,您没有提到您的主题有多少个分区。您正在打印Topic: {msg.Topic} Partition: {msg.Partition}
,但这不是您帖子的输出..
在您的生产者中,您正在做“一劳永逸”,SendMessageAsync
而不是验证代理是否实际收到了带有该方法返回值的消息。所以这是一种可能性 - 您的打印声明将按顺序排列,但消息不一定以这种方式到达代理。
如果代码中显示的消费者输出中的分区号始终相同,虽然我不熟悉 C# API,但看起来您正在使用非阻塞消费者消息侦听器。该OnMessage
函数可能会在单独的线程中被调用,该线程不一定以保证的顺序写入标准输出。更好的测试可能是在每条消息中插入一个时间戳,而不仅仅是一个计数器
我可以使用 Kafka 的其他替代品吗?
存在大量其他 MQ 技术,例如 RabbitMQ,因此如果您不关心 Kafka 的持久性功能和其他 API(Streams 和 Connect),请随意使用这些
推荐阅读
- python - “NoneType”对象没有属性“更新”
- operating-system - pause() 是“无条件地”阻塞进程还是“在等待队列中”?
- css - 如何对齐 flex-start 项目的中心?
- machine-learning - 为什么我们在 kmeans 聚类方法中使用 kmeans.fit 函数?
- jenkins - 在詹金斯执行一些代码时,它会抛出一个错误“jq command not found”
- php - 为什么使用这个
- c# - Net Core:TagBuilder 编辑现有属性
- websocket - Websocket 协议是否管理大数据块的发送
- excel - 如何将行号存储在 VBA 中 Worksheet_SelectionChange 事件的变量中?
- python - PySpark 两个值的总和