首页 > 解决方案 > 如何从 Kafka 处理器查询数据库?

问题描述

我正在尝试使用Kafka和创建帐户管理服务Kafka Streams

SignupRequest消息被放置在一个signup-requests主题上,并且消费该主题的流中的第一个处理器必须首先检查电子邮件的唯一性,这就是问题开始的地方,我正在考虑两种可能性,但我只是一个新手......

第一个是创建一个主题,这样我就可以用它检查电子邮件的唯一性KTableaccounts但是我读到主题中的消息有时间离开,之后它们被删除。因此,如果一个带有已检查电子邮件的帐户是在很久以前创建的,超过了配置的离开时间,那么它不应该出现在它上面KTable,我的验证就会受到影响。

第二种选择是直接查询帐户真正持久化的数据库,但是如何在kafka处理器内部进行异步操作?这是一个好习惯吗?

标签: apache-kafkaapache-kafka-streams

解决方案


我读到主题中的消息有时间离开,之后它们被删除。

您可以根据时间(或主题的大小)定义保留,但您也可以将主题配置为compacted。这是一个特殊的保留选项,这意味着对于每个密钥,始终保留最新消息——无论何时收到。因此,压缩主题非常适合位于 KTables 后面的主题。


推荐阅读