首页 > 解决方案 > 聚合后Kafka Streams加入不适用于多个分区

问题描述

问题陈述:

主题 1:“key = empId, value = empname, deptName , ...”

主题 2:“键 =部门名称,值 = 部门名称”

我需要来自主题 1 的数据,其中 deptName(主题 1 中的值属性) 等于主题 2 的键。

脚步:

  1. 从主题 1 创建一个流,按 deptName 对其进行分组,然后进行聚合。它将返回 Ktable(key =deptName, value = "empId1,empId2,empId3 ..")
  2. 从主题 2 创建一个流(key ="deptName" value = "deptName")
  3. 对 Ktable(步骤 1)和 KSteam(步骤 2)执行左连接操作。(KStream-Ktable)
  4. 并加入返回所需的结果。

在单个分区中一切正常,但是,在切换到多个分区后,join 不会返回任何数据。

步骤1:

KGroupedStream<String, Object> groupedStream = adStream.groupBy((key, value) -> value.getOrganizationId().toString());
        groupedStream
                .aggregate(() -> (new String()),
                        (aggKey, newValue, aggValue) -> addCurrentValue(aggValue,
                                String.valueOf(newValue.getOriginId())),
                        Materialized.as("aggregated-stream-store").with(strSerde, strSerde))
                .toStream().to(Constant.AD_AGGREGATED_DATA, Produced.with(strSerde, strSerde));

第2步:

KStream<String, String> swgOrgStream = builder.stream(Constant.SWG_ORG_TOPIC,Consumed.with(strSerde, strSerde));

第 3 步:

 KStream<String, String> filteredOrgStream = swgOrgStream.leftJoin(aggregatedTable,
                (leftValue, rightValue) -> rightValue);

标签: apache-kafka-streams

解决方案


推荐阅读