python - 通过 Airflow DAG 的数据流作业
问题描述
我正在尝试通过 Airflow 中的 BashOperator 使用数据流运行器执行 apache 光束管道 python 文件。我知道如何将参数动态传递给 python 文件。我期待优化参数 - 避免单独发送所有参数。示例片段:
text_context.py
import sys
def run_awc_orders(*args, **kwargs):
print("all arguments -> ", args)
if __name__ == "__main__":
print("all params -> ", sys.argv)
run_awc_orders( sys.argv[1], sys.argv[2], sys.argv[3])
my_dag.py
test_DF_job = BashOperator(
task_id='test_DF_job',
provide_context=True,
bash_command="python /usr/local/airflow/dags/test_context.py {{ execution_date }} {{ next_execution_date }} {{ params.db_params.new_text }} --runner DataflowRunner --key path_to_creds_json_file --project project_name --staging_location staging_gcp_bucket_location --temp_location=temp_gcp_bucket_location --job_name test-job",
params={
'db_params': {
'new_text': 'Hello World'
}
},
dag=dag
)
因此,这就是我们可以在气流 UI 的日志中看到的内容。
[2019-09-25 06:44:44,103] {bash_operator.py:128} INFO - all params -> ['/usr/local/airflow/dags/test_context.py', '2019-09-23T00:00:00+00:00', '2019-09-24T00:00:00+00:00', '127.0.0.1']
[2019-09-25 06:44:44,103] {bash_operator.py:128} INFO - all arguments -> ('2019-09-23T00:00:00+00:00', '2019-09-24T00:00:00+00:00', '127.0.0.1')
[2019-09-25 06:44:44,106] {bash_operator.py:132} INFO - Command exited with return code 0
解决方案
我相信推荐的方法是使用Airflow'sDataflowPythonOperator
,它直接接收 Python 和 Dataflow 选项。
你会做这样的事情:
test_DF_job = DataflowPythonOperator(
py_file='/usr/local/airflow/dags/test_context.py',
py_options=[...],
dataflow_default_options={...},
dag=dag
)
推荐阅读
- arrays - 我有一个动态内存分配问题
- apache-superset - 为什么超集无法使用 PyAthena 和 rest 方案连接到 Athena 并抛出 HTTP 422“意外错误”?
- java - VSCode 无法识别 Gradle 源依赖
- serverless-framework - 如何使用无服务器框架向 cloudWatchLog 添加过滤器名称?
- python - python正则表达式搜索在路径中找不到下划线
- google-cloud-platform - GKE 中证书管理器的防火墙规则
- csv - Freemarker CSV 生成 - 带有中文文本的 CSV 会截断 csv 内容
- azure - 如何在代码中找到 Azure Function App 的部署环境
- java - 有没有办法在 Quarkus 单元测试中回滚事务?
- python - 透视 Pandas 数据框以查看是否满足条件