airflow - Apache - Airflow 1.10.1 不开始工作
问题描述
我对 Airflow 有疑问,DAG 中的第一个作业总是成功启动和结束,但第二个作业永远不会自动启动。
我尝试清除 UI 中的作业,但它没有启动,如果我想看到它正在运行,我需要删除数据库中正在运行的作业,
delete from job where state='running'
但是我没有很多处于运行状态的作业,我只有 1 个具有最新 Heartbeat 的作业 SchedulerJob,还有 16 个外部任务传感器在等待这个 DAG
该池有 150 个插槽,其中 16 个正在运行,1 个已调度。
- 我正在运行气流调度程序
- 我正在运行气流网络服务器
- 所有 DAG 在 web ui 中设置为 On
- 所有 DAG 都有一个过去的开始日期
- 我在几小时前重置了调度程序
这是气流中的代码
default_args = {
'owner': 'airgia',
'depends_on_past': False,
'retries': 2,
'start_date': datetime(2018, 12, 1, 0, 0),
'email': ['xxxx@yyyy.net'],
'email_on_failure': False,
'email_on_retry': False
}
dag = DAG('trigger_snapshot',
default_args=default_args,
dagrun_timeout= timedelta(hours=22),
schedule_interval="0 0 * * 1,2,3,4,5,7",
max_active_runs=1,
catchup=False
)
set_exec_dt = PythonOperator(
task_id='set_exec_dt',
python_callable=set_exec_dt_variable,
dag=dag,
pool='capser')
lanza_crawler = PythonOperator(
task_id='lanza_crawler',
op_kwargs={"crawler_name": crawler_name},
python_callable=start_crawler,
dag=dag,
pool='capser')
copy_as_processed = PythonOperator(
task_id='copy_as_processed',
op_kwargs={"source_bucket": Variable.get("bucket"),
"source_key": snapshot_key,
"dest_bucket": Variable.get("bucket"),
"dest_key": "{0}_processed".format(snapshot_key)},
python_callable=s3move,
dag=dag,
pool='capser')
airflow_snapshot = S3KeySensor(
task_id='airflow_snapshot',
bucket_key=snapshot_key,
wildcard_match=True,
bucket_name=Variable.get("bucket"),
timeout=8*60*60,
poke_interval=120,
dag=dag,
pool='capser')
Fin_DAG_TC = DummyOperator(
task_id='Fin_DAG_TC',
dag=dag,
pool='capser')
airflow_snapshot >> lanza_crawler >> set_exec_dt >> copy_as_processed >> Fin_DAG_TC
这就是我每天早上连接到 web ui 时看到的
[编辑]
这是调度程序的最后一个日志
在这里,我们可以看到第二份工作(lanza_crawler)的调用,但不是开始。
[2018-12-11 03:50:54,209] {{jobs.py:1109}} 信息 - 待执行的任务:
[2018-12-11 03:50:54,240] {{jobs.py:1180}} 信息 - DAG trigger_snapshot 有 0/16 正在运行和排队的任务
[2018-12-11 03:50:54,240] {{jobs.py:1218}} 信息 - 将以下任务设置为排队状态:
[2018-12-11 03:50:54,254] {{jobs.py:1301}} 信息 - 将以下任务设置为排队状态:
[2018-12-11 03:50:54,255] {{jobs.py:1343}} 信息 - 发送('trigger_snapshot', 'lanza_crawler', datetime.datetime(2018, 12, 10, 0, 0, tzinfo=) , 1) 到优先级为 4 且队列默认的执行器
[2018-12-11 03:50:54,255] {{base_executor.py:56}} 信息 - 添加到队列:气流运行 trigger_snapshot lanza_crawler 2018-12-10T00:00:00+00:00 --local --pool capser -sd /usr/local/airflow/dags/capser/trigger_snapshot.py
[2018-12-11 03:50:54,262] {{celery_executor.py:83}} 信息 - [celery] 排队('trigger_snapshot', 'lanza_crawler', datetime.datetime(2018, 12, 10, 0, 0, tzinfo=), 1) 通过 celery, queue=default
[2018-12-11 03:50:54,749] {{jobs.py:1447}} INFO - 执行程序报告 trigger_snapshot.airflow_snapshot execution_date=2018-12-10 00:00:00+00:00 成功为 try_number 1
/usr/local/airflow/dags/capser/trigger_snapshot.py 1.53s 2018-12-11T03:50:54
...
/usr/local/airflow/dags/capser/trigger_snapshot.py 6866 0.68s 1.54s 2018-12-11T03:56:50
这是工人的最后一个日志
[2018-12-11 03:50:52,718: INFO/ForkPoolWorker-11] 任务 airflow.executors.celery_executor.execute_command[9a2e1ae7-9264-47d8-85ff-cac32a542708] 在 13847.525094523095s 中成功:无
[2018-12-11 03:50:54,505: INFO/MainProcess] 收到的任务:airflow.executors.celery_executor.execute_command[9ff70fc8-45ef-4751-b274-71e242553128]
[2018-12-11 03:50:54,983] {{settings.py:174}} 信息 - setting.configure_orm():使用池设置。pool_size=5, pool_recycle=1800
[2018-12-11 03:50:55,422] {{_ _init__.py:51}} 信息 - 使用执行器 CeleryExecutor
[2018-12-11 03:50:55,611] {{models.py:271}} 信息 - 从 /usr/local/airflow/dags/capser/DAG_AURORA/DAG_AURORA.py 填充 DagBag
[2018-12-11 03:50:55,970] {{cli.py:484}} 信息-在主机 ip 上运行----* .eu-west-1.compute.internal
解决方案
在aws图形中,我们看到worker的内存占用了80%,我们决定增加worker的数量,问题就解决了。
推荐阅读
- excel - 在M语言excel中显示两个日期之间的所有日期
- python - 在单个字符串中打印熊猫行(jupyter notebook)
- android - 使用随相机位置旋转的平板电脑拍照
- c# - 如何使用直线 API 将机器人集成到网站
- vue.js - vue-router 没有得到正确的参数
- python - 如何确定滤波器功能是否适用于半幅度或半功率截止
- android - 找不到 baseLibrary.jar (com.android.databinding:baseLibrary:3.1.2)
- vba - 在所有工作表上应用 VBA 代码
- reactjs - Draft-js 编辑器只显示原生表情符号
- c# - C# 项目中的第三方 DLL 引用