scala - 如何仅使用 Kafka 主题中的最新偏移量
问题描述
我正在开发一个使用 kafka 的 scala 应用程序。我的kafka消费者代码如下。
def getValues(topic: String): String = {
val props = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
val topicPartition = new TopicPartition(topic, 0)
consumer.assign(util.Collections.singletonList(topicPartition))
val offset = consumer.position(topicPartition) - 1
val record = consumer.poll(Duration.ofMillis(500)).asScala
for (data <- record)
if(data.offset() == offset) val value = data.value()
return value
}
在这我只想返回最新的值。当我运行我的应用程序时,我得到以下日志:
Resetting offset for partition topic-0 to offset 0
因为它val offset = consumer.position(topicPartition) - 1
变成 -1 并且 data.offset() 给出了所有偏移量的列表。结果我没有得到最新的价值。为什么它会自动将偏移量重置为0?我该如何纠正?我的代码有什么错误?或任何其他方式我可以从最新的偏移量中获得价值?
解决方案
您正在寻找seek
一种方法——根据JavaDocs—— “覆盖消费者将在下一次轮询(超时)时使用的获取偏移量”。
还要确保您正在设置
props.put("auto.offset.reset", "latest")
对您的代码进行这两项修改后,以下内容对我来说只能获取所选主题value
中分区的最新偏移量:0
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._
def getValues(topic: String): String = {
val props = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "latest")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
val topicPartition = new TopicPartition(topic, 0)
consumer.assign(java.util.Collections.singletonList(topicPartition))
val offset = consumer.position(topicPartition) - 1
consumer.seek(topicPartition, offset)
val record = consumer.poll(Duration.ofMillis(500)).asScala
for (data <- record) {
val value: String = data.value() // you are only reading one message if no new messages flow into the Kafka topic
}
value
}
推荐阅读
- python - python psycopg2 - ProgrammingError:函数交叉表(未知,未知)不存在
- javascript - 有没有一种简单的方法可以用命令行重构 javascript 代码?
- java - Recyclerview 未更新屏幕 onNotifydatasetchanged
- c# - 为同一接口的多个实现设置属性
- android - 尽管添加了项目(文本框等),但布局预览显示为空白。甚至项目名称也没有出现在屏幕上
- regex - 从文本文件中获取姓名和年龄
- java - 大小无法解析或不是字段
- go - 如何使用 VGO 导入和更新子包
- javascript - fullpage.js - 触发响应后,jQuery.position() 和 .offset() 仍然返回 0,0
- mysql - MySQL 更新错误不正确的日期时间值