airflow - Error uploading a file with S3FileTransformOperator in Apache Airflow
Question
I am converting XML files to Parquet. Here is the relevant code:
File s3_test.py:
with DAG("s3-dag", default_args=default_args, schedule_interval='@once') as dag:
    t1 = BashOperator(
        task_id='bash_test',
        bash_command='echo 1',
        dag=dag
    )
    transformer = S3FileTransformOperator(
        task_id='S3_ETL_OP',
        source_s3_key='s3://<my bucket>/origin/105.xml',
        dest_s3_key='s3://<my bucket>/s3/105.parquet',
        replace=False,
        transform_script='/usr/local/airflow/dags/scripts/transform.py',
        source_aws_conn_id='s3_connection',
        dest_aws_conn_id='s3_connection'
    )
    t1.set_upstream(transformer)
File transform.py:
import sys
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from random import random
from operator import add

input_file = sys.argv[1]
output = sys.argv[2]

print("Starting data transformation...")

spark = SparkSession \
    .builder \
    .appName("XmlToParquet") \
    .getOrCreate()

df = spark.read.format("com.databricks.spark.xml") \
    .options(rowTag="PatientMatching") \
    .load(input_file)
print("Dataframe read")

df.repartition(1).write.mode('overwrite').parquet(output)
print("Completed data transformation!")
This code reads my XML correctly and creates the Parquet output. But it clearly does not upload the file to S3, because the temporary output file is being treated as a directory:
[2020-10-26 18:22:13,819] {{s3_file_transform_operator.py:147}} INFO - Dataframe read
[2020-10-26 18:22:13,819] {{s3_file_transform_operator.py:147}} INFO - Completed data transformation!
[2020-10-26 18:22:14,667] {{s3_file_transform_operator.py:158}} INFO - Transform script successful. Output temporarily located at /tmp/tmpwma2zhp6
[2020-10-26 18:22:14,667] {{s3_file_transform_operator.py:161}} INFO - Uploading transformed file to S3
[2020-10-26 18:22:18,825] {{S3_hook.py:209}} INFO - Not Found
[2020-10-26 18:22:18,939] {{taskinstance.py:1150}} ERROR - [Errno 21] Is a directory: '/tmp/tmpwma2zhp6'
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/operators/s3_file_transform_operator.py", line 166, in execute
replace=self.replace
File "/usr/local/lib/python3.7/site-packages/airflow/hooks/S3_hook.py", line 373, in load_file
client.upload_file(filename, bucket_name, key, ExtraArgs=extra_args)
File "/usr/local/lib/python3.7/site-packages/boto3/s3/inject.py", line 131, in upload_file
extra_args=ExtraArgs, callback=Callback)
I would really rather not patch the Airflow code.
Any help?
Solution
I just tested S3FileTransformOperator with the following, and it worked:
transformer = S3FileTransformOperator(
    task_id='S3_ETL_OP',
    source_s3_key='s3://<<bucket>>/tmp/test_input.csv',
    dest_s3_key='s3://<<bucket>>/tmp/test_output.csv',
    replace=False,
    transform_script='/bin/cp',
    source_aws_conn_id='s3-connection',
    dest_aws_conn_id='s3-connection'
)
Using /bin/cp as the transform script is the simplest possible case: it just copies between the two temporary locations, and it succeeds:
[2020-10-27 16:42:41,084] {s3_file_transform_operator.py:114} INFO - Downloading source S3 file s3://<<bucket>>/tmp/test_input.csv
[2020-10-27 16:42:41,650] {s3_file_transform_operator.py:123} INFO - Dumping S3 file s3://<<bucket>>/tmp/test_input.csv contents to local file /tmp/tmpbcogzsdk
[2020-10-27 16:42:41,847] {s3_file_transform_operator.py:144} INFO - Output:
[2020-10-27 16:42:41,851] {s3_file_transform_operator.py:157} INFO - Transform script successful. Output temporarily located at /tmp/tmp3cpn7gd_
[2020-10-27 16:42:41,851] {s3_file_transform_operator.py:160} INFO - Uploading transformed file to S3
[2020-10-27 16:42:42,070] {S3_hook.py:201} INFO - Not Found
[2020-10-27 16:42:43,025] {s3_file_transform_operator.py:167} INFO - Upload successful
I suggest trying this approach as well. Otherwise, it would be good to know which Airflow version and which boto/boto3 package versions you are running.
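As for the original error: df.write.parquet(output) makes Spark create a *directory* of part files at sys.argv[2], while the operator expects a single file there to upload, hence "[Errno 21] Is a directory". One workaround is to let Spark write to a scratch directory and then move the lone part file to the expected path. A minimal sketch, assuming the repartitioned output fits in one part file (the promote_part_file helper is my own, not part of Airflow):

```python
import glob
import os
import shutil
import sys
import tempfile


def promote_part_file(staging_dir, dest_path):
    """Move the single part-*.parquet Spark wrote into staging_dir to
    dest_path, so dest_path ends up as a plain file, not a directory."""
    part_file = glob.glob(os.path.join(staging_dir, "part-*.parquet"))[0]
    shutil.move(part_file, dest_path)
    return dest_path


def main():
    # Imported lazily so the helper above is usable without pyspark installed.
    from pyspark.sql import SparkSession

    input_file, output_file = sys.argv[1], sys.argv[2]
    spark = SparkSession.builder.appName("XmlToParquet").getOrCreate()
    df = (spark.read.format("com.databricks.spark.xml")
          .options(rowTag="PatientMatching")
          .load(input_file))
    # Spark always writes a directory of part files, so stage the output
    # and then move the lone part file to the path the operator uploads.
    staging = tempfile.mkdtemp()
    df.repartition(1).write.mode("overwrite").parquet(staging)
    promote_part_file(staging, output_file)


if __name__ == "__main__" and len(sys.argv) > 2:
    main()
```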