首页 > 解决方案 > Kafka读取主题的所有消息

问题描述

我想在预定的时间间隔内读取来自 Kafka 主题的所有消息,以计算一些全局索引值。我正在做这样的事情:

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "test")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,Int.MaxValue.toString)

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(util.Collections.singletonList(TOPIC))
  consumer.poll(10000)
  consumer.seekToBeginning(consumer.assignment())
   val records = consumer.poll(10000)

使用这种机制,我可以获得所有记录,但这是一种有效的方法吗?每个主题大约有 20000000 (2.1 GB) 记录。

标签: scalaapache-kafkakafka-consumer-api

解决方案


您可能会考虑使用 Kafka Streams 库来执行此操作。它支持不同类型的窗口。

  1. 翻滚时间窗口
  2. 跳跃时间窗
  3. 滑动时间窗
  4. 会话窗口

您可以使用 Tumbling windows 来捕获给定内部的事件并计算您的全局索引。

https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#windowing


推荐阅读