java - Apache Spark 在完全分布式模式下对 Executors 采取行动
问题描述
我是 spark 新手,我对转型和行动的工作方式有基本的了解(指南)。我正在文本文件中的每一行(基本上是段落)上尝试一些 NLP 操作。处理后,应将结果发送到服务器(REST Api)进行存储。该程序在 10 个节点的集群上作为 spark 作业(使用 spark-submit 提交)yarn
运行。这就是我到目前为止所做的。
...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
.map(line -> {
// processed here
return result;
});
processedLines.foreach(line -> {
// Send to server
});
这可行,但foreach
循环似乎是连续的,似乎它没有在工作节点上以分布式模式运行。我对么?
我尝试了以下代码,但它不起作用。错误:java: incompatible types: inferred type does not conform to upper bound(s)
。显然它是错误的,因为它map
是一种转变,而不是一种行动。
lines.map(line -> { /* processing */ })
.map(line -> { /* Send to server */ });
我也试过了take()
,但它需要int
并且processedLines.count()
是类型long
。
processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });
数据量很大(大于 100gb)。我想要的是处理和发送到服务器都应该在工作节点上完成。反抗中的处理部分map
发生在工作节点上。但是我如何将处理后的数据从工作节点发送到服务器,因为foreach
似乎在驱动程序中发生了顺序循环(如果我是正确的)。简单地说,如何action
在工作节点而不是驱动程序中执行。
任何帮助将不胜感激。
解决方案
foreach
是火花中的一个动作。它基本上采用 RDD 的每个元素并将函数应用于该元素。
foreach
在 executor 节点或 worker 节点上执行。它不会应用于驱动程序节点。请注意,在运行 spark 的本地执行模式下,驱动程序和执行程序节点可以驻留在同一个 JVM 上。
检查this以供参考foreach解释
您尝试映射 RDD 的每个元素然后应用于foreach
每个元素的方法看起来不错。我能想到为什么需要时间的原因是因为您正在处理的数据大小(~100GB)。
对此进行优化的一种方法是repartition
输入数据集。理想情况下,每个分区的大小应为 128MB,以获得更好的性能结果。您可以找到许多关于对数据进行重新分区的最佳实践的文章。我建议你关注他们,它会给性能带来一些好处。
您可以想到的第二个优化是您分配给每个执行程序节点的内存。它在进行火花调谐时起着非常重要的作用。
您能想到的第三个优化是批量对服务器的网络调用。您当前正在为 RDD 的每个元素对服务器进行网络调用。如果您的设计允许您批量处理这些网络调用,那么您可以在单个网络调用中发送多个元素。如果产生的延迟主要是由这些网络调用引起的,这也可能会有所帮助。
我希望这有帮助。
推荐阅读
- kotlin - 在 Kotlin 中向父类的方法中添加一些命令
- javascript - Javascript嵌套函数未定义
- django - 'put_object() 只接受关键字参数。'
- spring - Spring MVC 安装
- python - 使用 API Django Rest Framework 创建用户
- python - 使用找到我自己的模块的 dask 设置 PBS 集群时遇到问题
- c++ - (Windows C++) 如何在 FPS 游戏中模拟鼠标移动?
- c# - Process.Start() 工作但没有弹出窗口
- linux - 在二进制文件中搜索字符串
- python - 如何使用自定义层 Tensorflow2 保存自定义模型