首页 > 解决方案 > 卡夫卡如何获得最后提交的偏移量

问题描述

上面的代码在 <2.3 的 kafka 版本中运行良好。但事后不起作用。

已提交的签名更改为 Map<TopicPartition,OffsetAndMetadata> 已提交(设置分区)

需要以下格式的结果。(分区顺序无关紧要) "{"topicA":{"1":4,"0":105}}"

  def getLatestOffset(topic: String, consumer: KafkaConsumer[String, String]): String = {


    val partitions = new util.ArrayList[TopicPartition]


    for (partitionInfo <- consumer.partitionsFor(topic)) {
      partitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
    }
    consumer.assign(partitions)

    //consumer.seekToEnd(partitions)


    var map = Map[String, Long]()
    for (partition <- partitions) {

      //consumer.seekToEnd(Collections.singletonList(partition))

      // val latestOffset = consumer.position(partition)
      val latestOffset = consumer.committed(partition).offset()

      map += (partition.partition().toString -> latestOffset)

    }

    var jsonMap = Map[String, Any]()
    jsonMap += (topic -> map)


    implicit val formats = DefaultFormats
    val response: String = write(jsonMap)

    response
  }


标签: scalaapache-kafkakafka-consumer-api

解决方案


推荐阅读