首页 > 解决方案 > 如何删除消费者已经消费的数据?卡夫卡

问题描述

我正在kafka中进行数据复制。但是,kafka 日志文件的大小增加得非常快。一天的大小达到 5 GB。作为这个问题的解决方案,我想立即删除处理过的数据。我在 AdminClient 中使用删除记录方法来删除偏移量。但是当我查看日志文件时,与该偏移量对应的数据并没有被删除。

RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);

我不想要像(log.retention.hours , log.retention.bytes , log.segment.bytes , log.cleanup.policy=delete)这样的建议

因为我只想删除消费者消费的数据。在这个解决方案中,我还删除了没有被消费的数据。

你有什么建议?

标签: javaapache-kafkaoffsetkafka-topic

解决方案


你没有做错什么。您提供的代码有效,我已经对其进行了测试。以防万一我忽略了您的代码中的某些内容,我的是:

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
    TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
    Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
    deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
    kafkaAdminClient.deleteRecords(deleteMap);
}

我使用了组:'org.apache.kafka',名称:'kafka-clients',版本:'2.0.0'

因此,请检查您是否针对正确的分区(第一个分区为 0)

检查您的代理版本:https ://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html说:

0.11.0.0 版本的代理支持此操作

从同一应用程序生成消息,以确保您已正确连接。

您还可以考虑另一种选择。使用cleanup.policy=compact如果您的消息密钥重复,您可以从中受益。不仅因为该键的旧消息将被自动删除,而且您可以使用具有空负载的消息删除该键的所有消息这一事实。只是不要忘记将delete.retention.msmin.compaction.lag.ms设置为足够小的值。在这种情况下,您可以使用一条消息,而不是为同一密钥生成空有效负载(但请谨慎使用这种方法,因为这样您可以删除未使用的消息(使用该密钥))


推荐阅读