首页 > 解决方案 > 是否可以在不使用累加器的情况下创建可变共享数据结构?

问题描述

我是新来的火花,有些东西对我来说很不清楚。但基本知识表明,只有累加器是可变变量,可以跨执行器更新,并且它的值可以由驱动程序检索。在代码中初始化的任何其他变量,在执行程序之间更新,更新的值不会中继回驱动程序,因为它们是单独的 JVM。

我正在从事一个项目的一部分,该项目将来自 Zookeeper 的偏移量存储在数据结构中以供将来使用。由于偏移量是在执行程序上获得的,因此几乎不可能拥有一个共享数据结构,它将每个分区的偏移量也更新回驱动程序。直到我在https://spark.apache.org/中遇到此代码docs/2.3.0/streaming-kafka-0-8-integration.html

AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
directKafkaStream.transformToPair(rdd -> { 
    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    offsetRanges.set(offsets);    return rdd;
}).map(
    ...
    ).foreachRDD(rdd -> {    for (OffsetRange o : offsetRanges.get()) {
        System.out.println(
            o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
        );}    
        ...
    });
System.out.println(Arrays.toString(offsetRanges.get()));

这与基本理论相矛盾,因为当我访问 AtomicReference<OffsetRange[]> offsetRanges驱动程序中的值时,我得到了正确的更新值(transformToPair在执行程序代码中的方法中更新),即使它应该返回一个 null 或空响应。请有人可以解释一下这种行为吗?

标签: apache-sparkspark-streaming

解决方案


是否可以在不使用累加器的情况下创建可变共享数据结构?

不。

这与基本理论相矛盾,因为当我访问

它没有,因为该值未在驱动程序外部修改。关闭transformToPair是在驱动程序上执行的,而不是在执行程序上执行的。

因此offsetRanges.set(offsets)在原始offsetRanges值所在的同一个 JVM 上执行。


推荐阅读