apache-spark - 是否可以在不使用累加器的情况下创建可变共享数据结构?
问题描述
我是新来的火花,有些东西对我来说很不清楚。但基本知识表明,只有累加器是可变变量,可以跨执行器更新,并且它的值可以由驱动程序检索。在代码中初始化的任何其他变量,在执行程序之间更新,更新的值不会中继回驱动程序,因为它们是单独的 JVM。
我正在从事一个项目的一部分,该项目将来自 Zookeeper 的偏移量存储在数据结构中以供将来使用。由于偏移量是在执行程序上获得的,因此几乎不可能拥有一个共享数据结构,它将每个分区的偏移量也更新回驱动程序。直到我在https://spark.apache.org/中遇到此代码docs/2.3.0/streaming-kafka-0-8-integration.html。
AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
directKafkaStream.transformToPair(rdd -> {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets); return rdd;
}).map(
...
).foreachRDD(rdd -> { for (OffsetRange o : offsetRanges.get()) {
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
);}
...
});
System.out.println(Arrays.toString(offsetRanges.get()));
这与基本理论相矛盾,因为当我访问 AtomicReference<OffsetRange[]> offsetRanges
驱动程序中的值时,我得到了正确的更新值(transformToPair
在执行程序代码中的方法中更新),即使它应该返回一个 null 或空响应。请有人可以解释一下这种行为吗?
解决方案
是否可以在不使用累加器的情况下创建可变共享数据结构?
不。
这与基本理论相矛盾,因为当我访问
它没有,因为该值未在驱动程序外部修改。关闭transformToPair
是在驱动程序上执行的,而不是在执行程序上执行的。
因此offsetRanges.set(offsets)
在原始offsetRanges
值所在的同一个 JVM 上执行。
推荐阅读
- java - 当倒计时为 0 时开始一个新的活动
- sql-server - C# SQL 命令查询产生“无法绑定”错误
- ssl - 从 Base64 编码的字符串创建 JKS 文件
- python - 重命名 Excel 选项卡
- excel - 在 Excel/VBA 中设置默认图表参数
- javascript - js getHours() 返回时间提前 +5 小时
- vba - 当我在 Outlook 中选择“运行脚本”规则时,如何将 ThisOutlookSession 中的代码保存为我可以选择的脚本
- python - tensorflow_hub 将 BERT 嵌入到 Windows 机器上
- python - 从命令行运行 Eclipse IDE Python 项目
- kubernetes - 自动配置的节点池未清理