Airflow S3ToRedshiftTransfer

Problem Description

I need to copy a file from S3 into Redshift using the COPY command. I am fairly new to Airflow and am running into problems. Could someone correct the code below? Can I call rs.execute() like this?

Error:
    op.execute()
TypeError: execute() missing 1 required positional argument: 'context'

Code:

import os
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer

default_args = {
    'owner': 'gra',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 13),
    'email': ['ss.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'schedule_interval': '@daily',
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}



def job1():
    print('First Job to start')

def s3_redshift(**kwargs):
    rs= S3ToRedshiftTransfer(redshift_conn_id ='12as',
                            aws_conn_id='gt_read',
                            schema='test',
                            table='dept',
                            s3_bucket="gng-test",
                            s3_key="copt.csv",
                            task_id="copy_redshift"
                            #copy_options=copy_options_,
                            )
    rs.execute()

copy_redshift=PythonOperator(task_id='copy_redshift', python_callable=s3_redshift,provide_context=True, dag=dag)
app_start >> copy_redshift

Tags: airflow

Solution


I was able to perform the copy from S3 to Redshift using boto3. S3ToRedshiftTransfer can be used to do the same thing.
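For reference, a minimal sketch of using S3ToRedshiftTransfer declared as a task instead of instantiating it inside a PythonOperator: the scheduler builds the task context and calls execute(context) itself, which is exactly the argument the TypeError above says is missing. The connection ids, bucket, table, and key are the placeholders from the question; adjust them for your environment.

# sketch: S3ToRedshiftTransfer wired directly into the DAG (Airflow 1.10.x import path)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer

default_args = {
    'owner': 'gra',
    'start_date': datetime(2020, 12, 13),
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

dag = DAG(dag_id='s3_to_redshift_example', default_args=default_args, schedule_interval='@daily')

copy_redshift = S3ToRedshiftTransfer(
    task_id='copy_redshift',
    redshift_conn_id='12as',   # Redshift connection id (placeholder from the question)
    aws_conn_id='gt_read',     # AWS connection id (placeholder from the question)
    schema='test',
    table='dept',
    s3_bucket='gng-test',
    s3_key='copt.csv',
    # copy_options=['CSV'],    # optional extra COPY options
    dag=dag,
)

The boto3-based DAG that actually did the copy is below: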

# airflow related
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
# other packages
from datetime import datetime
from datetime import timedelta
# from airflow.hooks import PostgresHook
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftTransfer
# from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator  # Airflow 2.x provider-package path
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor  # BashOperator already imported above
import boto3

default_args = {
    'owner': 'grit_delta',
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 13),
    'email': ['sa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # note: schedule_interval belongs on the DAG (set below as timedelta(1)), not in default_args
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}


dag=DAG(dag_id='veritas_test',default_args=default_args,schedule_interval=timedelta(1))


def job1():
    print('First Job to start')

file_sensor = S3KeySensor(task_id = 's3_key_sensor_task',
                s3_conn_id='_read',
                poke_interval=120,
                timeout=18*60*60,
                bucket_key = "data/test.*",
                bucket_name = "g-test",
                wildcard_match = True,
                dag = dag
)

app_start=PythonOperator(task_id='app_start', python_callable=job1, dag=dag)


def s3_redshift(**kwargs):
    # Use the Redshift Data API; execute_statement is asynchronous (see the sketch after this DAG)
    rsd = boto3.client('redshift-data')
    deptKey = 's3://airflow-dev/code/gta/dag/dept.csv'  # currently unused
    # A real COPY from S3 also needs an authorization clause such as IAM_ROLE '<role-arn>'
    sqlQuery = "copy test.dept  from 's3://airflow-grole' CSV ;"
    # sqlQuery = "insert into test.dept values('d1221',100)"
    print(sqlQuery)
    resp = rsd.execute_statement(
        ClusterIdentifier="opes",
        Database="ee",
        DbUser="aa",
        Sql=sqlQuery
        # Sql="CREATE TABLE IF NOT EXISTS test.dept (title varchar(10), rating int);"
    )
    print(resp)
    print(" completed")
    return "OK"


copy_redshift=PythonOperator(task_id='copy_redshift', python_callable=s3_redshift,provide_context=True, dag=dag)
file_sensor >>app_start >> copy_redshift
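One caveat about the DAG above: execute_statement on the Redshift Data API is asynchronous, so the task returns as soon as the statement is submitted rather than when the COPY finishes. A minimal sketch of waiting for the outcome with describe_statement, assuming the same boto3 client and the Id returned by execute_statement:

import time
import boto3

def wait_for_statement(statement_id, poll_seconds=5):
    # Poll the Redshift Data API until the submitted statement finishes
    rsd = boto3.client('redshift-data')
    while True:
        desc = rsd.describe_statement(Id=statement_id)
        status = desc['Status']  # SUBMITTED / PICKED / STARTED / FINISHED / FAILED / ABORTED
        if status == 'FINISHED':
            return desc
        if status in ('FAILED', 'ABORTED'):
            raise RuntimeError('statement {} {}: {}'.format(statement_id, status, desc.get('Error')))
        time.sleep(poll_seconds)

# inside s3_redshift, after execute_statement:
#     wait_for_statement(resp['Id'])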
