apache-kafka-streams - 聚合后Kafka Streams加入不适用于多个分区
问题描述
问题陈述:
主题 1:“key = empId, value = empname, deptName , ...”
主题 2:“键 =部门名称,值 = 部门名称”
我需要来自主题 1 的数据,其中 deptName(主题 1 中的值属性) 等于主题 2 的键。
脚步:
- 从主题 1 创建一个流,按 deptName 对其进行分组,然后进行聚合。它将返回 Ktable
(key =deptName, value = "empId1,empId2,empId3 ..")
- 从主题 2 创建一个流
(key ="deptName" value = "deptName")
- 对 Ktable(步骤 1)和 KSteam(步骤 2)执行左连接操作。
(KStream-Ktable)
- 并加入返回所需的结果。
在单个分区中一切正常,但是,在切换到多个分区后,join 不会返回任何数据。
步骤1:
KGroupedStream<String, Object> groupedStream = adStream.groupBy((key, value) -> value.getOrganizationId().toString());
groupedStream
.aggregate(() -> (new String()),
(aggKey, newValue, aggValue) -> addCurrentValue(aggValue,
String.valueOf(newValue.getOriginId())),
Materialized.as("aggregated-stream-store").with(strSerde, strSerde))
.toStream().to(Constant.AD_AGGREGATED_DATA, Produced.with(strSerde, strSerde));
第2步:
KStream<String, String> swgOrgStream = builder.stream(Constant.SWG_ORG_TOPIC,Consumed.with(strSerde, strSerde));
第 3 步:
KStream<String, String> filteredOrgStream = swgOrgStream.leftJoin(aggregatedTable,
(leftValue, rightValue) -> rightValue);
解决方案
推荐阅读
- clojure - 带有基座 clojure 服务器的通用 [/*proxy :any] 和特定 [/service/x :post..] 路由
- html - 将 css 属性分配给父级,而不是子级
- javascript - JavaScript - 在同一窗口/选项卡中打开链接
- sql-server - 使用 sql server 将数字从 002541500(后 4 位为小数位)转换为 254.1500
- xml - 简单转换反序列化:匹配元素错误
- speech-recognition - 如何在 dictation.io/speech 中捕捉/精确文本
- php - 如何使用 Laravel 5.* 获取数据类型的绑定
- php - 在布尔值中调用成员函数 getTimestamp()
- c# - 如何使用 C# 字典从 Dapper 中的银行映射 JSON?
- python - Windows 错误消息中的烧瓶