首页 > 解决方案 > 在 Spark 中读取 reduceByKey() 方法中的文件 - Java

问题描述

我正在开发一个 Spark 应用程序,该应用程序通过将相邻顶点添加到边缘来扩展边缘。我正在使用 Map/reduce 范例来划分边的总数并将它们扩展到不同的工作节点中。

为此,我需要根据键值读取工作节点中的分区相邻列表。但是在尝试在reduceByKey()方法中加载文件时出现错误。它说该任务不可序列化。我的代码:

public class MyClass implements Serializable{
    public static void main(String args[]) throws IOException {     
       SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
       JavaSparkContext sc = new JavaSparkContext(conf);               
       JavaRDD<String> file = sc.textFile("hdfs://localhost:9000/mainFile.txt");
       ... ... ... //Mapping done successfully
       JavaPairRDD<String, String> rdd1 = pairs.reduceByKey(new Function2<String, String, String>() {
       @Override
       public String call(String v1, String v2) throws Exception {
          ... ... ...
          JavaRDD <String> adj = sc.textFile("hdfs://localhost:9000/adjacencyList_"+key+"txt");
          //Here I to expand the edges after reading the adjacency list.
        }
    }

但我收到一个错误任务不可序列化。引起:java.io.NotSerializableException:org.apache.spark.api.java.JavaSparkContext 序列化堆栈:-对象不可序列化。我认为这是因为我在工作节点中使用与驱动程序中相同的火花上下文。如果我尝试在reduceByKey()方法中创建一个新的 Spark 上下文,它也会给我一个错误,说只有一个 SparkContext 应该在这个 JVM 中运行

谁能告诉我如何在 reduceByKey()方法中读取文件?还有其他方法可以完成我的任务吗?我希望扩展工作节点中的边缘,以便它们可以以分布式方式运行。

提前致谢。

标签: apache-sparkapache-spark-sqlspark-streaming

解决方案


推荐阅读