Write each column of each row to S3 as a separate file with PySpark

Problem description

I have a use case where the program needs to write every column of a DataFrame to S3, or to HDFS on EMR, as a separate file. I am doing some processing on raw data, and the output DataFrame looks like this:

+------+--------------------+--------------------+--------------------+--------------------+
|    id|         processed_1|         processed_2|         processed_3|               error|
+------+--------------------+--------------------+--------------------+--------------------+
|324650|some processed data |some processed data | some processed data|                null|
+------+--------------------+--------------------+--------------------+--------------------+

For the three columns processed_1, processed_2 and processed_3, I want to store each column of every row in a separate file. I have 100,000 rows of processed data. I tried to do this with a Python UDF:

from datetime import datetime

def writeToDisk(doc_id, error, processed_1, processed_2, processed_3):
    try:
        if error is None:
            # Write each processed column of this row to its own file, named by doc_id.
            with open(r'hdfs://{}_processed_1.json'.format(doc_id), 'w', encoding='utf-8') as f:
                f.write(processed_1)

            with open(r'hdfs://{}_processed_2.json'.format(doc_id), 'w', encoding='utf-8') as f:
                f.write(processed_2)

            with open(r'hdfs://{}_processed_3.json'.format(doc_id), 'w', encoding='utf-8') as f:
                f.write(processed_3)

            return "SUCCESS"

        else:
            # Log the upstream processing error for this row.
            error_prefix = '{} - root - ERROR - '.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
            final_error_msg = '{}{}'.format(error_prefix, error)

            with open(r'hdfs://{}_error.log'.format(doc_id), 'w', encoding='utf-8') as f:
                f.write(final_error_msg)

            return "SUCCESS"

    except Exception as e:
        with open(r'hdfs://error.log', 'w', encoding='utf-8') as f:
            f.write("Failed : {}".format(str(e)))

        return "FAILED"

I registered the above function as a UDF and used it like this:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

store_data_udf = udf(writeToDisk, StringType())

stored_data = final_data.withColumn("store_results", store_data_udf("id", "error", "processed_1", "processed_2", "processed_3"))

The approach above does not work, and I am not sure what I am missing here.

Any ideas on how to accomplish this would be greatly appreciated.

Tags: python, amazon-s3, pyspark, user-defined-functions, amazon-emr

Solution


You cannot write to HDFS with Python's built-in file functions; open() only understands the local filesystem, so a path starting with hdfs:// is treated as an ordinary local path rather than an HDFS URI. Instead, create three separate DataFrames with the columns you need and write them to HDFS/S3:

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Small sample standing in for the real processed DataFrame.
file_1 = {"id": 1, "error": 20, 'processed_1': "test", 'processed_2': "test2", 'processed_3': "test3"}

file_2 = {"id": 2, "error": 30, 'processed_1': "test5", 'processed_2': "test6", 'processed_3': "test7"}

final_data = spark.read.json(sc.parallelize([file_1, file_2]))

# One DataFrame per processed column, each tagged with a monotonically
# increasing row number to partition the output on.
df1 = final_data.select("id", "error", "processed_1").withColumn("num", monotonically_increasing_id())
df2 = final_data.select("id", "error", "processed_2").withColumn("num", monotonically_increasing_id())
df3 = final_data.select("id", "error", "processed_3").withColumn("num", monotonically_increasing_id())

# coalesce(1) plus partitionBy("num") produces one output file per row
# under each num=<value>/ directory; the paths can point at HDFS or S3.
df1.coalesce(1).write.partitionBy("num").parquet("df1/")
df2.coalesce(1).write.partitionBy("num").parquet("df2/")
df3.coalesce(1).write.partitionBy("num").parquet("df3/")
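With partitionBy("num"), each row lands in its own num=<value>/ subdirectory (for example df1/num=0/part-*.parquet), so you effectively get one file per row per column, and the output paths can be HDFS or s3:// locations.

If you truly need one plain S3 object per row and per column (100,000 rows would mean roughly 300,000 small objects), another option is to skip open() entirely and call the S3 API from the executors. The sketch below is not from the original answer: it assumes boto3 is installed on the EMR nodes and that the cluster's instance role grants write access; the bucket name and key layout are made-up placeholders.

import boto3

def write_partition(rows):
    # One S3 client per partition rather than per row, to avoid
    # re-creating connections tens of thousands of times.
    s3 = boto3.client("s3")
    for row in rows:
        if row["error"] is not None:
            # Hypothetical key layout: errors/<id>.log
            s3.put_object(Bucket="my-output-bucket",
                          Key="errors/{}.log".format(row["id"]),
                          Body=str(row["error"]).encode("utf-8"))
            continue
        for col in ("processed_1", "processed_2", "processed_3"):
            # Hypothetical key layout: <column>/<id>.json
            s3.put_object(Bucket="my-output-bucket",
                          Key="{}/{}.json".format(col, row["id"]),
                          Body=row[col].encode("utf-8"))

final_data.foreachPartition(write_partition)

Unlike the UDF in the question, foreachPartition is an action with no return value, so there is no "store_results" column; failures surface as task errors in the Spark UI instead.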


