首页 > 解决方案 > mapWithState 中的 stateSnapshots 按需

问题描述

我正在从 Kafka 流式传输数据(批处理间隔 10 秒),将 RDD 转换为 PairRDD,然后使用 mapWithState() 将 RDD 存储到状态中。下面是代码:

    JavaPairDStream<String, Object> transformedStream = stream
            .mapToPair(record -> new Tuple2<>(record.getKey(), record))
            .mapWithState(StateSpec.function(updateDataFuncGDM).numPartitions(32)).stateSnapshots();

transformedStream.foreachRDD(rdd -> {
        //if flag is true, put the RDD to a SQL table, and run a query to do some aggregations liek sum, avg etc
       // if flag is false, return;
        }

现在,我不断更新状态中的数据,并且在某个事件中,我将标志更改为 true,并将这些数据放入表中,然后进行计算。

这里的问题是,由于我在每批中都获得“stateSnapshots”,因此效率不高,并且 mapWithState 将大量数据保存在内存中,并且随着状态的增长,它会变得更糟。此外,由于 mapWithState 每 10 次迭代后检查点数据,因此需要大量时间,因为数据非常大。

我只想按需获取状态的 stateSnapshot(即仅在标志为真时 foreachRDD 的迭代中)但我没有找到很多方法来处理状态

标签: apache-spark-sqlspark-streaming

解决方案


推荐阅读