首页 > 解决方案 > 外键在流式数据库更新上加入两个 ktable

问题描述

我们有两个数据库表,它们通过 cdc 应用程序将更新流式传输到 kafka 主题。我们将在 Ktables 上保留每一行的最新版本。然后加入他们并就任何更新写到另一个 kafka 主题。我们的代码如下所示:

pTable = builder.stream(pTopic, pKeyDbOperationEventConsumed)
                          .selectKey(((key, value) -> key.getId()))
                          .mapValues(pMapFunc)
                          .groupByKey(stringPGrouped)
                          .reduce((aggValue, newValue) -> newValue, pMaterializedAs);
sPTable = builder.stream(sPTopic, keyDbOperationEventConsumed)
                               .selectKey(((key, value) -> key.getPId() + "-" + key.getSId()))
                               .mapValues(spMapFunc)
                               .groupByKey(stringSPGrouped)
                               .reduce((aggValue, newValue) -> newValue, sPMaterializedAs);
sPTable.join(pTable, (sp) -> sp.getPId().toString(), joinerFunc)
                     .toStream()
                     .to(upstreamTopic, producedWithFunc);

它适用于本地环境的小数据。但我们无法让它在生产中发挥作用。我们的设置是:5 个 pod,两个主题都有 30 个分区。使用默认配置,它开始处理,但在处理非常小的数据后卡住了。我们看到尝试心跳失败,因为组正在重新平衡日志。然后我们将配置更改为:

- max.poll.interval.ms = 3600000
- request.timeout.ms = 7200000
- session.timeout.ms = 900000
- num.stream.threads = 6 

但没有运气。它甚至无法处理一条记录。我们遇到了这个代理日志:组 x 中的成员 x 失败,将其从组中删除(kafka.coordinator.group.GroupCoordinator)

我的第一个问题是我们的用例是否有效。如果有效,我们如何追踪潜在问题?

标签: apache-kafka-streamsspring-kafka

解决方案


推荐阅读