首页 > 解决方案 > 从kafka获取最后写入的消息偏移量

问题描述

我正在为名为default.

我的生产者分区是RD_KAFKA_PARTITION_UA,当我使用未分配的分区时,如何获取推送到default主题的最后一条消息的偏移量。

我不需要消费者的偏移量,我需要最后发布的消息的偏移量。

我需要用于崩溃恢复的偏移量,不打算进行人为干预。

我需要使用偏移量librdkafka,当我的程序再次启动时,基于偏移量,我将对我的程序进行一些修改,因此我只需要在需要时通过 API 获取它。

我不认为我可以依靠回调,

考虑我推送了我的消息,我收到了回调并且我的程序崩溃了。

或者

我推送了我的消息,我正要投票,我的程序崩溃了。

我正在使用 C 语言librdkafka库。

标签: capache-kafkakafka-producer-api

解决方案


这可以使用生产者的交付报告轻松完成。

librdkafka 示例之一rdkafka_example.c演示了此功能。

例如:

./rdkafka_example -P -b localhost:9092 -t testtopic -o report
% Type stuff and hit enter to send
hello
del: Success: offset 0
% Message delivered (5 bytes, offset 0, partition 0): hello
hello
% Sent 5 bytes to topic testtopic partition -1
del: Success: offset 1
% Message delivered (5 bytes, offset 1, partition 0): hello
^Cdel: Success: offset 2
% Message delivered (5 bytes, offset 2, partition 0): hello

您会看到,对于发送的每条消息,传递报告都包含生成的消息的偏移量。

基本上这个例子:

  • produce.offset.report=true [1]
  • 登记交货报告[2]
  • 在交付报告中,访问偏移量字段[3]

推荐阅读