首页 > 解决方案 > Apache Spark 在完全分布式模式下对 Executors 采取行动

问题描述

我是 spark 新手,我对转型和行动的工作方式有基本的了解(指南)。我正在文本文件中的每一行(基本上是段落)上尝试一些 NLP 操作。处理后,应将结果发送到服务器(REST Api)进行存储。该程序在 10 个节点的集群上作为 spark 作业(使用 spark-submit 提交)yarn运行。这就是我到目前为止所做的。

...
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<String> processedLines = lines
    .map(line -> {
        // processed here
        return result;
    });
processedLines.foreach(line -> {
    // Send to server
});

这可行,但foreach循环似乎是连续的,似乎它没有在工作节点上以分布式模式运行。我对么?

我尝试了以下代码,但它不起作用。错误:java: incompatible types: inferred type does not conform to upper bound(s)。显然它是错误的,因为它map是一种转变,而不是一种行动。

lines.map(line -> { /* processing */ })
     .map(line -> { /* Send to server */ });

我也试过了take(),但它需要int并且processedLines.count()是类型long

processedLines.take(processedLines.count()).forEach(pl -> { /* Send to server */ });

数据量很大(大于 100gb)。我想要的是处理和发送到服务器都应该在工作节点上完成。反抗中的处理部分map发生在工作节点上。但是我如何将处理后的数据从工作节点发送到服务器,因为foreach似乎在驱动程序中发生了顺序循环(如果我是正确的)。简单地说,如何action在工作节点而不是驱动程序中执行。

任何帮助将不胜感激。

标签: javaapache-sparkactionrddtransformation

解决方案


foreach是火花中的一个动作。它基本上采用 RDD 的每个元素并将函数应用于该元素。

foreach在 executor 节点或 worker 节点上执行。它不会应用于驱动程序节点。请注意,在运行 spark 的本地执行模式下,驱动程序和执行程序节点可以驻留在同一个 JVM 上。

检查this以供参考foreach解释

您尝试映射 RDD 的每个元素然后应用于foreach每个元素的方法看起来不错。我能想到为什么需要时间的原因是因为您正在处理的数据大小(~100GB)。

对此进行优化的一种方法是repartition输入数据集。理想情况下,每个分区的大小应为 128MB,以获得更好的性能结果。您可以找到许多关于对数据进行重新分区的最佳实践的文章。我建议你关注他们,它会给性能带来一些好处。

您可以想到的第二个优化是您分配给每个执行程序节点的内存。它在进行火花调谐时起着非常重要的作用。

您能想到的第三个优化是批量对服务器的网络调用。您当前正在为 RDD 的每个元素对服务器进行网络调用。如果您的设计允许您批量处理这些网络调用,那么您可以在单个网络调用中发送多个元素。如果产生的延迟主要是由这些网络调用引起的,这也可能会有所帮助。

我希望这有帮助。


推荐阅读