python - 在 Apache Airflow 中,有没有办法捕获 bash 命令的错误?
问题描述
在 Apache Airflow 中,是否可以捕获失败的 bash 命令产生的原始错误消息,而不是 Apache Airflow 产生的回溯错误,它告诉您该行失败但不完全是失败的原因?
Dag 中的示例行:
gsutil_rsync = BashOperator(
task_id="task1",
bash_command='gsutil rsync -r s3://bucket/ gs://bucket',
dag=dag)
解决方案
我用 python 函数编写了这个解决方案,PythonOperator
并设置xcom_push=True
在PythonOperator
.
import subprocess
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 10, 15),
'email': 'me@airflow.com',
'email_on_failure': False,
'retries': 1,
'retry_delay': 1,
}
def run_bash():
result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
return result.stdout
run_bash()
with DAG('bash_dag', schedule_interval="@daily", default_args=default_args) as dag:
start_brach = DummyOperator(task_id='start')
gsutil_rsync_py = PythonOperator(
task_id="task1",
python_callable=run_bash,
xcom_push=True,
dag=dag)
start_brach.set_downstream(gsutil_rsync_py)
结果是这样;
推荐阅读
- html - HTML 文件找不到引导程序 CSS 文件
- api - 通过 API 修改 Github 配置文件但出现“找不到消息”错误
- angular - Typescript获取对象数组中两列值的数组
- google-apps-script - JSON.stringify.replace 用于更新使用 Gmail 草稿作为模板的邮件合并代码
- c# - 解析json时如何将字符串添加到集合中
- node.js - 如何根据与来自服务器的 id 匹配的对象 ID 显示对象
- python - jupyter 中的 %matplotlib notebook 魔术命令只能间歇性地工作
- ruby-on-rails - Ruby on Rails Active Record Bank 数据库“统一”
- matlab - 不确定如何使用“ODE45”命令修复错误
- python - Python,Django:有没有办法在一个模板中编辑多个数据集