首页 > 解决方案 > consumer.createMessageStreams(map) 方法是顺序读取还是分批读取

问题描述

我对卡夫卡很陌生。我们正在我们当前的应用程序中编写消费者,它从一个主题中消费,并对所消费的数据进行一些处理。我想了解,当我在下面写一段代码时内部会发生什么。

它按预期工作,消耗数据并得到处理,但只是想知道如何从主题中读取数据。

createMessageStreams 方法是从主题中顺序读取数据还是读取特定数量的批次并处理它们?

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

标签: apache-kafkakafka-consumer-api

解决方案


首先,要提到ConsumerConnectororkafka.consumer.KafkaStream类在 kafka v#0.11.0 版本中已被弃用。如果您使用的是旧版本,您应该计划升级到至少 v#1.0 或更高版本的新版本。

createMessageStreams 方法是从主题中顺序读取数据,还是读取特定数量的批次并处理它们?

.createMessageStreams返回主题映射和 KafkaStream 对列表。(topic,list#stream)每个流都支持主题的消息或元数据对的迭代器。它仅在分区内按顺序读取数据。如果分区数多于流线程数,则一个线程可以读取多个分区。但只有在分区内,才能保证顺序。

  for (final KafkaStream<byte[], byte[]> stream : streamList) 
    {
       ConsumerIterator<byte[], byte[]> it= stream.iterator();
       while (it.hasNext()) 
       {
          String message = new String(it.next().message());
          System.out.println(message);
        }
      }
}

v#0.11 及以后的等效功能是.poll()方法。您可以分别设置max.poll.recordsmax.poll.interval.ms设置每个轮询请求的记录数和间隔持续时间。

你可以在这里找到新的消费者: https ://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html


推荐阅读