python - 用修改后的 PySpark DataFrame 覆盖现有 Parquet 数据集
问题描述
用例是将列附加到 Parquet 数据集,然后在同一位置有效地重写。这是一个最小的例子。
创建一个pandas
DataFrame 并作为分区 Parquet 数据集写入。
import pandas as pd
df = pd.DataFrame({
'id': ['a','a','a','b','b','b','b','c','c'],
'value': [0,1,2,3,4,5,6,7,8]})
path = r'c:/data.parquet'
df.to_parquet(path=path, engine='pyarrow', compression='snappy', index=False, partition_cols=['id'], flavor='spark')
然后将 Parquet 数据集加载为pyspark
视图,并将修改后的数据集创建为pyspark
DataFrame。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.parquet(path).createTempView('data')
sf = spark.sql(f"""SELECT id, value, 0 AS segment FROM data""")
此时sf
数据与数据相同,df
但有一个全为零的附加segment
列。我想有效地将现有的 Parquet 数据集覆盖path
为sf
同一位置的 Parquet 数据集。下面是什么不起作用。也不想写入sf
新位置,删除旧 Parquet 数据集,重命名似乎效率不高。
# saves existing data and new data
sf.write.partitionBy('id').mode('append').parquet(path)
# immediately deletes existing data then crashes
sf.write.partitionBy('id').mode('overwrite').parquet(path)
解决方案
简而言之,我的回答是:你不应该:\
大数据的一个原则(spark 是针对大数据的)是永远不要覆盖东西。当然,存在.mode('overwrite')
,但这不是正确的用法。
我对它为什么会(应该)失败的猜测:
- 您添加一列,因此写入的数据集具有与当前存储在那里的格式不同的格式。这可能会造成架构混乱
- 您在处理时覆盖输入数据。所以 spark 读取一些行,处理它们并覆盖输入文件。但是这些文件仍然是其他行处理的输入。
在这种情况下,我通常做的是创建另一个数据集,当没有理由保留旧数据集时(即处理完全完成时),清理它。要删除文件,您可以查看这篇关于如何删除 hdfs 文件的帖子。它应该适用于 spark 可访问的所有文件。但是它在scala中,所以我不确定它是否可以适应pyspark。
请注意,效率不是重写的好理由,它比简单地编写更多的工作。
推荐阅读
- ansible - Ansible 任务 - 我如何循环所有预定义的变量以创建这种格式的 linux 组?
- python - 在 NumPy 中使用掩码编写有效的代码以应用反向操作
- javascript - Jest 确实在等待 promise 解决
- node.js - 使用“firebase serve”命令在 Firebase 上部署 Nodejs 应用程序(使用 typecrpt 而非 javascript 开发)部署问题
- r - 将 DataFrame 列转换为因子
- java - 远程调试 - Java
- c# - 为什么我不能在 switch 表达式中使用我的枚举类型?
- haskell - 用于在元组中构建列表的有状态递归
- python - itertools cycle() 似乎没有开始索引。什么是让循环从 0 以外的任意索引开始的好方法?
- mysql - 如何检测网站视图中的点击农场