首页 > 解决方案 > 如何在融合的 kafka 休息代理中访问主题的最新偏移量以计算滞后

问题描述

在 confluent kafka rest proxy 中,我们可以获得特定消费者组的最后提交偏移量,但我们如何获得主题的最新偏移量来计算延迟。

标签: apache-kafkakafka-rest

解决方案


您可以使用 Kafka REST 代理来获取为特定分区提交的最新偏移量。根据Confluent Docs

GET /consumers/(string: group_name)/instances/(string: instance)/offsets

获取给定分区的最后提交的偏移量(无论提交是由该进程还是其他进程发生的)。

请注意,必须向持有消费者实例的特定 REST 代理实例发出此请求。

参数:

  • group_name (string) -- 消费组的名称

  • instance (string) -- 消费者实例请求 JSON 的 ID

对象数组:

  • partitions -- 用于查找最后提交的偏移量的分区列表
  • partitions[i].topic (string) -- 主题名称
  • partitions[i].partition (int) -- 分区 ID

响应 JSON 对象数组:

  • offsets -- 提交的偏移量列表
  • offsets[i].topic (string) -- 提交偏移量的主题名称
  • offsets[i].partition (int) -- 已提交偏移量的分区 ID
  • offsets[i].offset (int) -- 提交的偏移量
  • offsets[i].metadata (string) -- 已提交偏移量的元数据

状态码:

  • 404 未找到 --
  • 错误代码 40402 - 未找到分区
  • 错误代码 40403 - 未找到使用者实例

示例请求:

GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    },
    {
      "topic": "test",
      "partition": 1
    }

  ]
}

示例响应:

HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json

{"offsets":
 [
  {
    "topic": "test",
    "partition": 0,
    "offset": 21,
    "metadata":""
  },
  {
    "topic": "test",
    "partition": 1,
    "offset": 31,
    "metadata":""
  }
 ]
}

推荐阅读