python - Airflow PythonOperator 任务失败-TypeError:密钥必须是字符串
问题描述
这里的气流新手,请耐心等待。我不明白为什么这个简单的任务失败了:
def getCarJSON():
dictCars= {'link': '/cars/acura', 'num': '1'}
with open('data/dictCars.json', 'w') as fp:
json.dump(dictCars, fp)
这是以 JSON 格式存储在磁盘上的简单 dict。为什么我会得到:
损坏的 DAG:[/home/user/airflow/dags/cars.py] Traceback(最近一次调用最后):文件“/usr/local/lib/python3.8/dist-packages/airflow/models/baseoperator.py” ,第 404 行,在init validate_key(task_id) 文件“/usr/local/lib/python3.8/dist-packages/airflow/utils/helpers.py”,第 39 行,在 validate_key 中引发 TypeError(“密钥必须是a string") TypeError: 键必须是字符串
我在 DAG 文件中有常用数据:
# Set default args
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 3, 23),
'email': ['donko@gmail.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=2)
}
schedule_interval = '30 09 * * *'
# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
dag_id = 'get_cars',
default_args = default_args,
schedule_interval = schedule_interval
)
# Get cars dict
get_cars_json = PythonOperator(
task_id=getCarJSON,
python_callable=getCarJSON,
dag=dag
)
我想要的只是将数据转储到驱动器上......
解决方案
从损坏的 dag 消息来看,在验证 tast_id 属性时,似乎引发了此错误。
在下面的代码中,您将getCarJSON
函数传递给task_id
,而不是任务的名称,这将是字符串类型,因此导致了这个 TypeError 问题
get_cars_json = PythonOperator(
task_id='getCarJSON', # Name here was without quotations
python_callable=getCarJSON,
dag=dag
)
推荐阅读
- asterisk - 无法使用 Asterisk 发起呼叫
- mysql - 没有主键的 MySQL 更新
- python - Python CSV阅读器在列表中创建列表
- ios - 无法识别的选择器发送到实例 (UIPinchGestureRecognizer)
- macos - 左右两侧的 OSX High Sierra 码头
- python - 无法使用 cx_Freeze 编译
- ios - 如何使用 IOS 12 在 UITableViewCell 中正确添加 UICollectionView
- angular - Angular 6中的过滤方法返回空数组
- r - R Shiny-动态文件输入标签
- android-layout - ScrollView 与 RelativeLayout 中的其他内容重叠