首页 > 解决方案 > 代理不是按照 Kafka 中的时间戳顺序提交消息吗?

问题描述

我有一个场景,我部署了多个产生消息的生产者。该消息包含一个名为eventTime的属性,它对应于引发该特定事件的时间。我想将此eventTime设置为ProducerRecord. 我已经设置好了。有时可能会发生:

  1. 假设我要生成 2 个时间戳为 100,102 的事件。
  2. 我有 2 个生产者AB

现在,A可能首先收到带有102 时间戳 的消息,然后B可能会收到带有100 时间戳的消息

也就是说,A将首先生成具有102时间戳的消息,然后B将生成具有100时间戳的消息。

因此,消息的顺序将是 102, 100 而不是 100, 102。

我尝试了以下方法,但没有一个起作用。

batch.sizeand设置linger.ms为高值 (2216384,1500) - 我正在发送一条带有long类型键的消息,消息为String类型。

  //Specify buffer size in config
  props.put("batch.size", 2216384);

  //Reduce the no of requests less than 0   
  props.put("linger.ms", 1500);

new Thread(()->{produce(101,1540964511l,"second","topic-name");}).start();
new Thread(()->{produce(101,1540964498l,"first","topic-name");}).start();

如您所见,上面的第一个时间戳小于第二个时间戳。理想情况下,应该首先消费第一消息,然后消费第二条消息,但事实并非如此。请注意,我使用 101 作为两者的键,并且对于这两个消息,分区也设置为 0。

public static void produce(long id, long timestamp, String msg, String topic)
   {
      System.out.println("producing .. "+msg);
         producer.send(new ProducerRecord<Long, String>(topic, 0,timestamp, id, msg), new Callback(){
            public void onCompletion(RecordMetadata metadata, Exception e)
            {
               System.out.println(metadata);
            }
         });
   }

我也尝试过使用事务,但仍然没有用。

命令

bin/kafka-console-consumer.sh --topic test --partition 0 --bootstrap-server localhost:9092

输出

second
first
first
second
second
first

如果领导者代理的工作只是将它得到的任何东西写入分区,那么生产者 API 必须提供一些其他方式来按顺序向代理生成消息。

或者至少,在代理端,是否有一种机制在将消息提交到分区之前等待一段时间?

因此,我的问题是,如何使用它们的时间戳来实现消息的排序?

如果首先生成具有更高时间戳的消息,那么在消息排序的上下文中时间戳的用途是什么?

标签: javaapache-kafka

解决方案


推荐阅读