首页 > 解决方案 > Kafka ProducerInterceptor 可以过滤记录吗?

问题描述

Kafka 支持拦截器的概念,拦截器位于 Kafka 和消费或生产记录的进程之间,因此可以对从 Kafka 读取或写入到 Kafka 的记录进行变异或执行自定义日志记录。

据我所见,ConsumerInterceptor允许过滤记录,因为它返回一个ConsumerRecords对象,并且实现可以在将记录传递给消费者之前从容器类中删除(即审查)项目。

ProducerInterceptor只接受并返回ProducerRecord,而不是类似Optional<ProducerRecord>. 如果此方法返回的记录为空,会发生什么情况?用例希望阻止将记录写入 Kafka - 这是否通过简单地通过返回 null 来删除正在写入的记录来支持,还是必须改变输入对象并将其字段归零?

标签: apache-kafka

解决方案


你可能会得到一个 SerializationException,因为拦截将在序列化之前发生,因为你需要在 onSend 上执行此操作。如果您处理 null 情况,它可能对您有用,但您需要确定这是否是正确的做法。如果您想在发生过滤的情况下向 Kafka 发送一个空记录,那么这是有道理的。如果您只是想删除记录,最好将过滤逻辑放在生产者本身而不是拦截。


推荐阅读