首页 > 解决方案 > Apache Kafka Consumer 未按顺序接收消息

问题描述

我正在我的项目中为 Kafka 尝试 POC,并使用 Confluent.kafka 库在 .net core 2.1 中创建了两个控制台应用程序。我已经在 Ubuntu 机器上安装了 Kafka,它运行正常。当我使用生产者控制台应用程序将数千条消息推送到 Kafka 并在消息中附加一个序列号时。当我在消费者控制台应用程序中使用这些消息时,消息的顺序不正确。只有一个生产者和消费者,它们都与一个主题相关联。下面是我的制片人的代码

public class Kafta
{
    private Dictionary<string, object> config;
    private string topicName;

    public Kafta(string topic)
    {
        config = new Dictionary<string, object>
        {
            {"bootstrap.servers","192.168.60.173:9092" }
        };
        topicName = topic;
    }
    public async void SendMessageAsync(string message)
    {
        using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
        {
            var msg = await producer.ProduceAsync(topicName, "userid", message);

            //producer.ProduceAsync("console", null, message);
            producer.Flush(500);
        }
    }
}

Producer的Program.cs static void main

static void Main(string[] args)
    {
        string topic = "tester2";
        long count = 1;
        Console.WriteLine("Starting to send message");
        Console.WriteLine("Write the message here: ");

        if(args.Length == 2)
        {
            topic = args[0];
            count = long.Parse(args[1]);
        }
        try
        {
            Console.WriteLine("Topic name " + topic);
            var message = Console.ReadLine();            
            var service = new Kafta(topic);

            for(var i = 0; i<count;i++)
            {
                var msg = message + " number " + i.ToString();
                Console.WriteLine("Message to Kafta: " + msg);
                service.SendMessageAsync(msg);
            }

        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception occured " + ex.Message);
        }
        finally
        {
            Console.WriteLine("Press any key to exit");
            Console.Read();
        }
    }

消费者代码

static void Main(string[] args)
    {
        var config = new Dictionary<string, object>
        {
          { "group.id", "sample-consumer" },
          { "bootstrap.servers", "192.168.60.173:9092" },
          { "enable.auto.commit", "false"}
        };
        string topic = "tester2";
        if (args.Length == 1)
            topic = args[0];
        using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
        {
            consumer.Subscribe(new string[] { topic });                
            consumer.OnMessage += (_, msg) =>
            {
                Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
                consumer.CommitAsync(msg);

            };

            while (true)
            {
                consumer.Poll(100);
            }
        }
    }

生产者的输出

Message to Kafta: message number 0
Message to Kafta: message number 1
Message to Kafta: message number 2
Message to Kafta: message number 3
Message to Kafta: message number 4
Message to Kafta: message number 5
Message to Kafta: message number 6
Message to Kafta: message number 7
Message to Kafta: message number 8
Message to Kafta: message number 9

消费者的输出:

message number 4
message number 7
message number 0
message number 1
message number 2
message number 3
message number 5
message number 6
message number 8
message number 9

我是 Kafka 的新手,不确定我缺少什么才能使其正常工作。根据 Kafka 文档,我的用例保证了消息的排序,所以我一定有一些愚蠢的错误,无法弄清楚。

我可以使用 Kafka 的其他替代品吗?

谢谢

标签: .netapache-kafkakafka-consumer-apikafka-producer-apiconfluent-platform

解决方案


根据 Kafka 文档,消息的顺序得到保证

按分区。从您的问题中,您没有提到您的主题有多少个分区。您正在打印Topic: {msg.Topic} Partition: {msg.Partition},但这不是您帖子的输出..

在您的生产者中,您正在做“一劳永逸”,SendMessageAsync而不是验证代理是否实际收到了带有该方法返回值的消息。所以这是一种可能性 - 您的打印声明将按顺序排列,但消息不一定以这种方式到达代理。

如果代码中显示的消费者输出中的分区号始终相同,虽然我不熟悉 C# API,但看起来您正在使用非阻塞消费者消息侦听器。该OnMessage函数可能会在单独的线程中被调用,该线程不一定以保证的顺序写入标准输出。更好的测试可能是在每条消息中插入一个时间戳,而不仅仅是一个计数器

我可以使用 Kafka 的其他替代品吗?

存在大量其他 MQ 技术,例如 RabbitMQ,因此如果您不关心 Kafka 的持久性功能和其他 API(Streams 和 Connect),请随意使用这些


推荐阅读