Airflow can't find local file

Problem description

When using the FileToGoogleCloudStorageOperator from Airflow, I keep getting this error when running my DAG:

"FileNotFoundError: [Errno 2] No such file or directory: '/Users/ramonsotogarcia/Desktop/Data/pokemon.csv"

I don't understand why Airflow can't find my local file. Here is my DAG:

from datetime import timedelta

from airflow import DAG
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.utils.dates import days_ago


# define variables

file = "pokemon.csv"
bucket = "modulo_spark_bucket"

destination_path = f"gs://{bucket}/data/{file}"
bucket = f"gs://{bucket}"
local_file = f"/Users/ramonsotogarcia/Desktop/Data/{file}"

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['sotogarcia.r@icloud.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}


my_dag = DAG(
    'fileSystem_toGCS_toBQ',
    default_args=default_args,
    description='Loads data from local file into GCS and then transfer to BQ',
    schedule_interval=None,
)


t1 = FileToGoogleCloudStorageOperator(
    task_id="local_to_gcs",
    src=local_file,
    dst=destination_path,
    bucket=bucket,
    dag=my_dag,
)

t2 = GoogleCloudStorageToBigQueryOperator(
    task_id="GCS_to_BQ",
    bucket=bucket,
    source_objects=[destination_path],
    autodetect=True,
    skip_leading_rows=1,
    create_disposition="CREATE_IF_NEEDED",
    destination_project_dataset_table="neural-theory-277009.pokemon_data.pokemons",
    dag=my_dag,
)


# dependencies
t1 >> t2

Any ideas? I can't seem to figure out what's going wrong.

Tags: python, airflow, directed-acyclic-graphs

Solution


Just a guess, but have you tried specifying the drive in the file path, i.e.

f"C:/Users/ramonsotogarcia/Desktop/Data/{file}"

in case your Airflow and the csv are on different disks.
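
It may also help to sanity-check that the path is actually visible from the environment where Airflow runs before wiring it into the operator. A minimal sketch (the C:/ prefix here is only an assumption about where the csv might live):

import os

file = "pokemon.csv"
local_file = f"C:/Users/ramonsotogarcia/Desktop/Data/{file}"

# FileToGoogleCloudStorageOperator opens src on the machine running the task,
# so this check has to pass there, not just wherever you wrote the DAG.
print(os.path.exists(local_file))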

