airflow - 气流没有因子任务错误而失败
问题描述
也许有人可以告诉我我在这里做错了什么。我在 Airflow 中有一个运行命令的任务,在日志中我收到此错误:
[2018-05-30 11:22:43,814] {models.py:1428} INFO - Executing <Task(PythonOperator): computer_unload_and_load> on 2018-05-30 15:22:41.595535
[2018-05-30 11:22:43,814] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run copy_kiosk_status computer_unload_and_load 2018-05-30T15:22:41.595535 --job_id 23 --raw -sd DAGS_FOLDER/copy_poc.py']
[2018-05-30 11:22:44,367] {base_task_runner.py:98} INFO - Subtask: [2018-05-30 11:22:44,367] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-05-30 11:22:44,412] {base_task_runner.py:98} INFO - Subtask: [2018-05-30 11:22:44,412] {models.py:189} INFO - Filling up the DagBag from /other/airflow/dags/copy_kiosk.py
[2018-05-30 11:22:44,570] {cli.py:374} INFO - Running on host [redacted]
[2018-05-30 11:22:46,967] {base_task_runner.py:98} INFO - Subtask: ERROR: schema "dw2" does not exist
[2018-05-30 11:22:46,969] {base_task_runner.py:98} INFO - Subtask: [2018-05-30 11:22:46,968] {python_operator.py:90} INFO - Done. Returned value was: None
这是任务:
computer_load_task = PythonOperator(
task_id='computer_unload_and_load',
python_callable=unload_and_load.unload_and_load_table,
op_args=(source_host, source_db, "public", "computer", "computer_unload_task")
dag=dag
)
这是它正在调用的函数:
def load_table(host, db, schema, table, unload_task_id=False, file_path=False, **kwargs):
""" load a csv file into a table
if no file_path is given, uses XCOM to get the file_name returned by the unload task"""
try:
if not file_path:
file_path = kwargs['ti'].xcom_pull(task_ids=unload_task_id)
load_cmd = "\copy {}.{} FROM {} WITH (FORMAT CSV, NULL '^', HEADER)".format(schema, table, file_path)
command = [ "psql",
"-U", "root",
"-h", host,
"-d", db,
"-c", load_cmd
]
subprocess.run(command)
except:
raise
显然我知道如何修复错误(我在那里有错误的架构),但我想知道为什么任务在 Airflow 中成功而不是失败?我一定遗漏了一些明显的东西。
解决方案
您的代码示例看起来不错 - 不过是一回事。provide_context=True
在您的任务中缺少此代码示例。
除此之外,我认为这与子流程处理错误有关。您可以尝试设置该属性check=True
,以便子进程在出现问题时抛出异常。
或者,您可以检查子进程check_returncode()
并在它非零时抛出您自己的异常。
以下是子进程的 Python 3 文档:https ://docs.python.org/3/library/subprocess.html
要单独尝试异常处理,最好创建一个任务,raise
如果这不起作用,则只创建一个异常。
推荐阅读
- c++ - 类模板问题
- wordpress - 无法从 Wordpress 连接到 AWS RDS
- javascript - Gulpfile 创建 2 组 CSS 文件
- python - 将 Flask 与其他 python 库一起使用
- c++ - 类方法内部链接
- amazon-web-services - 为 AWS Application Load Balancer 使用静态 ip
- c - 比较字符串和指针
- java - Spring Boot + Cloud Firestore 集成测试
- facebook - Facebook 市场数据 API?
- javascript - 如何使用 jQuery 使单击的字母滚动到其相关的 html 容器