apache-kafka - consumer.createMessageStreams(map) 方法是顺序读取还是分批读取
问题描述
我对卡夫卡很陌生。我们正在我们当前的应用程序中编写消费者,它从一个主题中消费,并对所消费的数据进行一些处理。我想了解,当我在下面写一段代码时内部会发生什么。
它按预期工作,消耗数据并得到处理,但只是想知道如何从主题中读取数据。
createMessageStreams 方法是从主题中顺序读取数据还是读取特定数量的批次并处理它们?
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
解决方案
首先,要提到ConsumerConnector
orkafka.consumer.KafkaStream
类在 kafka v#0.11.0 版本中已被弃用。如果您使用的是旧版本,您应该计划升级到至少 v#1.0 或更高版本的新版本。
createMessageStreams 方法是从主题中顺序读取数据,还是读取特定数量的批次并处理它们?
.createMessageStreams
返回主题映射和 KafkaStream 对列表。(topic,list#stream)
每个流都支持主题的消息或元数据对的迭代器。它仅在分区内按顺序读取数据。如果分区数多于流线程数,则一个线程可以读取多个分区。但只有在分区内,才能保证顺序。
for (final KafkaStream<byte[], byte[]> stream : streamList)
{
ConsumerIterator<byte[], byte[]> it= stream.iterator();
while (it.hasNext())
{
String message = new String(it.next().message());
System.out.println(message);
}
}
}
v#0.11 及以后的等效功能是.poll()
方法。您可以分别设置max.poll.records
或max.poll.interval.ms
设置每个轮询请求的记录数和间隔持续时间。
你可以在这里找到新的消费者: https ://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
推荐阅读
- nginx - 增加 proxy_send_timeout 和 proxy_read_timeout 入口 nginx
- swagger - Add the placeholder required in swagger ui v2.0
- spring - @Value 注释不适用于 STS 3.9.6 上的 Infinitest 5.2.0
- javascript - 单击按钮时从内容脚本打开选项页面?
- html - Webpack 4 不从 HTML 文件加载图像
- java - 禁用所有日期,而不是一个月或一周的第一天
- java - Eclipse 插件:标签中的编码
- javascript - React Native:ScrollView 中的 KeyboardAwareScrollView 不起作用
- c# - 带有 Npgsql 的 C# - 无法使用处理程序类型 Int32Handler 编写 CLR 类型 System.String
- go - 属于自己