apache-spark - 在哪种情况下,来自驱动程序节点的对象被序列化并发送到 apache spark 中的工作节点
问题描述
假设我声明了一个变量,并在 spark 的 map/filter 函数中使用它。对于map/filter的值的每个操作,我上面声明的变量是否每次都从驱动程序发送到工作人员。
我的helloVariable是否针对 consumerRecords 的每个值发送到工作节点?如果是如何避免呢?
String helloVariable = "hello testing"; //or some config/json object
JavaDStream<String> javaDStream = consumerRecordJavaInputDStream.map(
consumerRecord -> {
return consumerRecord.value()+" --- "+helloVariable;
} );
解决方案
是的。当您通常将函数传递给 Spark 时,例如 map() 或 filter(),这些函数可以使用在驱动程序中定义在它们之外的变量,但是在集群上运行的每个任务都会获得每个变量的新副本(使用序列化并通过网络发送),并且来自这些副本的更新不会传播回驱动程序。
所以这个场景的常见情况是使用广播变量。
广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。如果您对广播机制感兴趣,可以在这里阅读一个非常好的简短说明。
根据 Spark 文档,这个过程可以用图形表示,如下所示:
例如,可以使用广播变量以有效的方式为每个节点提供大型数据集(例如,具有关键字列表的字典)的副本。Spark 还尝试使用高效的广播算法来分发广播变量,以降低通信成本。
因此,在您的情况下,您的代码可能如下所示:
Broadcast<String> broadcastVar = sc.broadcast("hello testing");
JavaDStream<String> javaDStream = consumerRecordJavaInputDStream.map(
consumerRecord -> {
return consumerRecord.value() + " --- " + broadcastVar.value();
});
推荐阅读
- javascript - 在html中打开仪表板主页时如何显示空视图?
- node.js - 在任何 npm install 命令之后挂钩(特别是安装后)
- javascript - Vuex 操作中非常奇怪的 console.log 行为
- c# - 如何获取 C# 方法调用中执行的行数?
- scala - Simulacrum:找不到宏实现
- dart - 如何在 Dart 中导入本地创建的包?
- orientdb - OrientDB:如何在 orientdb 中按节点过滤 Edge
- angular - “错误:没有 Sugerir 的提供者!” 在 ionic 3 打字稿/角度应用程序中
- javascript - 在输入文本中使用粘贴时数据过滤器不起作用
- c# - 使用数组进行独特行程选择的最佳性能算法?