apache-kafka - kafka流+如何使状态存储中的条目异步过期
问题描述
我有一个 kafka 流拓扑,它从输入主题读取更新某些状态并确定状态条目是否需要保留在状态存储中或可以删除。如果它可以被删除,它将被删除,否则我有一个标点符号,每 10 秒运行一次,并且会使状态存储中的项目过期。
我最近发现标点符号在同一个流线程上运行,并且可能会阻塞流的处理。
我可以使用哪些模式在单独的线程池中执行标点符号内的逻辑以避免阻塞流处理?
感谢你的帮助。
解决方案
Matthias J. Sax已经说过,到目前为止,国有商店不可能做到这一点,所以当他在 Confluent 工作时,我相信这是最新消息。
但是,在我们的案例中,我们所做的是使用 KStream-KTable 连接而不是状态存储。我不确定,如果这对你的情况是可能的,但让我解释一下,我们做了什么,也许它对你也有用:
我们有两个主题 A 和 B,主题 A 与 KStream 一起使用。主题 B 与 KTable 一起使用。我们转换 KTable 数据,因此我们可以将它加入到主题 A 的 KStream 中。我们加入它,执行我们的操作并通过null
使用原始键向主题 B 写入一个值来从主题 B 中“删除”数据map
和through
。因此,当我们在主题 A 中获得另一条记录时,我们的 KTable 中不再有要加入的值(正是我们想要的)。
我希望它有所帮助。
推荐阅读
- python - Django forloop 和所有可选择的单选按钮
- python - 将数据帧的某一行乘以相同维度的另一个数据帧的另一行的值
- java - 获取条件查询的 JPQL/SQL 字符串表示
- c++ - 初始化时 num 的哪个值会给这个嵌套循环一个“真”的输出?
- reactjs - 使用样式化组件向子元素添加样式?
- r - 将文本值添加到辅助轴上的 ggplot
- python - 为什么“any”有时比python中布尔值的“max”工作得更快,有时比“max”慢得多?
- html - 如何在移动视图的总屏幕上显示侧选项卡?
- react-native - 更改状态后组件未更新
- python - 如何用 python 克服这个 sql 连接错误?