Error uploading a file with S3FileTransformOperator in Apache Airflow

Problem description

I am converting an XML file to Parquet. Here is the relevant code:

File s3_test.py:

with DAG("s3-dag", default_args=default_args, schedule_interval= '@once') as dag:
    t1 = BashOperator(
        task_id='bash_test',
        bash_command='echo 1',
        dag=dag
    )
    transformer = S3FileTransformOperator(
        task_id='S3_ETL_OP',
        source_s3_key='s3://<my bucket>/origin/105.xml',
        dest_s3_key='s3://<my bucket>/s3/105.parquet',
        replace=False,
        transform_script='/usr/local/airflow/dags/scripts/transform.py',
        source_aws_conn_id='s3_connection',
        dest_aws_conn_id='s3_connection'
    )
    t1.set_upstream(transformer)

File transform.py:

import sys

from pyspark.sql import SparkSession

# The operator passes the downloaded copy of source_s3_key as the first
# argument and a local destination temp file (uploaded to dest_s3_key after
# the script finishes) as the second argument.
input_file = sys.argv[1]
output = sys.argv[2]

print("Starting data transformation...")
spark = SparkSession\
        .builder\
        .appName("XmlToParquet")\
        .getOrCreate()

df = spark.read.format("com.databricks.spark.xml") \
        .options(rowTag="PatientMatching") \
        .load(input_file)

print("Dataframe read")
df.repartition(1).write.mode('overwrite').parquet(output)
print("Completed data transformation!")

This code reads my XML correctly and creates the Parquet file. However, it never uploads the file to S3, apparently because the temporary file is being confused with a folder:

[2020-10-26 18:22:13,819] {{s3_file_transform_operator.py:147}} INFO - Dataframe read
[2020-10-26 18:22:13,819] {{s3_file_transform_operator.py:147}} INFO - Completed data transformation!
[2020-10-26 18:22:14,667] {{s3_file_transform_operator.py:158}} INFO - Transform script successful. Output temporarily located at /tmp/tmpwma2zhp6
[2020-10-26 18:22:14,667] {{s3_file_transform_operator.py:161}} INFO - Uploading transformed file to S3
[2020-10-26 18:22:18,825] {{S3_hook.py:209}} INFO - Not Found
[2020-10-26 18:22:18,939] {{taskinstance.py:1150}} ERROR - [Errno 21] Is a directory: '/tmp/tmpwma2zhp6'
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/s3_file_transform_operator.py", line 166, in execute
    replace=self.replace
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/S3_hook.py", line 373, in load_file
    client.upload_file(filename, bucket_name, key, ExtraArgs=extra_args)
  File "/usr/local/lib/python3.7/site-packages/boto3/s3/inject.py", line 131, in upload_file
    extra_args=ExtraArgs, callback=Callback)
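
I think what is happening is that Spark's parquet writer always produces a directory of part files at the target path, so the temp file the operator wants to upload ends up as a directory. A quick local illustration of that behaviour (just a sketch, assuming a local pyspark install; paths and names are illustrative):

# Standalone illustration: Spark's .parquet(path) writes a directory, never a
# single file, which is why upload_file hits "[Errno 21] Is a directory".
import os
import tempfile

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-writes-a-directory").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

out = os.path.join(tempfile.mkdtemp(), "out.parquet")
df.repartition(1).write.mode("overwrite").parquet(out)

print(os.path.isdir(out))  # True
print(os.listdir(out))     # _SUCCESS plus a part-*.parquet file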

I really don't want to patch the Airflow code.

Any help?

Tags: airflow, parquet

Solution


I just tested the S3FileTransformOperator with the following and it worked:

transformer = S3FileTransformOperator(
        task_id='S3_ETL_OP',
        source_s3_key='s3://<<bucket>>/tmp/test_input.csv',
        dest_s3_key='s3://<<bucket>>/tmp/test_output.csv',
        replace=False,
        transform_script='/bin/cp',
        source_aws_conn_id='s3-connection',
        dest_aws_conn_id='s3-connection'
    )

Using /bin/cp is the simplest possible transform script; it really just copies between the two temporary locations, and it succeeds:

[2020-10-27 16:42:41,084] {s3_file_transform_operator.py:114} INFO - Downloading source S3 file s3://<<bucket>>/tmp/test_input.csv
[2020-10-27 16:42:41,650] {s3_file_transform_operator.py:123} INFO - Dumping S3 file s3://<<bucket>>/tmp/test_input.csv contents to local file /tmp/tmpbcogzsdk
[2020-10-27 16:42:41,847] {s3_file_transform_operator.py:144} INFO - Output:
[2020-10-27 16:42:41,851] {s3_file_transform_operator.py:157} INFO - Transform script successful. Output temporarily located at /tmp/tmp3cpn7gd_
[2020-10-27 16:42:41,851] {s3_file_transform_operator.py:160} INFO - Uploading transformed file to S3
[2020-10-27 16:42:42,070] {S3_hook.py:201} INFO - Not Found
[2020-10-27 16:42:43,025] {s3_file_transform_operator.py:167} INFO - Upload successful
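
If you would rather keep the check in Python, a hypothetical equivalent of /bin/cp as the transform script could look like this (a sketch, not tested in your environment; like your transform.py it has to be executable by the operator):

#!/usr/bin/env python3
# copy_transform.py -- hypothetical Python stand-in for /bin/cp.
# The operator passes the downloaded source temp file as argv[1] and the
# destination temp file (uploaded to dest_s3_key afterwards) as argv[2].
import shutil
import sys

shutil.copyfile(sys.argv[1], sys.argv[2])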

I would also suggest trying this approach. Otherwise, it would be good to know which Airflow version and which boto/boto3 package versions you are running.
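
If the /bin/cp test passes for you, the remaining problem is most likely that your Spark job turns the operator's destination temp path (sys.argv[2]) into a directory, which is what "[Errno 21] Is a directory" points at. A possible reworking of transform.py under that assumption (just a sketch; the scratch directory and glob pattern are illustrative, and I have not run it against your data):

# transform_single_file.py -- sketch of transform.py reworked so that the
# operator's destination temp path receives a single regular file (assumption:
# the [Errno 21] comes from Spark writing a directory at sys.argv[2]).
import glob
import os
import shutil
import sys
import tempfile

from pyspark.sql import SparkSession

input_file = sys.argv[1]
output = sys.argv[2]

spark = SparkSession.builder.appName("XmlToParquet").getOrCreate()

df = spark.read.format("com.databricks.spark.xml") \
        .options(rowTag="PatientMatching") \
        .load(input_file)

# Spark always writes a directory of part files, so write to a scratch
# directory first instead of directly to the operator's destination path.
scratch_dir = tempfile.mkdtemp()
df.repartition(1).write.mode("overwrite").parquet(scratch_dir)

# Move the single part file onto sys.argv[2]; the operator then uploads that
# one file to dest_s3_key.
part_file = glob.glob(os.path.join(scratch_dir, "part-*.parquet"))[0]
shutil.move(part_file, output)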

