python-3.x - 当 PythonOperator 出现错误“Negsignal.SIGKILL”时,气流 DAG 失败
问题描述
我在 Cloud Composer v1.16.16 上运行 Airflowv1.10.15。
我的 DAG 看起来像这样:
from datetime import datetime, timedelta
# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large
default_args = {
'owner': 'xxxx',
'depends_on_past': False,
'start_date': datetime(2021, 9, 14),
'email_on_failure': True,
'email': ['xxxx'],
'retries': 1,
'retry_delay': timedelta(minutes=2),
'catchup': False
}
# Define the DAG with parameters
dag = DAG(
dag_id='xxxx_v1',
schedule_interval='0 20 * * *',
default_args=default_args,
catchup=False,
max_active_runs=1,
concurrency=1
)
def wd_to_bq(key, val, **kwargs):
logger.info("workday to BQ ingestion")
workday_extract.fetch_wd_load_bq(key, val)
start_load = DummyOperator(task_id='start', dag=dag)
end_load = DummyOperator(task_id='end', dag=dag)
for key, val in workday_config_large.endpoint_tbl_mapping.items():
# Task 1: Process the unmatched records from the view
workday_to_bq = PythonOperator(
dag=dag,
task_id=f'{key}',
execution_timeout=timedelta(minutes=60),
provide_context=True,
python_callable=wd_to_bq,
op_kwargs={'key': key, 'val': val}
)
start_load >> workday_to_bq >> end_load
任务失败并出现错误 -任务退出并返回代码 Negsignal.SIGKILL。python 脚本在我的本地机器上运行良好,并在 15 分钟内完成。有多个从中提取报告的端点。但是,花费时间最长(约 15 分钟)的那个会因此错误而失败,而其他的会成功。
我尝试了很多选择,但似乎都没有。有人可以帮忙吗?
解决方案
推荐阅读
- android - 使用 Firestore 时避免相同的聊天
- javascript - 如何使 jQuery 构造函数属性全局可见
- asp.net-core - 在不同的服务器上设置 Web.MVC 和主机站点
- cordova - Ionic 应用程序在 android 上运行,但在 iOS 上出现多个插件的 plugin_not_installed 错误
- c# - 如何解决 SharpZipBaseException '标头校验和非法'?
- google-apps-script - 根据谷歌表格信息创建个性化的谷歌表单
- javascript - 这些异步/等待代码之间有区别吗?
- html - 固定表头没有和列对齐
- python - Flake8 抱怨关于内置函数的“未定义变量”
- java - 无法应用插件 [id 'forge']