scala - 在 Spark Scala 中注册前一个 DF 后创建一个 DF
问题描述
我是 Spark Scala 的新开发人员,我想问你我的问题。
我有两个巨大的数据框,我的第二个数据框是从第一个数据框计算出来的(它包含与第一个数据框不同的列)。
为了优化我的代码,我想到了这种方法:
- 在 HDFS 中将我的第一个数据帧注册为 .csv 文件
- 然后只需读取此 .csv 文件即可计算第二个数据帧。
所以,它写了这个:
//val temp1 is my first DF
writeAsTextFileAndMerge("result1.csv", "/user/result", temp1, spark.sparkContext.hadoopConfiguration)
val temp2 = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
.csv("/user/result/result1.csv").select("ID").distinct
writeAsTextFileAndMerge("result2.csv", "/user/result",
temp2, spark.sparkContext.hadoopConfiguration)
这是我的保存功能:
def writeAsTextFileAndMerge(fileName: String, outputPath: String, df: DataFrame, conf: Configuration) {
val sourceFile = WorkingDirectory
df.write.options(Map("header" -> "true", "delimiter" -> ";")).mode("overwrite").csv(sourceFile)
merge(fileName, sourceFile, outputPath, conf)
}
def merge(fileName: String, srcPath: String, dstPath: String, conf: Configuration) {
val hdfs = FileSystem.get(conf)
val destinationPath = new Path(dstPath)
if (!hdfs.exists(destinationPath))
hdfs.mkdirs(destinationPath)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName),
true, conf, null)
}
对我来说这似乎是“合乎逻辑的”,但我这样做时出错了。我想 Spark 不可能“等待”直到在 HDFS 中注册我的第一个 DF 并在读取这个新文件之后(或者我的保存功能可能有一些错误?)。
这是我得到的例外:
19/02/16 17:27:56 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 1
java.lang.ArrayIndexOutOfBoundsException: 1
你能帮我解决这个问题吗?
解决方案
问题在于合并 - Spark 不知道,因此与您正在进行的所有 HDFS 操作不同步。
好消息是您不需要这样做。只需执行 df.write,然后使用读取创建一个新的数据帧(spark 会将所有部分读入单个 df)
即以下将工作得很好
temp1.write.options(Map("header" -> "true", "delimiter" -> ";")).mode("overwrite").csv("/user/result/result1.csv")
val temp2 = spark.read.options(Map("header" -> "true", "delimiter" -> ";"))
.csv("/user/result/result1.csv").select("ID").distinct
推荐阅读
- quickjs - QuickJS:由于修改函数原型可能导致内存泄漏
- r - 根据其他两列(纬度、经度)中的值添加唯一 ID 列
- python - 如何使用 Twitch api python 检查是否有人在 twitch 上直播
- recursion - 在 Prolog 中结束递归
- android - 从 Material Design TextInputEditText 的布局 XML 调用 onTextChanged 处理程序
- python - 运行 FastAPI 的 docker img 时出现 ModuleNotFoundError
- statistics - 来自 TTestIndPower(Python) 与 Gpower/R 的比例测试的功率/样本大小估计
- java - base64 编码的签名属性 DER 结构中的消息摘要
- python - 具有生成中间带有通配符的回文字符串的功能。需要帮助来改进它
- c++ - 为什么 C 语言不是面向对象的语言,即使它具有与 C++ 的 Class 关键字相同的(结构)struct 关键字?