java - Kafka不使用偏移量而是通过记录的字段从主题中删除记录
问题描述
假设我有一个名为“batch”的主题,它有 1 个分区,并且我向它发布了数百万条记录以进行处理。我有一个 3 人的消费者组来处理数百万条记录。我遇到了一种情况,我不再需要处理满足某些标准的某些消息子集,例如age < 50
如何以编程方式从主题中删除这些消息。就像我在 UI 中单击“取消”按钮一样,它应该从主题中删除那些记录子集,age < 50
以便消费者不会对其进行处理。
我知道我可以通过运行带有偏移量的命令行来删除消息:- https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh
还有Java API,但同样是偏移量:
Delete records whose offset is smaller than the given offset of the corresponding partition
但就我而言,我不能使用偏移量,因为我只需要删除某些记录而不是all records smaller than the given offset
解决方案
我需要指出的主要一点是,您不应将 Kafka 中的数据视为与数据库中的数据相同的东西。Kafka 并未设计为以这种方式工作(例如:当我单击 X 按钮时,Y 记录将被删除)。
相反,您应该将主题视为永无止境的数据流。为 Kafka 主题生成的每条记录都将由消费者独立消费和处理。
将主题视为流可以为您提供不同的解决方案:
您可以使用带有过滤结果的第二个主题!
Streaming Diagram
___ Topic A ____
-- Produced Messages --> | | _______________________
|________________| --> | |
| Filtering Application |
___ Topic B ___ | |
| | <-- |_______________________|
<-- Consumed Messages -- |________________|
解释很简单,您生成了主题 A 的消息。然后您使用 a Filtering Application
which 将:
- 使用来自主题 A 的消息
- 基于一些业务逻辑(例如:)
age < 50
将过滤 - 将过滤后的消息生成到主题 B
最后,您的消费者将收到来自主题 B 的消息。
现在,在创建过滤应用程序时,您有几个选择:
推荐阅读
- token - Instagram 令牌在一些请求后被阻止
- java - Querydsl 本机 jpa 查询返回实体而不是对象数组
- sql - 用于根据其他列值更新列值的 SQL 脚本
- sql - 在 postgresql 中处理周末和节假日的最佳方式
- get - 如何防止在 where 子句中的 GET 参数中注入
- react-native - React Native 如何显示从服务器获取的标记
- php - 此集合实例上不存在属性 [数量]
- python - Python拆分不同的列表
- python - 使用 writer.sheets 方法时的 Pandas.ExcelWriter KeyError
- php - Angular 和 PHP .htaccess 文件重定向问题