Will using changelogs cause a bottleneck in the application itself?

Problem description

I have a Spring Cloud Kafka Streams application that rekeys data so it can join two topics, using selectKey and mapValues, and then aggregates the data. Over time the consumer lag seems to keep increasing, and scaling out by adding multiple instances of the application does not help at all. On every instance the consumer lag keeps growing.

I scaled the number of instances up and down between 1 and 18, but saw no big difference. The number of messages it lags behind keeps increasing at every 5-second check, independent of the number of instances.

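//#1. Flatten the original sensor data and repartition it ahead of the join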
KStream<String, MappedOriginalSensorData> flattenedOriginalData = originalData
                .flatMap(flattenOriginalData())
                .through("atl-mapped-original-sensor-data-repartition", Produced.with(Serdes.String(), new MappedOriginalSensorDataSerde()));


        //#2. Save modelid and algorithm parts of the key of the errorscore topic and reduce the key
        //    to installationId:assetId:tagName
        //Repartition ahead of time avoiding multiple repartition topics and thereby duplicating data
        KStream<String, MappedErrorScoreData> enrichedErrorData = errorScoreData
                .map(enrichWithModelAndAlgorithmAndReduceKey())
                .through("atl-mapped-error-score-data-repartition", Produced.with(Serdes.String(), new MappedErrorScoreDataSerde()));


        return enrichedErrorData
                //#3. Join
                .join(flattenedOriginalData, join(),
                        JoinWindows.of(
                                // allow messages within one second to be joined together based on their timestamp
                                Duration.ofMillis(1000).toMillis())
                                // configure the retention period of the local state store involved in this join
                                .until(Long.parseLong(retention)),
                        Joined.with(
                                Serdes.String(),
                                new MappedErrorScoreDataSerde(),
                                new MappedOriginalSensorDataSerde()))
                //#4. Set installation:assetid:modelinstance:algorithm::tag key back
                .selectKey((k,v) -> v.getOriginalKey())
                //#5. Map to ErrorScore (basically removing the originalKey field)
                .mapValues(removeOriginalKeyField())
                .through("atl-joined-data-repartition");

Then the aggregation part:

        Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized = Materialized
                .as(localStore.getStoreName());

        // Set retention of changelog topic
        materialized.withLoggingEnabled(topicConfig);

        // Configure what the windows look like and how long data will be retained in the local stores
        TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
                localStore.getTimeUnit(), Long.parseLong(topicConfig.get(RETENTION_MS)));

        // Processing description:
        // 2. With groupByKey we group the data on the new key
        // 3. With windowedBy we split up the data in time intervals depending on the provided LocalStore enum
        // 4. With reduce we determine the maximum value in the time window
        // 5. Materialized stores the result in a table
        stream.groupByKey()
        .windowedBy(configuredTimeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);
    }

    private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long retentionMs) {
        TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
        timeWindows.until(retentionMs);
        return timeWindows;
    }
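Note that TimeWindows.until() is deprecated as well; since Kafka Streams 2.1 the store retention can be set on the Materialized directly. A sketch of the equivalent configuration, reusing the names from the snippet above:

        // Sketch only: same retention as the materialized/until pair above, without the deprecated APIs
        Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized = Materialized
                .<String, ErrorScore, WindowStore<Bytes, byte[]>>as(localStore.getStoreName())
                .withRetention(Duration.ofMillis(Long.parseLong(topicConfig.get(RETENTION_MS))));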

I expected that increasing the number of instances would dramatically reduce the consumer lag.
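One detail worth checking here (my assumption, not something stated above): a Kafka Streams application runs at most one stream task per input partition, so instances beyond the partition count of the source and repartition topics simply sit idle. The partition counts can be inspected with the AdminClient; a sketch with a placeholder bootstrap address:

    // Sketch only (assumes imports from org.apache.kafka.clients.admin and java.util;
    // describeTopics(...).all().get() throws checked exceptions to handle or declare)
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
    try (AdminClient admin = AdminClient.create(props)) {
        admin.describeTopics(Arrays.asList("original-sensor-data", "error-score"))
                .all().get()
                .forEach((topic, description) -> System.out.println(
                        topic + ": " + description.partitions().size() + " partitions"));
    }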

So there are several topics involved in this setup, for example:

* original-sensor-data
* error-score
* kstream-joinother
* kstream-jointhis
* atl-mapped-original-sensor-data-repartition
* atl-mapped-error-score-data-repartition
* atl-joined-data-repartition

The idea is to join the original sensor data with the error scores. The atl-mapped-* topics are needed for rekeying. The join then uses the kstream-* topics, and finally, as the result of the join, atl-joined-data-repartition gets filled. After that, the aggregation creates topics as well, but I'm leaving that out of scope for now.

original-sensor-data
\
 \
  \   atl-mapped-original-sensor-data-repartition -- kstream-jointhis  -\
  /   atl-mapped-error-score-data-repartition     -- kstream-joinother -\
 /                                                                       \
error-score                                           atl-joined-data-repartition
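For completeness, the wired-up topology, including the kstream-jointhis / kstream-joinother store names, can be printed with Topology#describe(). A one-line sketch, assuming the StreamsBuilder used above is called builder (that name is not in the original code):

    // Sketch only: prints all processors and the internal repartition/changelog topics
    System.out.println(builder.build().describe());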

Since I introduced the join and the atl-mapped-* topics, increasing the number of instances no longer seems to make much of a difference, and I'm wondering whether this topology could become its own bottleneck. Looking at the consumer lag, the original-sensor-data and error-score topics show far less lag than the atl-mapped-* topics. Is there a way to address this by removing these changelogs, or would that make it impossible to scale?
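For reference, Kafka Streams does allow switching off the changelog of a state store via withLoggingDisabled(), at the cost of fault tolerance: without a changelog the store cannot be restored after a failure or rebalance. A sketch against the aggregation store from above, as an illustration of the option rather than a recommendation:

        Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> unlogged = Materialized
                .<String, ErrorScore, WindowStore<Bytes, byte[]>>as(localStore.getStoreName())
                // no changelog topic: less broker traffic, but state is lost on failover
                .withLoggingDisabled();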

Tags: apache-kafka-streams
