python - 无法写入从两个压缩数据帧创建的 PySpark 数据帧
问题描述
我正在尝试按照此处给出的示例来组合两个没有共享连接键的数据帧(通过数据库表或熊猫数据帧中的“索引”组合,除了 PySpark 没有这个概念):
我的代码
left_df = left_df.repartition(right_df.rdd.getNumPartitions()) # FWIW, num of partitions = 303
joined_schema = StructType(left_df.schema.fields + right_df.schema.fields)
interim_rdd = left_df.rdd.zip(right_df.rdd).map(lambda x: x[0] + x[1])
full_data = spark.createDataFrame(interim_rdd, joined_schema)
这一切似乎都很好。我在使用 DataBricks 时对其进行了测试,我可以毫无问题地运行上面的“单元”。但是当我去保存它时,我无法,因为它抱怨分区不匹配(???)。我已经确认分区的数量匹配,但您也可以在上面看到我明确确保它们匹配。我的保存命令:
full_data.write.parquet(my_data_path, mode="overwrite")
错误
我收到以下错误:
Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
我猜
我怀疑问题是,即使我匹配了分区数,但每个分区中的行数并不相同。但我不知道该怎么做。我只知道如何指定分区数,不知道分区的方式。
或者,更具体地说,如果没有我可以使用的列,我不知道如何指定如何分区。请记住,它们没有共享列。
我怎么知道我可以通过这种方式组合它们,而无需共享连接键?在这种情况下,这是因为我试图将模型预测与输入数据连接起来,但实际上我更普遍地遇到了这种情况,在不仅仅是模型数据 + 预测的情况下。
我的问题
- 特别是在上述情况下,如何正确设置分区以使其正常工作?
- 我应该如何通过行索引连接两个数据框?
- (我知道标准的反应是“你不应该......分区使索引变得毫无意义”,但在 Spark 创建不会像我在上面的链接中描述的那样强制数据丢失的 ML 库之前,这将始终是一个问题。)
解决方案
RDD 是旧帽子,但从这个角度回答错误。
来自拉筹伯大学http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#zip以下内容:
通过将任一分区的第 i 个相互组合来连接两个 RDD。生成的 RDD 将由双组件元组组成,这些元组通过 PairRDDFunctions 扩展提供的方法解释为键值对。
注意对。
这意味着您必须具有相同的分区器,其分区数和每个分区的 kv 数,否则上面的定义不成立。
最好从文件中读取,因为 repartition(n) 可能不会给出相同的分布。
解决这个问题的一个小技巧是使用 zipWithIndex 作为 k 的 k,v,就像这样(Scala 不是 pyspark 特定的方面):
val rddA = sc.parallelize(Seq(
("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0)
))
val rddAA = rddA.zipWithIndex().map(x => (x._2, x._1)).repartition(5)
val rddB = sc.parallelize(Seq(
(10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C"), (80.0, "D"), (89.0, "D")
))
val rddBB = rddA.zipWithIndex().map(x => (x._2, x._1)).repartition(5)
val zippedRDD = (rddAA zip rddBB).map{ case ((id, x), (y, c)) => (id, x, y, c) }
zippedRDD.collect
然后 repartition(n) 似乎可以工作,因为 k 是相同的类型。
但是每个分区必须有相同的 num 个元素。它就是这样,但它是有道理的。
推荐阅读
- node.js - Lambda 容器会自动销毁 Postgres 连接吗?
- javascript - 在 Typescript 中,Object.prototype 函数可以返回子类型实例吗?
- html - 如何在两列之间有固定的间隔?
- swift - Firebase Observer 中存储内容的生命周期
- node.js - 将 req.user 传递给 graphQL
- android - 为什么来自 xml Web 服务的字符串在 Android TextView 中不显示新行?
- mongodb - 如何通过删除 mongoDB 中的时间来更新日期字段?
- iphone - iOS 12.1.2 Xcode 版本 10.0 (10A255) 无法加载构建到 iPhone 6
- javascript - Redux 状态未在 reducer 中按预期构建
- git - 列出 Git 存储库中特定用户更改的所有文件