首页 > 解决方案 > Airflow PythonOperator 任务失败-TypeError:密钥必须是字符串

问题描述

这里的气流新手,请耐心等待。我不明白为什么这个简单的任务失败了:

def getCarJSON():
    dictCars= {'link': '/cars/acura', 'num': '1'}    
    with open('data/dictCars.json', 'w') as fp:
        json.dump(dictCars, fp)

这是以 JSON 格式存储在磁盘上的简单 dict。为什么我会得到:

损坏的 DAG:[/home/user/airflow/dags/cars.py] Traceback(最近一次调用最后):文件“/usr/local/lib/python3.8/dist-packages/airflow/models/baseoperator.py” ,第 404 行,在init validate_key(task_id) 文件“/usr/local/lib/python3.8/dist-packages/airflow/utils/helpers.py”,第 39 行,在 validate_key 中引发 TypeError(“密钥必须是a string") TypeError: 键必须是字符串

我在 DAG 文件中有常用数据:

# Set default args
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 3, 23),
    'email': ['donko@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=2)
}

schedule_interval = '30 09 * * *'

# Define DAG: Set ID and assign default args and schedule interval
dag = DAG(
    dag_id = 'get_cars',
    default_args = default_args,
    schedule_interval = schedule_interval
)


# Get cars dict
get_cars_json = PythonOperator(
    task_id=getCarJSON,
    python_callable=getCarJSON,
    dag=dag
)

我想要的只是将数据转储到驱动器上......

标签: pythonairflowdirected-acyclic-graphs

解决方案


从损坏的 dag 消息来看,在验证 tast_id 属性时,似乎引发了此错误。

在下面的代码中,您将getCarJSON函数传递给task_id,而不是任务的名称,这将是字符串类型,因此导致了这个 TypeError 问题

get_cars_json = PythonOperator(
    task_id='getCarJSON', # Name here was without quotations
    python_callable=getCarJSON,
    dag=dag
)

推荐阅读