apache-spark - 在 Spark 中读取 reduceByKey() 方法中的文件 - Java
问题描述
我正在开发一个 Spark 应用程序,该应用程序通过将相邻顶点添加到边缘来扩展边缘。我正在使用 Map/reduce 范例来划分边的总数并将它们扩展到不同的工作节点中。
为此,我需要根据键值读取工作节点中的分区相邻列表。但是在尝试在reduceByKey()方法中加载文件时出现错误。它说该任务不可序列化。我的代码:
public class MyClass implements Serializable{
public static void main(String args[]) throws IOException {
SparkConf conf = new SparkConf().setAppName("startingSpark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> file = sc.textFile("hdfs://localhost:9000/mainFile.txt");
... ... ... //Mapping done successfully
JavaPairRDD<String, String> rdd1 = pairs.reduceByKey(new Function2<String, String, String>() {
@Override
public String call(String v1, String v2) throws Exception {
... ... ...
JavaRDD <String> adj = sc.textFile("hdfs://localhost:9000/adjacencyList_"+key+"txt");
//Here I to expand the edges after reading the adjacency list.
}
}
但我收到一个错误任务不可序列化。引起:java.io.NotSerializableException:org.apache.spark.api.java.JavaSparkContext 序列化堆栈:-对象不可序列化。我认为这是因为我在工作节点中使用与驱动程序中相同的火花上下文。如果我尝试在reduceByKey()方法中创建一个新的 Spark 上下文,它也会给我一个错误,说只有一个 SparkContext 应该在这个 JVM 中运行。
谁能告诉我如何在 reduceByKey()方法中读取文件?还有其他方法可以完成我的任务吗?我希望扩展工作节点中的边缘,以便它们可以以分布式方式运行。
提前致谢。
解决方案
推荐阅读
- python - Python无缘无故地带来了空的数据框
- python - 未在类和单元测试中定义的用户函数
- python - 如何实现布局以将值解析为并获取文件作为回报?
- watson-discovery - 在 Watson Discovery 中进行了查询,尝试通过 nodejs sdk 进行复制,通道数组为空
- swift - 我的日期选择器中的文本显示在模拟器中,但在我的 ios14 手机上运行时没有
- android - 如何在不从 android 创建子节点的情况下将字符串值填充到实时 firebase 数据库?
- laravel - SQLSTATE [42S22]:未找到列:1054 '字段列表'中的未知列 'Update_Profile'(SQL:更新 `users` 设置 `updated_at` = 2021-01-11 00:01:39,
- asp.net-core-mvc - .net core razor 页面@Html.Display 用于显示随机数
- python - Pytorch CNN 不学习
- javascript - Plotly.js 曲面图数据格式化