python - 在 aws 胶水中运行的 pandas_udf 函数不会在没有打印功能的情况下将对象放入 s3
问题描述
这就是我想要做的。
SparkDataframe -> groupby -> 调用 pandas_udf 方法,使用 boto3 在 s3 中创建一些文件
pandas_udf 方法
# Pandas_UDF method
saveSNFile_schema = StructType([
StructField('cloc', StringType(), True), # chunk file location
StructField('aggloc', StringType(), True), # chunk file location
StructField('trackloc', StringType(), True), # chunk file location
StructField('sn',StringType(),True), # serial number
StructField('len',LongType(),True), # number of rows saved
])
@pandas_udf(saveSNFile_schema, PandasUDFType.GROUPED_MAP)
def saveSNFile(sndf):
"""
Save dataframe as a parquet file in S3
parameters:
sndf: pandas dataframe
return:
pandas dataframe
"""
fname = sndf.iloc[0]['field1']
sn = "%s" %(sndf.iloc[0]['tmp_sn'])
year = sndf.iloc[0]['partition_0']
month = sndf.iloc[0]['partition_1']
day = sndf.iloc[0]['partition_2']
# get the file name in Tier-3 to save the dataframe
chunkFileLocation, aggFileLocation, tradkingFileLocation = get_ChunkLocation(fname, sn, year, month, day)
csv_buffer = BytesIO()
# convert pandas dataframe to parquet format
sndf.to_parquet(csv_buffer, compression='gzip')
s3_resource = boto3.resource('s3')
# save the file to S3
s3_resource.Object("S3bucketName", chunkFileLocation).put(Body=csv_buffer.getvalue())
# create a tracking empty file
s3_resource.Object("S3bucketName", tradkingFileLocation).put()
# create empty file with SN as file name
s3_resource.Object("S3bucketName", "%s/aggregated_sn/%s" % (T3TrackingPrefix, sn)).put()
# delete the aggregated files since we have new files in chunks location
rval = s3_resource.Object("S3bucketName", aggFileLocation).delete()
# Return pandas dataframe with additional information as schema defined above
rdf = pd.DataFrame([[chunkFileLocation, aggFileLocation, tradkingFileLocation, sn, len(sndf)]], columns=[ 'cloc', 'aggloc', 'trackloc','sn', 'len'])
return rdf
调用 pandas_udf 方法
rdf = dataFromS3Df.groupby("tmp_sn").apply(saveSNFile)
print(rdf.show()) # <---- without this print method, udf function does not create any files in S3
通过print(rdf.show())
上述方法,该方法按预期工作
如果没有print(rdf.show())
udf 函数,则不会将任何 put 发送到 s3 存储桶。
这对我来说真的很奇怪。我也和aws支持谈过,他们不知道。
我知道 pandas_udf 方法在工作节点中运行,因此我必须在函数内创建到 s3 的连接,但奇怪的是,即使在我的笔记本运行之后,直到我对数据框(rdf
)进行一些操作,它也不会在 s3 中创建文件.
解决方案
rdf.collect()
解决问题。
推荐阅读
- r - 如何根据列的值对观察值重新编号
- javascript - 如何将 HTML/JS 小部件包含到 reactjs?
- java - 我正在尝试插入 sql 但它不断给我错误
- android - 如何在所有单选按钮可以交互的每个项目中使用一个单选按钮创建 Android RecyclerView?
- android - 共享元素转换后 Gif 不播放。滑翔 v 4.8.0
- javascript - 使用 pwa angular 在 ios 中打开新选项卡/窗口
- php - 如何修复此错误“无法将要求解析为可安装的软件包集。”
- opencv - 检测物体上的光反射的最可靠方法是什么
- ocr - 使用 pytesseract 加速 OCR
- google-cloud-firestore - Google Cloud Firestore - 如何对数组项进行 OR 查询