apache-kafka - 如何使用 Kafka Streams 处理无序的事件
问题描述
我有一个应用程序,其中基于用户登录、用户的中间操作(可选)和用户注销等用户操作在 Kafka 主题上发送事件。每个事件在事件对象中都有一些信息以及 userId ,例如登录事件有 loginTime;添加注释有注释(中间操作)。类似地,注销事件具有 logoutTime。要求是在收到每个用户的注销事件后将所有这些事件的信息聚合到一个对象中并将其发送到下游。
由于某些原因(网络延迟,多个事件生产者)事件可能没有按顺序出现(用户注销事件可能先于中间事件),那么问题是如何处理这种情况?收到用户注销事件后,我不能等待中间事件,因为中间事件是可选的,具体取决于用户的操作。
我在这里认为的唯一选择是在收到用户注销事件后等待一段时间,如果在该等待时间内收到处理中间事件并发送已处理的事件,但再次不确定如何实现这一点。
解决方案
Kafka
不保证订购topic
,它保证订购partition
. 一个主题可以有多个分区,因此每个消费主题的消费者都会消费一个分区。这就是 kafka 实现可扩展性的方式。所以你遇到的是正常行为(它不是错误或与网络延迟或类似的东西有关)。您可以做的是确保您要按顺序处理的所有消息都发送到同一个分区。您可以通过将分区数设置为 1 来做到这一点,这是最愚蠢的方式。当您与生产者发送消息时,默认情况下,kafka 会查看密钥,对其进行哈希处理,并通过该哈希知道应该在哪个分区上发送消息。您可以确保所有消息的密钥都是相同的。这样一来,所有键的哈希值都是相同的,所有消息都将进入同一个分区。还,您可以实现自定义分区器并覆盖 kafka 如何选择分区消息的默认方式。这样,所有消息都会按顺序到达。如果您无法执行任何此类操作,那么您将收到乱序的事件,您将不得不考虑如何乱序消费它们,但这不是与 kafka 相关的问题。
推荐阅读
- java - 正则表达式检查字符串是数字还是单词“all”
- powershell - “IIS:当文件已存在时无法创建文件。Exception.Message”通过 SSL 证书和 powershell 创建 https 绑定时
- postgresql - PostgreSQL 表 --data-only 转储占用表大小的两倍以上
- ksqldb - 如何获取 ksqldb 表的当前状态?
- linux - boost::multiindex 和继承
- typescript - 如何限制打字稿中的枚举字符串值
- elasticsearch - 弹性搜索将过滤器应用于聚合数据
- javascript - 如何在 JavaScript 中实现点击事件?
- r - 从 R 中的函数或向量在数据框中创建多列
- reactjs - 如何比较和验证两个数据选择器的值与 ant design 中的表单项规则?