首页 > 解决方案 > Kafka不使用偏移量而是通过记录的字段从主题中删除记录

问题描述

假设我有一个名为“batch”的主题,它有 1 个分区,并且我向它发布了数百万条记录以进行处理。我有一个 3 人的消费者组来处理数百万条记录。我遇到了一种情况,我不再需要处理满足某些标准的某些消息子集,例如age < 50

如何以编程方式从主题中删除这些消息。就像我在 UI 中单击“取消”按钮一样,它应该从主题中删除那些记录子集,age < 50以便消费者不会对其进行处理。

我知道我可以通过运行带有偏移量的命令行来删除消息:- https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

还有Java API,但同样是偏移量:

https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/AdminClient.html#deleteRecords-java.util.Map-org.apache.kafka.clients.admin.DeleteRecordsOptions-

Delete records whose offset is smaller than the given offset of the corresponding partition

但就我而言,我不能使用偏移量,因为我只需要删除某些记录而不是all records smaller than the given offset

标签: javaapache-kafkaspring-kafka

解决方案


我需要指出的主要一点是,您不应将 Kafka 中的数据视为与数据库中的数据相同的东西。Kafka 并未设计为以这种方式工作(例如:当我单击 X 按钮时,Y 记录将被删除)。

相反,您应该将主题视为永无止境的数据流。为 Kafka 主题生成的每条记录都将由消费者独立消费和处理。

将主题视为流可以为您提供不同的解决方案:

您可以使用带有过滤结果的第二个主题!

Streaming Diagram
                            ___ Topic A ____
--  Produced Messages -->  |                |      _______________________
                           |________________| --> |                       |
                                                  | Filtering Application |
                            ___  Topic B ___      |                       |
                           |                | <-- |_______________________|
<-- Consumed Messages --   |________________|

解释很简单,您生成了主题 A 的消息。然后您使用 a Filtering Applicationwhich 将:

  1. 使用来自主题 A 的消息
  2. 基于一些业务逻辑(例如:)age < 50将过滤
  3. 将过滤后的消息生成到主题 B

最后,您的消费者将收到来自主题 B 的消息。

现在,在创建过滤应用程序时,您有几个选择:

  1. 使用消费者和生产者实现基本解决方案
  2. 使用Kafka 流
  3. 使用KSQL

推荐阅读