pandas_udf function running in AWS Glue does not put objects into S3 without a print call

Problem description

Here is what I am trying to do.

Spark DataFrame -> groupby -> call a pandas_udf method that uses boto3 to create some files in S3

The pandas_udf method

# Pandas_UDF method
from io import BytesIO

import boto3
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, LongType

saveSNFile_schema = StructType([
    StructField('cloc', StringType(), True),      # chunk file location
    StructField('aggloc', StringType(), True),    # aggregated file location
    StructField('trackloc', StringType(), True),  # tracking file location
    StructField('sn', StringType(), True),        # serial number
    StructField('len', LongType(), True),         # number of rows saved
])

@pandas_udf(saveSNFile_schema, PandasUDFType.GROUPED_MAP)
def saveSNFile(sndf):
    """
    Save the group's dataframe as a parquet file in S3.

    Parameters:
        sndf: pandas dataframe for one serial-number group

    Returns:
        pandas dataframe matching saveSNFile_schema
    """
    fname = sndf.iloc[0]['field1']
    sn = "%s" % (sndf.iloc[0]['tmp_sn'])
    year = sndf.iloc[0]['partition_0']
    month = sndf.iloc[0]['partition_1']
    day = sndf.iloc[0]['partition_2']

    # get the file names in Tier-3 to save the dataframe (helper defined elsewhere in the job)
    chunkFileLocation, aggFileLocation, trackingFileLocation = get_ChunkLocation(fname, sn, year, month, day)

    # convert the pandas dataframe to parquet format in an in-memory buffer
    parquet_buffer = BytesIO()
    sndf.to_parquet(parquet_buffer, compression='gzip')

    # the UDF runs on the workers, so the boto3 connection must be created here
    s3_resource = boto3.resource('s3')

    # save the file to S3
    s3_resource.Object("S3bucketName", chunkFileLocation).put(Body=parquet_buffer.getvalue())

    # create an empty tracking file
    s3_resource.Object("S3bucketName", trackingFileLocation).put()

    # create an empty file with the SN as the file name
    s3_resource.Object("S3bucketName", "%s/aggregated_sn/%s" % (T3TrackingPrefix, sn)).put()

    # delete the aggregated file since we have new files in the chunks location
    rval = s3_resource.Object("S3bucketName", aggFileLocation).delete()

    # return a pandas dataframe with the additional information, matching the schema defined above
    rdf = pd.DataFrame(
        [[chunkFileLocation, aggFileLocation, trackingFileLocation, sn, len(sndf)]],
        columns=['cloc', 'aggloc', 'trackloc', 'sn', 'len'])

    return rdf

Calling the pandas_udf method

rdf = dataFromS3Df.groupby("tmp_sn").apply(saveSNFile)
print(rdf.show()) # <---- without this print call, the UDF does not create any files in S3

With the print(rdf.show()) call above, the method works as expected.

Without print(rdf.show()), the UDF does not send any puts to the S3 bucket.

This seems really strange to me. I also talked to AWS support and they did not know why either.

I understand that the pandas_udf method runs on the worker nodes, so I have to create the connection to S3 inside the function. The strange part is that even after my notebook cell finishes, no files are created in S3 until I perform some operation on the dataframe (rdf).
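For context, here is a minimal, self-contained sketch (the column names and toy data are made up for illustration) showing that groupby().apply() with a grouped-map pandas_udf is only a transformation: the UDF body, and any side effects inside it, do not run until an action is applied to the result.

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, LongType
import pandas as pd

spark = SparkSession.builder.getOrCreate()

demo_schema = StructType([
    StructField('key', StringType(), True),
    StructField('rows', LongType(), True),
])

@pandas_udf(demo_schema, PandasUDFType.GROUPED_MAP)
def side_effect_udf(pdf):
    # any side effect here (e.g. a boto3 put) happens on a worker,
    # and only when Spark actually executes this UDF
    print("processing group %s" % pdf.iloc[0]['key'])
    return pd.DataFrame([[pdf.iloc[0]['key'], len(pdf)]], columns=['key', 'rows'])

df = spark.createDataFrame([('a', 1), ('a', 2), ('b', 3)], ['key', 'value'])

result = df.groupby('key').apply(side_effect_udf)  # transformation only: nothing has executed yet
result.count()                                     # action: now the UDF (and its side effects) run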

Tags: python, amazon-web-services, amazon-s3, user-defined-functions, aws-glue

Solution


rdf.collect() solves the problem. Spark evaluates transformations lazily, so the grouped-map pandas_udf (and the S3 puts inside it) only runs once an action such as collect() or show() forces the plan to execute.
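A minimal sketch of the driver-side call, reusing the names from the question (collect() is just one possible action; count() or writing rdf out would also trigger execution):

rdf = dataFromS3Df.groupby("tmp_sn").apply(saveSNFile)

# force execution: the grouped-map UDF (and its S3 puts) only run
# when an action is applied to the resulting dataframe
results = rdf.collect()

for row in results:
    print(row['sn'], row['len'], row['cloc'])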

