python - Airflow - [Errno 21] 是一个目录:'/airflow/key/'
问题描述
我正在尝试执行以下 DAG,但 Airflow 返回以下错误消息:
[Errno 21] 是一个目录:'/airflow/key/'
DAG 正在 Bitbucket 中更新。我不明白这个错误是什么意思,即使查看与我类似的错误,我也无法确定我需要在代码中更改什么
import json
import decimal
import airflow
import pymssql
import logging
import os
import six
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mssql_hook import MsSqlHook
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.hooks.dbapi_hook import DbApiHook
from airflow.utils.log.logging_mixin import LoggingMixin
from tempfile import NamedTemporaryFile
from google.cloud import bigquery
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from oauth2client.client import GoogleCredentials
from copy import deepcopy
from six import iteritems
from pandas_gbq.gbq import _check_google_client_version as gbq_check_google_client_version
from pandas_gbq import read_gbq
from pandas_gbq.gbq import _test_google_api_imports as gbq_test_google_api_imports
from pandas_gbq.gbq import GbqConnector
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
import subfolder.my_functions_google as mf
def get_data_from_bigquery():
    """Run a fixed aggregation query against BigQuery and return its rows.

    Returns:
        list: all result rows (each row exposes ``col1`` and ``col2``).
    """
    client = bigquery.Client()
    # Backslash continuations join these lines into a single-line SQL string.
    sql = """ \
select max(col1) as col1, max(col2) as col2 from ( \
select 0 as col1, max(col2) as col2 from table1 where field1 > \
DATE_SUB(current_date(), INTERVAL 10 DAY) \
union all \
select max(col1) as col1, 0 as col2 from table2 where field1 > DATE_SUB(current_date(), INTERVAL 10 DAY)) as a """
    # query() submits the job; result() blocks until it completes.
    return list(client.query(sql).result())
# NOTE(review): this runs a live BigQuery query every time the scheduler
# parses the DAG file. Besides being slow, it is exactly what surfaces the
# "[Errno 21] Is a directory: '/airflow/key/'" error here: bigquery.Client()
# resolves GOOGLE_APPLICATION_CREDENTIALS at import time, and that env var
# points at the directory /airflow/key/ instead of a key *file* inside it.
# Point GOOGLE_APPLICATION_CREDENTIALS at the JSON key file itself, and
# consider moving this query into a task so parsing stays cheap.
a = get_data_from_bigquery()
tab_rem = str(a[0][0])
tab_ebx_bac = str(a[0][1] + 1)

nm_dag = 'dag_name'
agendamento_dag = '10 */12 * * 1-5'
mssql_connection = 'conn_sql'
nm_arquivo = 'folder/file_{{ ts_nodash }}.json'
sc_arquivo = 'schemas/file.json'
# BigQuery target schema for the loaded JSON rows.
sc_tbl_bq = [
    {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "date", "type": "DATE", "mode": "REQUIRED"},
    {"name": "value", "type": "FLOAT", "mode": "REQUIRED"},
]
dataset_bq_tbl = 'table.bq'
sql_query = """select a.id, a.date , a.value from table1 as a"""
nm_bucket = 'bexs-sistemas-core'
tp_particao = {'type': 'DAY', 'field': 'date'}
wrt_disposition = 'WRITE_TRUNCATE'
slack_msg = mf.task_fail_slack_alert
# BUG FIX: nm_cluster was referenced by cluster_fields= below but never
# defined, raising NameError at DAG-parse time. None disables clustering;
# set a list of column names (e.g. ['date']) to enable it — TODO confirm
# the intended clustering columns.
nm_cluster = None

default_args = {
    'owner': 'just-me',
    'start_date': airflow.utils.dates.days_ago(2),
    # BUG FIX: 'depends_on_past' appeared twice in this dict; a duplicate
    # key silently overwrites the earlier one, so only a single entry is kept.
    'depends_on_past': False,
    'email': ['test@airflow.org'],
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': slack_msg,
}

dag = DAG(
    dag_id=nm_dag,
    default_args=default_args,
    schedule_interval=agendamento_dag,
    dagrun_timeout=timedelta(minutes=60),
)

# Task 1: export SQL Server rows to GCS as newline-delimited JSON.
MsSql = mf.MsSqlToGoogleCloudStorageOperator(
    task_id='task1',
    mssql_conn_id=mssql_connection,
    google_cloud_storage_conn_id='gcp_conn',
    sql=sql_query,
    bucket=nm_bucket,
    filename=nm_arquivo,
    schema_filename=sc_arquivo,
    dag=dag,
)
import_orders_op = MsSql

