python - 使用 PySpark 将每一行的每一列作为单独的文件写入 S3
问题描述
我有一个用例,程序需要将数据帧中的每一列作为单独的文件写入 S3 或 EMR 上的 HDFS。我正在对原始数据进行一些处理,输出数据框如下所示;
+------+--------------------+--------------------+--------------------+--------------------+
| id| processed_1| processed_2| processed_3| error|
+------+--------------------+--------------------+--------------------+--------------------+
|324650|some processed data |some processed data | some processed data| null|
+------+--------------------+--------------------+--------------------+--------------------+
对于 3 列processed_1
, processed_2
, processed_3
,我想将每一行的每一列存储在一个单独的文件中。我有 10 万行已处理的数据。我尝试使用 UDF 和 Python 来做到这一点;
def writeToDisk(doc_id,error, processed_1, processed_2, processed_3):
try:
if error is None:
with open(r'hdfs://processed_1.json'.format(doc_id),'w',encoding='utf-8') as f:
f.write(processed_1)
with open(r'hdfs://processed_2.json'.format(doc_id),'w') as f:
f.write(processed_2)
with open(r'hdfs://processed_3.json'.format(doc_id),'w') as f:
f.write(processed_3)
return "SUCCESS"
else:
error_prefix='{} - root - ERROR - '.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
final_error_msg='{}{}'.format(error_prefix,error)
with open(r'hdfs://error.log'.format(doc_id),'w') as f:
f.write(unprocessed_html)
return "SUCCESS"
except Exception as e:
with open(r'hdfs://error.log','w') as f:
f.write("Failed : {}".format(str(e)))
return "FAILED"
并将上述函数注册为udf并在as中使用;
store_data_udf = udf(writeToDisk, StringType())
stored_data = final_data.withColumn("store_results",store_data_udf("id","error","processed_1","processed_2","processed_3"))
上述方法不起作用。我不确定我在这里错过了什么。
任何关于如何完成这项任务的想法都将受到高度赞赏。
解决方案
您不能使用 python 写入文件函数写入 HDFS。相反,您可以创建 3 个具有所需列的单独数据帧并将其写入 hdfs/s3。
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
file_1 = {"id": 1, "error": 20, 'processed_1': "test", 'processed_2': "test2", 'processed_3': "test3"}
file_2 = {"id": 2, "error": 30, 'processed_1': "test5", 'processed_2': "test6", 'processed_3': "test7"}
final_data = spark.read.json(sc.parallelize([file_1,file_2]))
df1=final_data.select("id","error","processed_1").withColumn("num", monotonically_increasing_id())
df2=final_data.select("id","error","processed_2").withColumn("num", monotonically_increasing_id())
df3=final_data.select("id","error","processed_3").withColumn("num", monotonically_increasing_id())
df1.coalesce(1).write.partitionBy("num").parquet("df1/")
df2.coalesce(1).write.partitionBy("num").parquet("df2/")
df3.coalesce(1).write.partitionBy("num").parquet("df3/")
推荐阅读
- android - 如何在 Android Auto 媒体应用中显示播放头位置/进度指示器?
- python - 如何有效地将数据框中的 pd.Series 列表更改为 np.arrays 的 pd.Series
- influxdb - InfluxDB如何查询每n个值
- java - 双向链表没有导入
- kubernetes - 气流没有名为“kubernetes”的模块
- javascript - 使用 ffmpeg node.js 的控制台日志流信息
- javascript - onchange 事件函数显示未定义(Javascript-HTML-DOM)
- python-3.x - Jupyter Lab - 内核保持“重新连接”
- python - 创建仅包含最大公共组对的字典
- enums - 派生特定变体的特征