scala - Kafka读取主题的所有消息
问题描述
我想在预定的时间间隔内读取来自 Kafka 主题的所有消息,以计算一些全局索引值。我正在做这样的事情:
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("group.id", "test")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,Int.MaxValue.toString)
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Collections.singletonList(TOPIC))
consumer.poll(10000)
consumer.seekToBeginning(consumer.assignment())
val records = consumer.poll(10000)
使用这种机制,我可以获得所有记录,但这是一种有效的方法吗?每个主题大约有 20000000 (2.1 GB) 记录。
解决方案
您可能会考虑使用 Kafka Streams 库来执行此操作。它支持不同类型的窗口。
- 翻滚时间窗口
- 跳跃时间窗
- 滑动时间窗
- 会话窗口
您可以使用 Tumbling windows 来捕获给定内部的事件并计算您的全局索引。
https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#windowing
推荐阅读
- php - 一个 PDF 文件中的多个图像
- python - 不同的自相关归一化值与 statsmodels
- android-studio - 无法启动守护进程,守护进程配置不正确
- r - 在 dplyr 中使用“get”
- docker - 卷挂载到 Windows 容器中
- php - 在 PHP 和 Localhost 中发送电子邮件
- c++ - 调用 socket.remote_endpoint(boost asio library) 线程安全
- elasticsearch - 异常-“网络/Elasticsearch 集群不可访问或针对 WAN/云实例时”
- ios - AWS Device Farm 目前在 iOS 10.3.3 及更低版本上支持 XCTest
- angular - Angular 5 HTTP Observable GET 请求处理嵌套对象