# Task 2: load the exported JSON from GCS into a partitioned BigQuery table.
Google = mf.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='bucket_name',
    source_objects=[nm_arquivo],
    destination_project_dataset_table=dataset_bq_tbl,
    schema_fields=sc_tbl_bq,
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition=wrt_disposition,
    time_partitioning=tp_particao,
    cluster_fields=nm_cluster,
    bigquery_conn_id='gcp_conn',
    google_cloud_storage_conn_id='gcp_conn',
    dag=dag,
)
json_gcs_to_bq = Google

# The export must finish before the BigQuery load starts.
json_gcs_to_bq.set_upstream(import_orders_op)
编辑:
追溯:
IsADirectoryError: [Errno 21] Is a directory: '/airflow/key/'
[2019-09-10 04:56:59 +0000] [11] [INFO] Handling signal: ttou
[2019-09-10 04:56:59 +0000] [5058] [INFO] Worker exiting (pid: 5058)
[2019-09-10 04:57:29 +0000] [11] [INFO] Handling signal: ttin
[2019-09-10 04:57:29 +0000] [5078] [INFO] Booting worker with pid: 5078
[2019-09-10 04:57:30,043] {__init__.py:51} INFO - Using executor KubernetesExecutor
[2019-09-10 04:57:30,276] {dagbag.py:90} INFO - Filling up the DagBag from /airflow/dags/git
[2019-09-10 04:57:30,810] {dagbag.py:205} ERROR - Failed to import: /airflow/dags/git/dag_test.py
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 202, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/airflow/dags/git/dag_test.py", line 50, in <module>
a = get_data_from_bigquery();
File "/airflow/dags/git/dag_test.py", line 38, in get_data_from_bigquery
bq = bigquery.Client()
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 173, in __init__
project=project, credentials=credentials, _http=_http
File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 226, in __init__
_ClientProjectMixin.__init__(self, project=project)
File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 178, in __init__
project = self._determine_default(project)
File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 193, in _determine_default
return _determine_default_project(project)
File "/usr/local/lib/python3.7/site-packages/google/cloud/_helpers.py", line 186, in _determine_default_project
_, project = google.auth.default()
File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 305, in default
credentials, project_id = checker()
File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 165, in _get_explicit_environ_credentials
os.environ[environment_vars.CREDENTIALS])
File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 91, in _load_credentials_from_file
with io.open(filename, 'r') as file_obj:
IsADirectoryError: [Errno 21] Is a directory: '/airflow/key/'
[2019-09-10 04:57:31 +0000] [11] [INFO] Handling signal: ttou
[2019-09-10 04:57:31 +0000] [5062] [INFO] Worker exiting (pid: 5062)
[2019-09-10 04:58:02 +0000] [11] [INFO] Handling signal: ttin
[2019-09-10 04:58:02 +0000] [5082] [INFO] Booting worker with pid: 5082
[2019-09-10 04:58:02,692] {__init__.py:51} INFO - Using executor KubernetesExecutor
[2019-09-10 04:58:02,932] {dagbag.py:90} INFO - Filling up the DagBag from /airflow/dags/git
[2019-09-10 04:58:03,438] {dagbag.py:205} ERROR - Failed to import: /airflow/dags/git/dag_test.py
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 202, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/airflow/dags/git/dag_test.py", line 50, in <module>
a = get_data_from_bigquery();
File "/airflow/dags/git/dag_test.py", line 38, in get_data_from_bigquery
bq = bigquery.Client()
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 173, in __init__
project=project, credentials=credentials, _http=_http
File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 226, in __init__
_ClientProjectMixin.__init__(self, project=project)
File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 178, in __init__
project = self._determine_default(project)
File "/usr/local/lib/python3.7/site-packages/google/cloud/client.py", line 193, in _determine_default
return _determine_default_project(project)
File "/usr/local/lib/python3.7/site-packages/google/cloud/_helpers.py", line 186, in _determine_default_project
_, project = google.auth.default()
File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 305, in default
credentials, project_id = checker()
File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 165, in _get_explicit_environ_credentials
os.environ[environment_vars.CREDENTIALS])
File "/usr/local/lib/python3.7/site-packages/google/auth/_default.py", line 91, in _load_credentials_from_file
with io.open(filename, 'r') as file_obj:
IsADirectoryError: [Errno 21] Is a directory: '/airflow/key/'
解决方案
推荐阅读
- r - 如何基于属性生成网络,例如人们列出他们正在参加的活动?
- maven - maven-ear-plugin 从 2 个 webModule 中创建 2 个耳朵(战争)
- python - 日期上传到 Kibana/Elastic 不起作用
- java - 沙盒 Paypal webhook 集成
- regex - 正则表达式:可以出现在两个地方之一的可选子字符串,但不能同时出现
- c# - 在 getter 中返回新结构是否会降低性能?
- css - 图片的 URL 在 CSS 中无法正常工作
- sql - 在 Redshift 中连接字符串的递归 CTE 替代方案
- excel - 使用聚合函数计算列的小计
- jakarta-ee - 测量 3 层 Java 应用程序的用户交互时间