apache-kafka-streams - Does using changelogs cause the application itself to become a bottleneck?
Problem description
I have a Spring Cloud Kafka Streams application that re-keys data to be able to join two topics, selectKeys, mapValues and aggregates the data. Over time the consumer lag seems to increase, and scaling out by adding multiple instances of the app doesn't help at all. With every instance the consumer lag keeps growing.
I scaled the number of instances up and down between 1 and 18, but there is no noticeable difference. The number of messages it lags behind keeps increasing every 5 seconds, independent of the number of instances.
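For context on why adding instances may not help: a Kafka Streams sub-topology can run at most as many active tasks as its input topics have partitions, so any instances beyond that partition count get no work assigned. A minimal sketch for checking the partition counts with the AdminClient; the bootstrap address is a placeholder and the topic names are the ones from this setup:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class PartitionCountCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Parallelism of each sub-topology is capped by the partition count of
        // its input topics; instances beyond that number sit idle.
        List<String> topics = Arrays.asList(
                "original-sensor-data",
                "error-score",
                "atl-mapped-original-sensor-data-repartition",
                "atl-mapped-error-score-data-repartition",
                "atl-joined-data-repartition");

        try (AdminClient admin = AdminClient.create(props)) {
            admin.describeTopics(topics).all().get().forEach((name, description) ->
                    System.out.println(name + ": " + description.partitions().size() + " partitions"));
        }
    }
}

Back to the question's code, this is the join part of the topology: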
KStream<String, MappedOriginalSensorData> flattenedOriginalData = originalData
        .flatMap(flattenOriginalData())
        .through("atl-mapped-original-sensor-data-repartition",
                Produced.with(Serdes.String(), new MappedOriginalSensorDataSerde()));

//#2. Save the modelid and algorithm parts of the key of the errorscore topic and reduce the key
//    to installationId:assetId:tagName.
//    Repartition ahead of time, avoiding multiple repartition topics and thereby duplicating data.
KStream<String, MappedErrorScoreData> enrichedErrorData = errorScoreData
        .map(enrichWithModelAndAlgorithmAndReduceKey())
        .through("atl-mapped-error-score-data-repartition",
                Produced.with(Serdes.String(), new MappedErrorScoreDataSerde()));

return enrichedErrorData
        //#3. Join
        .join(flattenedOriginalData, join(),
                JoinWindows.of(
                        // allow messages within one second to be joined together based on their timestamp
                        Duration.ofMillis(1000).toMillis())
                        // configure the retention period of the local state store involved in this join
                        .until(Long.parseLong(retention)),
                Joined.with(
                        Serdes.String(),
                        new MappedErrorScoreDataSerde(),
                        new MappedOriginalSensorDataSerde()))
        //#4. Set the installation:assetid:modelinstance:algorithm::tag key back
        .selectKey((k, v) -> v.getOriginalKey())
        //#5. Map to ErrorScore (basically removing the originalKey field)
        .mapValues(removeOriginalKeyField())
        .through("atl-joined-data-repartition");
Then the aggregation part:
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized = Materialized
        .as(localStore.getStoreName());

// Set the retention of the changelog topic
materialized.withLoggingEnabled(topicConfig);

// Configure what the windows look like and how long data will be retained in the local stores
TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
        localStore.getTimeUnit(), Long.parseLong(topicConfig.get(RETENTION_MS)));

// Processing description:
// 2. With groupByKey we group the data on the new key
// 3. With windowedBy we split up the data in time intervals depending on the provided LocalStore enum
// 4. With reduce we determine the maximum value in the time window
// 5. Materialized makes it stored in a table
stream.groupByKey()
        .windowedBy(configuredTimeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);
}

private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long retentionMs) {
    TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
    // until() may return a new instance, so return its result instead of the original object
    return timeWindows.until(retentionMs);
}
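For completeness: TimeWindows.until() is deprecated since Kafka Streams 2.1, and the retention of the windowed store (and its changelog) moves to Materialized.withRetention(). A sketch under that assumption, reusing the names from the snippet above; that localStore.getTimeUnit() yields milliseconds is carried over from the original code:

Duration windowSize     = Duration.ofMillis(localStore.getTimeUnit());
Duration storeRetention = Duration.ofMillis(Long.parseLong(topicConfig.get(RETENTION_MS)));

Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized =
        Materialized.<String, ErrorScore, WindowStore<Bytes, byte[]>>as(localStore.getStoreName())
                // keep the changelog topic, with the same topic-level configuration
                .withLoggingEnabled(topicConfig)
                // replaces TimeWindows.until(): how long window data is kept in the store
                .withRetention(storeRetention);

stream.groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize)) // 3.0+; TimeWindows.of(windowSize) on 2.x
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);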
I expected that increasing the number of instances would drastically decrease the consumer lag.
So in this setup multiple topics are involved, such as:
* original-sensor-data
* error-score
* kstream-joinother
* kstream-jointhis
* atl-mapped-original-sensor-data-repartition
* atl-mapped-error-score-data-repartition
* atl-joined-data-repartition
The idea is to join the original sensor data with the error scores. Re-keying requires the atl-mapped-* topics. The join then uses the kstream-* topics, and finally, as the result of the join, atl-joined-data-repartition is filled. After that the aggregation also creates topics, but I'm leaving that out of scope for now.
original-sensor-data
    \
     \
      \ atl-mapped-original-sensor-data-repartition -- kstream-jointhis  -\
      / atl-mapped-error-score-data-repartition     -- kstream-joinother -\
     /                                                                     \
error-score                                                                 atl-joined-data-repartition
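One way to see exactly how the DSL splits this pipeline is to print the topology description: each repartition topic ends one sub-topology and starts the next, and each sub-topology scales independently through its own tasks. A short fragment, assuming builder is the StreamsBuilder on which the streams above are defined:

StreamsBuilder builder = new StreamsBuilder();
// ... the flatMap/map/join/aggregate pipeline above is defined on this builder ...
Topology topology = builder.build();

// Prints the sub-topologies, their source/sink topics and the state stores involved
System.out.println(topology.describe());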
Since I introduced the join and the atl-mapped topics, scaling up the number of instances doesn't seem to have much effect anymore, and I wonder whether this topology can become its own bottleneck. Looking at the consumer lag, the original-sensor-data and error-score topics have far less consumer lag than the atl-mapped-* topics. Is there a way to cope with this by removing these changelogs, or would that mean the application can no longer scale?
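To quantify where the lag actually accumulates, the per-topic lag of the application's consumer group can be computed programmatically. A minimal sketch using the AdminClient and a throw-away consumer; the group id (the application.id of the Streams app) and the bootstrap address are placeholders:

import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LagPerTopic {
    public static void main(String[] args) throws Exception {
        String groupId = "atl-streams-app";                                   // placeholder: the Streams application.id
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Committed offsets of the Streams app, per partition
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata()
                    .get();

            Properties consumerProps = new Properties();
            consumerProps.putAll(props);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-lag-probe");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
                // Log-end offsets for the same partitions
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());

                // Sum (end offset - committed offset) per topic
                Map<String, Long> lagByTopic = new TreeMap<>();
                committed.forEach((tp, offset) ->
                        lagByTopic.merge(tp.topic(), endOffsets.get(tp) - offset.offset(), Long::sum));
                lagByTopic.forEach((topic, lag) -> System.out.println(topic + " lag=" + lag));
            }
        }
    }
}

In the setup described above, lag concentrated on the repartition topics would point at the sub-topology that reads them (the join with its windowed state stores and changelogs) rather than at the source topics.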
Solution