apache-kafka-streams - 外键在流式数据库更新上加入两个 ktable
问题描述
我们有两个数据库表,它们通过 cdc 应用程序将更新流式传输到 kafka 主题。我们将在 Ktables 上保留每一行的最新版本。然后加入他们并就任何更新写到另一个 kafka 主题。我们的代码如下所示:
pTable = builder.stream(pTopic, pKeyDbOperationEventConsumed)
.selectKey(((key, value) -> key.getId()))
.mapValues(pMapFunc)
.groupByKey(stringPGrouped)
.reduce((aggValue, newValue) -> newValue, pMaterializedAs);
sPTable = builder.stream(sPTopic, keyDbOperationEventConsumed)
.selectKey(((key, value) -> key.getPId() + "-" + key.getSId()))
.mapValues(spMapFunc)
.groupByKey(stringSPGrouped)
.reduce((aggValue, newValue) -> newValue, sPMaterializedAs);
sPTable.join(pTable, (sp) -> sp.getPId().toString(), joinerFunc)
.toStream()
.to(upstreamTopic, producedWithFunc);
它适用于本地环境的小数据。但我们无法让它在生产中发挥作用。我们的设置是:5 个 pod,两个主题都有 30 个分区。使用默认配置,它开始处理,但在处理非常小的数据后卡住了。我们看到尝试心跳失败,因为组正在重新平衡日志。然后我们将配置更改为:
- max.poll.interval.ms = 3600000
- request.timeout.ms = 7200000
- session.timeout.ms = 900000
- num.stream.threads = 6
但没有运气。它甚至无法处理一条记录。我们遇到了这个代理日志:组 x 中的成员 x 失败,将其从组中删除(kafka.coordinator.group.GroupCoordinator)
我的第一个问题是我们的用例是否有效。如果有效,我们如何追踪潜在问题?
解决方案
推荐阅读
- reactjs - 在 Typescript 中扩展和使用泛型类
- flutter - 未定义的颤振创建--ios-language objc--android-language java
- javascript - 使用 Active Directory 的 Spring Web App 的单点登录实现
- docker-compose - 将 OpenLDAP 作为 Wso2is 5.8.0 的 PRIMARY 外部用户存储设置的问题:添加新用户时出现与 createTimestamp 属性相关的“错误 21”
- apache - 如何将http重定向到https而不在apache中使用斜杠?
- google-coral - 如何在 Coral CPU 上运行 tflite 模型
- angular - 如何从服务中获取 url 的参数
- r - 如何在循环中使用 if else 语句?
- html - CSS 选择器特异性和文件加载顺序
- typescript - 如何将导出的函数转换为函数类型?