scala - 卡夫卡如何获得最后提交的偏移量
问题描述
上面的代码在 <2.3 的 kafka 版本中运行良好。但事后不起作用。
已提交的签名更改为 Map<TopicPartition,OffsetAndMetadata> 已提交(设置分区)
需要以下格式的结果。(分区顺序无关紧要) "{"topicA":{"1":4,"0":105}}"
def getLatestOffset(topic: String, consumer: KafkaConsumer[String, String]): String = {
val partitions = new util.ArrayList[TopicPartition]
for (partitionInfo <- consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
consumer.assign(partitions)
//consumer.seekToEnd(partitions)
var map = Map[String, Long]()
for (partition <- partitions) {
//consumer.seekToEnd(Collections.singletonList(partition))
// val latestOffset = consumer.position(partition)
val latestOffset = consumer.committed(partition).offset()
map += (partition.partition().toString -> latestOffset)
}
var jsonMap = Map[String, Any]()
jsonMap += (topic -> map)
implicit val formats = DefaultFormats
val response: String = write(jsonMap)
response
}
解决方案
推荐阅读
- xpath - 为了从网页中提取文本,XPath 被精确到一个?
- powerbi - 带过滤器的 PowerBi Dax 子查询
- php - 在 WooCommerce 中为 wp_dropdown_categories 下拉菜单启用 select2
- javascript - HTML Replace Select item with something more formatable
- c# - 处理在不同线程中读取的 TCP Socket 数据
- javascript - 用nexe编译主进程electron js?
- ios - swift - 使用firebase检索数据后如何使用Values?
- java - Android:如何为 2 个设备(平板电脑和手机)调用不同的布局
- spring-boot - 带有尤里卡的 Zuul 代理返回 504 超时
- node.js - Nodejs GET 和 POST 在实时服务器中混合,但在 localhost 中工作