python - Airflow Google Cloud logging

Problem description
For Apache Airflow v1.10 running on Python 2.7, installed with `pip install airflow[gcp_api]`, I am trying to set up logging for Google Cloud. I have the following log_config.py file:
import os

from airflow import configuration as conf

GCS_LOG_FOLDER = 'gs://GCSbucket/'
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'LOG_FORMAT')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')
# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
# wasb buckets should start with "wasb"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
            'formatter': 'airflow',
            'stream': 'sys.stdout'
        },
        'task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # Add a GCSTaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
        'gcs.task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': GCS_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        'airflow.processor': {
            'handlers': ['processor'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['gcs.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'flask_appbuilder': {
            'handlers': ['console'],
            'level': FAB_LOG_LEVEL,
            'propagate': True,
        }
    },
    'root': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
    }
}
REMOTE_HANDLERS = {
    's3': {
        'task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'gcs': {
        'task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'wasb': {
        'task': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
        'processor': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
    }
}
# conf.get() would return the *string* 'True'/'False', which is always
# truthy, so use getboolean for the flag
REMOTE_LOGGING = conf.getboolean('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
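For context, Airflow only picks up a custom file like this if it is importable and referenced from airflow.cfg; per the 1.10 howto that means placing log_config.py somewhere on the PYTHONPATH (typically $AIRFLOW_HOME/config, with an empty __init__.py alongside) plus the settings below. A minimal sketch, assuming the gcs.task handler name from above:

[core]
# log_config must be importable, e.g. $AIRFLOW_HOME/config/log_config.py
logging_config_class = log_config.LOGGING_CONFIG
task_log_reader = gcs.task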
My airflow.cfg settings are:
[core]
remote_logging = True
remote_base_log_folder = gs://GCSbucket/logs
remote_log_conn_id = google_cloud_default
The error I get is the following:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/local/lib/python2.7/logging/__init__.py", line 1676, in shutdown
    h.close()
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/log/gcs_task_handler.py", line 73, in close
    if self.closed:
AttributeError: 'GCSTaskHandler' object has no attribute 'closed'
Does anyone know what might be going wrong? The tutorial I am following is: https://airflow.readthedocs.io/en/1.10.0/howto/write-logs.html
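For what it's worth, this failure mode can be reproduced outside Airflow: at interpreter exit, atexit triggers logging.shutdown(), which calls close() on every handler that was ever instantiated, so close() cannot assume any setup method ran first. A minimal sketch (FragileHandler is hypothetical, not Airflow code):

import logging

class FragileHandler(logging.StreamHandler):
    # close() reads an attribute that only set_context() assigns,
    # mirroring the GCSTaskHandler traceback above
    def close(self):
        if self.closed:
            return
        logging.StreamHandler.close(self)

    def set_context(self):
        self.closed = False  # never called in this sketch

FragileHandler()
logging.shutdown()  # AttributeError: ... has no attribute 'closed'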
Update: after digging some more into the source code (https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/utils/log/gcs_task_handler.py), I see that the close statement returns nothing, which is where my application crashes. Does anyone know why nothing is returned?
def close(self):
    if self.closed:
        return
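A note on that snippet: a bare return in Python simply returns None and is the normal early-exit idiom, so "nothing returned" is not itself the bug; the crash is that self.closed was never assigned before close() ran. A hedged sketch of a defensive guard (my own illustration, not the upstream fix):

import logging

class GuardedHandler(logging.StreamHandler):
    # Hypothetical illustration, not Airflow's actual fix.
    def close(self):
        # getattr with a default tolerates the case where set_context()
        # (which would normally assign self.closed) never ran
        if getattr(self, 'closed', False):
            return
        self.closed = True
        logging.StreamHandler.close(self)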
Solution
The instructions may be outdated. Try the ones at the following link instead:
https://airflow.readthedocs.io/en/latest/howto/write-logs.html#writing-logs-to-google-cloud-storage
Follow the steps below to enable Google Cloud Storage logging. To enable this feature, airflow.cfg must be configured as in this example:
[core]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
- First install the gcp_api package, like so: pip install apache-airflow[gcp_api].
- Make sure a Google Cloud Platform connection hook has been defined in Airflow. The hook should have read and write access to the Google Cloud Storage bucket defined above in remote_base_log_folder (a sketch of creating such a connection follows this list).
- Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
- Verify that logs are showing up for newly executed tasks in the bucket you have defined.
- Verify that the Google Cloud Storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:
*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
[2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
[2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
[2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
[2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
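Regarding the connection in the second step: the conn_id named by remote_log_conn_id must exist and carry credentials with read/write access to the bucket. A minimal sketch of creating it programmatically on Airflow 1.10 (the key path, project id, and bucket here are placeholders; MyGCSConn matches the cfg above):

import json

from airflow import settings
from airflow.models import Connection

# Register the connection referenced by remote_log_conn_id,
# assuming service-account key authentication.
conn = Connection(
    conn_id='MyGCSConn',
    conn_type='google_cloud_platform',
    extra=json.dumps({
        'extra__google_cloud_platform__key_path': '/path/to/key.json',
        'extra__google_cloud_platform__project': 'my-project',
        'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/devstorage.read_write',
    }),
)
session = settings.Session()
session.add(conn)
session.commit()

# Quick smoke test: if auth works, this returns without raising.
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
hook = GoogleCloudStorageHook(google_cloud_storage_conn_id='MyGCSConn')
print(hook.exists('my-bucket', 'path/to/logs/placeholder'))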