首页 > 解决方案 > 用于记录的单独 loggerClass

问题描述

我在我的应用程序中使用 2 个记录器。一个用于 Azure,另一个用于主记录器。

我为两个记录器使用相同的模块。

import logging
import os
import traceback
from logging.handlers import RotatingFileHandler

import pandas as pd
from slacker import Slacker

SLACK_TOKEN = os.getenv("SLACK_TOKEN")
slack = Slacker(SLACK_TOKEN)
SLACK_MESSAGE_SIZE_LIMIT = 4000


logging.getLogger("azure").setLevel(logging.WARNING)


def _format_slack(
    message, data_df: pd.DataFrame = None, exception: Exception = None, **kwargs
):
    if data_df is not None:
        message = f"*{message}*\n{data_df.to_string()}"

    if exception is not None:
        message = f"{message}\n```{exception}```"

    return message[:SLACK_MESSAGE_SIZE_LIMIT]


class PipelineLogger(logging.Logger):
    def __init__(self, name, level=logging.NOTSET):
        return super(PipelineLogger, self).__init__(name, level)

    def post_slack(self, message, channel, **kwargs):
        if os.getenv("PYTHON_ENV") == "staging":
            try:
                print("ICIICIICIICIICIICIICI", flush=True)
                print(channel, flush=True)
                slack.chat.post_message(channel, _format_slack(message, **kwargs))
            except Exception as e:
                super(PipelineLogger, self).error(
                    f"Issue with Slack, could not post message"
                )
                super(PipelineLogger, self).error(traceback.format_exc())

    def info(self, message, channel=None, **kwargs):
        if channel is not None:
            self.post_slack(message, channel, **kwargs)

        return super(PipelineLogger, self).info(message)

    def warning(self, message, channel="#data-alert-warning", **kwargs):
        self.post_slack(message, channel, **kwargs)

        return super(PipelineLogger, self).warning(message)

    def error(self, message, channel="#data-alert-error", **kwargs):
        self.post_slack(message, channel, **kwargs)

        return super(PipelineLogger, self).error(message)

    def critical(self, message, channel="#data-alert-critical", **kwargs):
        self.post_slack(message, channel, **kwargs)

        return super(PipelineLogger, self).critical(message)


logging.setLoggerClass(PipelineLogger)
logger = logging.getLogger("main")
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s")
file_handler = RotatingFileHandler("log.log", "a", 1000000, 1)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
logger.addHandler(stream_handler)

PipelineLogger只想用于main记录器而不是azure记录器。我确信这很简单,但我对日志记录一点也不熟悉。

目前,使用此代码,PipelineLogger为两个记录器设置。

标签: pythonlogging

解决方案


推荐阅读