Unable to reset Kafka offset

Problem description

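I am trying to reset a Kafka consumer group offset, but the reset has no effect.

The current offset is 6. I tried resetting it to the earliest offset, and also shifting it back by 1, but in both cases the offset stays at 6.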

kafka-consumer-groups --bootstrap-server <server>:9092 --group EDWOFFSETGROUP_24 --describe
Consumer group 'EDWOFFSETGROUP_24' has no active members.

TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
EDW_TOTALS_JSON1  0          6               6               0    -            -     -

Approach #1

kafka-consumer-groups --bootstrap-server <server>:9092  --group EDWOFFSETGROUP_24 --topic EDW_TOTALS_JSON1:0 --reset-offsets --shift-by -1
WARN: No action will be performed as the --execute option is missing.In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
[2019-12-04 14:43:14,364] WARN New offset (5) is lower than earliest offset for topic partition EDW_TOTALS_JSON1-0. Value will be set to 6 (kafka.admin.ConsumerGroupCommand$)

TOPIC                      PARTITION  NEW-OFFSET     
EDW_TOTALS_JSON1           0          6     

Approach #2

kafka-consumer-groups --bootstrap-server <server>:9092 --group EDWOFFSETGROUP_24 --topic EDW_TOTALS_JSON1 --reset-offsets --to-earliest --execute

TOPIC                      PARTITION  NEW-OFFSET     
EDW_TOTALS_JSON1           0          6           

Tags: apache-kafka, offset

Solution


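It looks like the data has been purged by the retention policy. When old messages are deleted, the partition's earliest offset moves forward to the first offset still retained, and a reset cannot go below it. Here the WARN line reports an earliest offset of 6, the same as LOG-END-OFFSET, so the partition currently holds no messages and both resets land on 6.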
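The clamping that the WARN line in the first attempt describes can be sketched in a few lines of shell. This is only an illustration, not Kafka's actual code: the function name clamp_offset is hypothetical, and the assumption is that a requested offset outside the retained range is pulled to the nearest valid end.

```shell
#!/usr/bin/env bash
# Hypothetical helper: clamp a requested offset into [earliest, latest].
# Assumption: this mirrors what the reset tool's WARN message describes.
clamp_offset() {
  local requested=$1 earliest=$2 latest=$3
  if (( requested < earliest )); then
    echo "$earliest"      # below the retained range: use earliest
  elif (( requested > latest )); then
    echo "$latest"        # beyond the log end: use latest
  else
    echo "$requested"     # already valid: keep as requested
  fi
}

# Values taken from the question: current, earliest and log end are all 6.
current=6; earliest=6; latest=6
clamp_offset $(( current - 1 )) "$earliest" "$latest"   # like --shift-by -1
clamp_offset "$earliest" "$earliest" "$latest"          # like --to-earliest
```

With the numbers from the question, both calls print 6, which matches what the two reset attempts reported.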

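See the definitions of the topic's retention settings.

You can verify this by starting a fresh consumer that reads from the beginning. With the old ZooKeeper-based consumer: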

bin/kafka-console-consumer.sh --zookeeper <zk_host>:2181 --topic test --from-beginning

If you are using the new consumer API, use the following instead:

bin/kafka-console-consumer.sh --bootstrap-server <server>:9092 --topic EDW_TOTALS_JSON1 --from-beginning

You can also use GetOffsetShell to check the offset details of a partition:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell
required argument [broker-list], [topic]
Option                                           Description
------                                           -----------
--broker-list <hostname:port,...,hostname:port>  REQUIRED: The list of hostname and port of the server to connect to.
--max-wait-ms <Integer: ms>                      The max amount of time each fetch request waits. (default: 1000)
--offsets <Integer: count>                       number of offsets returned (default: 1)
--partitions <partition ids>                     comma separated list of partition ids. If not specified, will find offsets for all partitions (default)
--time <Long: timestamp in milliseconds / -1 (latest) / -2 (earliest)>
                                                 timestamp; offsets will come before this timestamp, as in getOffsetsBefore
--topic <topic>                                  REQUIRED: The topic to get offsets from.

Example

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1
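To see whether anything is left to re-read, one could run GetOffsetShell twice, once with --time -2 (earliest) and once with --time -1 (latest), and compare the results. The sketch below only does the comparison on two sample lines. It assumes the tool prints one topic:partition:offset line per partition; the helper names and the sample lines are hypothetical, not captured from a real cluster.

```shell
#!/usr/bin/env bash
# Commands one might run first (not executed here; <server> is a placeholder):
#   kafka-run-class kafka.tools.GetOffsetShell --broker-list <server>:9092 --topic EDW_TOTALS_JSON1 --time -2
#   kafka-run-class kafka.tools.GetOffsetShell --broker-list <server>:9092 --topic EDW_TOTALS_JSON1 --time -1

# Hypothetical helper: take the offset field from a topic:partition:offset line.
offset_of() {
  echo "${1##*:}"
}

# Hypothetical helper: distance between the earliest and latest offsets.
offset_span() {
  local earliest_line=$1 latest_line=$2
  echo $(( $(offset_of "$latest_line") - $(offset_of "$earliest_line") ))
}

# Sample lines, assumed to match the state described in the question.
offset_span "EDW_TOTALS_JSON1:0:6" "EDW_TOTALS_JSON1:0:6"
```

A span of 0 means the earliest and latest offsets coincide, so there is nothing for a reset to rewind to.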
