docker - 气流日期错误 dag.normalize_schedule TypeError
问题描述
我遇到了一个 apache-airflow 日期时间问题,如下所示
Process DagFileProcessor238215-Process:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 388, in helper
pickle_dags)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1832, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1422, in _process_dags
dag_run = self.create_dag_run(dag)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 856, in create_dag_run
next_run_date = dag.normalize_schedule(min(task_start_dates))
TypeError: '<' not supported between instances of 'str' and 'datetime.datetime'
我在docker中使用apache-airflow,它来自zhongjiajie/docker - airflow ,基于puckel/docker-airflow。
我的 DAG 是这样定义的
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from udf.udf_hive_operator import HiveOperator
from airflow.operators.hive_to_mysql import HiveToMySqlTransfer
from udf.udf_hive_to_oracle import HiveToOracleTransfer
from udf.utils.date_utils import gen_history_date_para, today_belong_business_day
from datetime import datetime, timedelta
TMPL_SQL_PATH = Variable.get("sql_path")
HIVE_DB = "default"
NOSTRICT_HIVE_PARTITION_MODE = "set hive.exec.dynamic.partition.mode=nonstrict;\n"
default_args = {
"owner": "xx_monitor",
"description": "workflow for xx monitor system",
"depends_on_past": False,
"start_date": datetime(2014, 1, 1),
"email": ["airflow@airflow.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
# "queue": "bash_queue",
# "pool": "backfill",
# "priority_weight": 10,
# "end_date": datetime(2016, 1, 1),
}
dag = DAG(
dag_id="drug_monitor",
default_args=default_args,
schedule_interval="0 18 * * *",
template_searchpath=TMPL_SQL_PATH
)
该udf
模块是我的用户定义功能
但是奇怪的事情发生了
- 我去
webserver UI
转动 dag ,它仍然失败,我在上面ON
看到错误消息schedule
- 我
backfill
在 cli as 中使用airflow backfill -s 20140101 -e 20180101 <DAG_ID>
,然后转到schedule
错误消息消失,所有任务开始计划或排队
我尝试了几种方法来解决这个问题,但失败了。
- 尝试设置
start_date
为Object 但失败,default_args
例如airflow.utils.dates.days_ago
days_ago(2018, 9, 5)
- 尝试设置
start_date
为Object 但失败,default_args
例如airflow.utils.timezone.datetime
datetime(2018, 9, 5)
- 尝试设置
schedule_interval
为DAG-runs变量,如DAG
但失败@daily
- 尝试设置为
schedule_interval
对象但失败DAG
datetime.timedelta
每个人都遇到过这样的问题,我该如何解决?
解决方案
在我的 Dag 文件中,我使用 param 定义了一个任务start_date
,我修复了这个以重命名 param。
推荐阅读
- javascript - 您如何利用本地存储来增加投票记录?
- python - 使用 Tensorflow ODE Solver 获取梯度时出现 ZeroDivisionError?
- reactjs - 相同的 url 有时是 Cors-Origin 有时是 NS_ERROR_DOM_BAD_URI 并随机通过成功
- android - 如何修复由于 jetified-avro 导致的 Gradle 重复类错误
- linux - Docker cron计划作业未运行
- typescript - 如何使用 TypeScript 向伪经典类添加类型注释
- python - 训练测试设置中的 Scikit-learn FunctionTransformer
- html - 提交时 ASP.NET jQuery 重新加载元素(Ajax POST/GET)
- python - 如何使用样本权重进行交叉验证?
- typescript - 如何从 Admin SDK 的 Firebase 消息导入接口?