apache-kafka - Kafka DSL KStream 转换器之间的 Kafka 共享状态存储
问题描述
我有一个拓扑,我使用 Transformer 来聚合我的对象,稍后在我的拓扑中,我试图从第一个 Transformer 中使用的 State Store 中读取。似乎无法访问数据。是因为状态存储在不同的分区上吗?
我的拓扑看起来像这样:
streamsBuilder.stream("input")
.transform(new TransformerSupplier1(), "my-store")
.leftJoin(someKTable, myValueJoiner())
.flatTransform(new TransformerSupplier2(), "my-store")
在我TransformerSupplier1
的Transformer
,我的国营商店是类型<String, Map<String, Object>>
在我TransformerSupplier2
的 'sTransformer
中,我试图通过我用来存储在第一个变压器中的密钥来获取状态存储,但我得到了 null,当我这样做时.all().peekNextKey()
,什么也找不到。
如果我需要为我的变形金刚提供更多信息,请告诉我,我会尝试混淆真正的逻辑。谢谢
解决方案
推荐阅读
- php - 如何在我的 JSON 字符串 POST 前后删除斜杠
- php - laravel 中的动态关系使用子查询来进行归属关系
- java - 用 main.java 打开 loginFrame.java
- typescript - 使用自定义匹配器扩展 expect-webdriverio
- python - 即使目录正确,Visual Studio 也找不到我的文件
- python - 如何在pyspark中使用正则表达式验证多个电子邮件地址
- path-finding - A* 在多个网格上寻路?
- asp.net-mvc - ASP.Net MVC-如何在个人身份验证模式下显示登录按钮?
- python - 猪拉丁语翻译器:从 Python 开始
- excel - 如何使用 VBA 在 Excel 中插入 WebP 图像(“.jpg”)?