首页 > 解决方案 > 通过 Airflow DAG 的数据流作业

问题描述

我正在尝试通过 Airflow 中的 BashOperator 使用数据流运行器执行 apache 光束管道 python 文件。我知道如何将参数动态传递给 python 文件。我期待优化参数 - 避免单独发送所有参数。示例片段:

text_context.py

import sys

def run_awc_orders(*args, **kwargs):
    print("all arguments -> ",  args)

if __name__ == "__main__":
    print("all params -> ", sys.argv)
    run_awc_orders( sys.argv[1],  sys.argv[2], sys.argv[3])

my_dag.py

test_DF_job = BashOperator(
    task_id='test_DF_job',
    provide_context=True,
    bash_command="python /usr/local/airflow/dags/test_context.py {{ execution_date }} {{ next_execution_date }} {{ params.db_params.new_text }}  --runner DataflowRunner --key path_to_creds_json_file --project project_name --staging_location staging_gcp_bucket_location --temp_location=temp_gcp_bucket_location --job_name test-job",
    params={
              'db_params': {
                'new_text': 'Hello World'
              }
            },
    dag=dag
)

因此,这就是我们可以在气流 UI 的日志中看到的内容。

[2019-09-25 06:44:44,103] {bash_operator.py:128} INFO - all params ->  ['/usr/local/airflow/dags/test_context.py', '2019-09-23T00:00:00+00:00', '2019-09-24T00:00:00+00:00', '127.0.0.1']
[2019-09-25 06:44:44,103] {bash_operator.py:128} INFO - all arguments ->  ('2019-09-23T00:00:00+00:00', '2019-09-24T00:00:00+00:00', '127.0.0.1')
[2019-09-25 06:44:44,106] {bash_operator.py:132} INFO - Command exited with return code 0

标签: pythongoogle-cloud-dataflowairflowapache-beam

解决方案


我相信推荐的方法是使用Airflow'sDataflowPythonOperator,它直接接收 Python 和 Dataflow 选项。

你会做这样的事情:

test_DF_job = DataflowPythonOperator(
  py_file='/usr/local/airflow/dags/test_context.py',
  py_options=[...],
  dataflow_default_options={...},
  dag=dag
)

推荐阅读