Airflow - [Errno 21] Is a directory: '/airflow/key/'

Problem description

I am trying to run the DAG below, but Airflow fails with the following error message:

[Errno 21] Is a directory: '/airflow/key/'

The DAG is updated from Bitbucket. I don't understand what this error means, and even after looking at errors similar to mine I haven't been able to work out what I need to change in my code.

import json
import decimal
import airflow
import pymssql  
import logging
import os
import six
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mssql_hook import MsSqlHook
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.hooks.dbapi_hook import DbApiHook
from airflow.utils.log.logging_mixin import LoggingMixin
from tempfile import NamedTemporaryFile
from google.cloud import bigquery
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.client import GoogleCredentials
from copy import deepcopy
from six import iteritems
from pandas_gbq.gbq import _check_google_client_version as gbq_check_google_client_version
from pandas_gbq import read_gbq
from pandas_gbq.gbq import _test_google_api_imports as gbq_test_google_api_imports
from pandas_gbq.gbq import GbqConnector
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
import subfolder.my_functions_google as mf


def get_data_from_bigquery():
    """query bigquery to get data to import to PSQL"""
    bq = bigquery.Client()
    query = """ \
select max(col1) as col1, max(col2) as col2 from ( \
select 0 as col1, max(col2) as col2 from table1 where field1 >  \
DATE_SUB(current_date(), INTERVAL 10 DAY) \
union all \
select max(col1) as col1, 0 as col2  from table2 where field1 > DATE_SUB(current_date(), INTERVAL 10 DAY)) as a """
    query_job = bq.query(query)
    data = query_job.result()
    rows = list(data)
    return rows

a = get_data_from_bigquery();
tab_rem = str(a[0][0])
tab_ebx_bac = str(a[0][1]+1)


nm_dag = 'dag_name'
agendamento_dag = '10 */12 * * 1-5'  # cron schedule
mssql_connection = 'conn_sql'  # Airflow connection id for SQL Server
nm_arquivo = 'folder/file_{{ ts_nodash }}.json'  # templated GCS object name for the export
sc_arquivo = 'schemas/file.json'  # GCS object that receives the exported schema
sc_tbl_bq = [
    {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "date", "type": "DATE", "mode": "REQUIRED"},
    {"name": "value", "type": "FLOAT", "mode": "REQUIRED"},
]  # BigQuery schema of the destination table
dataset_bq_tbl = 'table.bq'  # destination dataset.table
sql_query = """select a.id, a.date, a.value from table1 as a"""
nm_bucket = 'bexs-sistemas-core'  # GCS bucket used for staging
tp_particao = {'type': 'DAY', 'field': 'date'}  # BigQuery time partitioning
wrt_disposition = 'WRITE_TRUNCATE'

slack_msg = mf.task_fail_slack_alert

default_args = {
    'owner': 'just-me',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': False,
    'email': ['test@airflow.org'],
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': slack_msg
}

dag = DAG(
    dag_id=nm_dag,
    default_args=default_args,
    schedule_interval=agendamento_dag,
    dagrun_timeout=timedelta(minutes=60)
)


MsSql = mf.MsSqlToGoogleCloudStorageOperator(
    task_id='task1',
    mssql_conn_id=mssql_connection,
    google_cloud_storage_conn_id='gcp_conn',
    sql=sql_query,
    bucket=nm_bucket,
    filename=nm_arquivo,
    schema_filename=sc_arquivo, 
    dag=dag) 

import_orders_op  = MsSql

Google = mf.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='bucket_name',
    source_objects=[nm_arquivo],
    destination_project_dataset_table=dataset_bq_tbl,
    schema_fields=sc_tbl_bq,
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition=wrt_disposition,
    time_partitioning=tp_particao,
    cluster_fields=nm_cluster,
    bigquery_conn_id='gcp_conn',
    google_cloud_storage_conn_id='gcp_conn',
    dag=dag
)

json_gcs_to_bq = Google

json_gcs_to_bq.set_upstream(import_orders_op)

Edit:

Traceback:

IsADirectoryError: [Errno 21] Is a directory: '/airflow/key/'
[2019-09-10 04:56:59 +0000] [11] [INFO] Handling signal: ttou
[2019-09-10 04:56:59 +0000] [5058] [INFO] Worker exiting (pid: 5058)
[2019-09-10 04:57:29 +0000] [11] [INFO] Handling signal: ttin
[2019-09-10 04:57:29 +0000] [5078] [INFO] Booting worker with pid: 5078
[2019-09-10 04:57:30,043] {__init__.py:51} INFO - Using executor KubernetesExecutor
[2019-09-10 04:57:30,276] {dagbag.py:90} INFO - Filling up the DagBag from /airflow/dags/git
[2019-09-10 04:57:30,810] {dagbag.py:205} ERROR - Failed to import: /airflow/dags/git/dag_test.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 202, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/airflow/dags/git/dag_test.py", line 50, in <module>
    a = get_data_from_bigquery();
  File "/airflow/dags/git/dag_test.py", line 38, in get_data_from_bigquery
    bq = bigquery.Client()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 173, in __init__
    project=project, credentials=credentials, _http=_http
  File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 226, in __init__
    _ClientProjectMixin.__init__(self, project=project)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 178, in __init__
    project = self._determine_default(project)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 193, in _determine_default
    return _determine_default_project(project)
  File "/usr/local/lib/python3.7/site-packages/google/cloud/_helpers.py", line 186, in _determine_default_project
    _, project = google.auth.default()
  File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 305, in default
    credentials, project_id = checker()
  File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 165, in _get_explicit_environ_credentials
    os.environ[environment_vars.CREDENTIALS])
  File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 91, in _load_credentials_from_file
    with io.open(filename, 'r') as file_obj:
IsADirectoryError: [Errno 21] Is a directory: '/airflow/key/'

Tags: python, airflow

Solution
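
The traceback shows the cause: google.auth.default() reads the path stored in the GOOGLE_APPLICATION_CREDENTIALS environment variable and tries to open it as a file, and in this deployment that path is the directory '/airflow/key/' rather than the service-account JSON key file inside it, so io.open() raises IsADirectoryError. A likely fix is to point the variable (or the GCP connection's key file path) at the key file itself, or to load the credentials explicitly and pass them to bigquery.Client(). The sketch below is a minimal example assuming a hypothetical key file name, '/airflow/key/service-account.json'; substitute the file actually mounted into the worker pod.

from google.cloud import bigquery
from google.oauth2 import service_account

# Hypothetical path: it must point at the key FILE, not at the /airflow/key/ directory.
KEY_PATH = '/airflow/key/service-account.json'

def get_bigquery_client():
    """Build a BigQuery client with explicit service-account credentials."""
    # Option 1: fix the environment variable so google.auth.default() works, e.g.
    #   export GOOGLE_APPLICATION_CREDENTIALS=/airflow/key/service-account.json
    # Option 2 (shown here): load the key file explicitly and hand it to the client.
    credentials = service_account.Credentials.from_service_account_file(KEY_PATH)
    return bigquery.Client(credentials=credentials, project=credentials.project_id)

Separately, note that get_data_from_bigquery() is called at module level (the traceback points at "line 50" of dag_test.py), so any credentials problem breaks parsing of the whole DAG file. Moving that query into a task callable (for example a PythonOperator) would keep the scheduler able to load the DAG even when BigQuery is unreachable.
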

