apache-kafka - 如何在融合的 kafka 休息代理中访问主题的最新偏移量以计算滞后
问题描述
在 confluent kafka rest proxy 中,我们可以获得特定消费者组的最后提交偏移量,但我们如何获得主题的最新偏移量来计算延迟。
解决方案
您可以使用 Kafka REST 代理来获取为特定分区提交的最新偏移量。根据Confluent Docs,
GET /consumers/(string: group_name)/instances/(string: instance)/offsets
获取给定分区的最后提交的偏移量(无论提交是由该进程还是其他进程发生的)。
请注意,必须向持有消费者实例的特定 REST 代理实例发出此请求。
参数:
group_name (string) -- 消费组的名称
instance (string) -- 消费者实例请求 JSON 的 ID
对象数组:
- partitions -- 用于查找最后提交的偏移量的分区列表
- partitions[i].topic (string) -- 主题名称
- partitions[i].partition (int) -- 分区 ID
响应 JSON 对象数组:
- offsets -- 提交的偏移量列表
- offsets[i].topic (string) -- 提交偏移量的主题名称
- offsets[i].partition (int) -- 已提交偏移量的分区 ID
- offsets[i].offset (int) -- 提交的偏移量
- offsets[i].metadata (string) -- 已提交偏移量的元数据
状态码:
- 404 未找到 --
- 错误代码 40402 - 未找到分区
- 错误代码 40403 - 未找到使用者实例
示例请求:
GET /consumers/testgroup/instances/my_consumer/offsets HTTP/1.1
Host: proxy-instance.kafkaproxy.example.com
Accept: application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json
{
"partitions": [
{
"topic": "test",
"partition": 0
},
{
"topic": "test",
"partition": 1
}
]
}
示例响应:
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v2+json
{"offsets":
[
{
"topic": "test",
"partition": 0,
"offset": 21,
"metadata":""
},
{
"topic": "test",
"partition": 1,
"offset": 31,
"metadata":""
}
]
}
推荐阅读
- alexa-skills-kit - 如何在 Slot 中收集多个值?
- linux - Bluez HID over GATT
- react-native - expo-auth-session/providers/google 无法在独立应用程序上运行:invalid_scheme
- java - 获取“未找到模块 javafx.controls”错误 Java Eclipse IDE
- sympy - sympy 如何简化以变量为指数的表达式
- javascript - 使用 find 函数循环遍历数组
- r - 如何有效地识别数据表中多个列的顺序变化?
- python - 如何在无头鹡鸰中禁用页面的“实时”视图按钮
- javascript - 将解析后的 URL 存储在 Cookie 中(不存储整个值)
- python - 将 .txt 文件转换为具有特定列 PYTHON 的 .csv