首页 > 解决方案 > 在 Go 中使用 Uber-Zap 记录器将指定的日志发送到 Kafka 接收器

问题描述

我正在尝试使用zap logger包来创建带有文件、控制台和 Kafka 接收器的核心。我有一些非常特定INFO级别的日志,我想将它们发送到 Kafka 主题以供下游消费者处理。但是,通过当前的实现,我INFO在 Kafka 主题中获得了所有级别的日志,甚至是我不想要的日志。

有没有办法使用一个通用的 zap 记录器对象来防止同一级别的不需要的日志不去任何一个特定的接收器?

下面是我用来创建单个记录器对象的函数。

func newZapLogger(config Configuration) (Logger, error) {
    var writer zapcore.WriteSyncer
    cores := []zapcore.Core{}

    if config.EnableFile {
        getLogLevel(config.FileLevel)
        if config.LogConfig == true {
            writer = zapcore.Lock(zapcore.AddSync(&lj.Logger{
                Filename: config.FileLocation,
                MaxSize:  config.LogMaxSize,
                Compress: config.LogCompression,
                MaxAge:   config.LogMaxAge,
            }))
        } else {
            writer = zapcore.Lock(zapcore.AddSync(&lj.Logger{
                Filename: config.FileLocation,
            }))
        }
        cores = append(cores, zapcore.NewCore(getEncoder(config.FileJSONFormat, config.IsColour), writer, atomLevel))
    }
    if config.EnableConsole {
        getLogLevel(config.ConsoleLevel)
        switch config.Stream {
        case 1:
            writer = zapcore.Lock(os.Stdout)
        case 2:
            writer = zapcore.Lock(os.Stderr)
        case 3:
            writer = zapcore.Lock(zapcore.AddSync(ioutil.Discard))
        default:
            writer = zapcore.Lock(os.Stdout)
        }
        cores = append(cores, zapcore.NewCore(getEncoder(config.ConsoleJSONFormat, config.IsColour), writer, atomLevel))
    }
if config.EnableKafka == true {
        highPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
            return lvl >= zapcore.WarnLevel
        })

        if len(brokerConn) > 0 {
            var (
                kl  LogKafka
                err error
            )
            kl.Topic = config.KafkaTopic
            config := sarama.NewConfig()
            config.Producer.RequiredAcks = sarama.WaitForAll
            config.Producer.Partitioner = sarama.NewRandomPartitioner
            config.Producer.Return.Successes = true
            config.Producer.Return.Errors = true

            kl.Producer, err = sarama.NewSyncProducer(brokerConn, config)
            if err != nil {
                return nil, fmt.Errorf("Failed to initialise kafka logger, connect to kafka failed: %v", err)
            } else {
                topicErrors := zapcore.AddSync(&kl)
                kafkaEncoder := zapcore.NewJSONEncoder(zap.NewDevelopmentEncoderConfig())
                cores = append(cores, zapcore.NewCore(kafkaEncoder, topicErrors, highPriority))
            }
        } else {
            return nil, fmt.Errorf("Failed to initialise kafka logger, no broker specified")
        }
    }

    appendedCore := zapcore.NewTee(cores...)
    logger := zap.New(appendedCore, zap.AddCaller(), zap.AddCallerSkip(1)).Sugar()
    defer logger.Sync()
    return logger, nil
}

我正在使用Sarama包来实现 Kafka 生产者。我还考虑过使用自定义日志记录级别。但是,zap v1.0 不支持它。

标签: goapache-kafkasaramago-zap

解决方案


推荐阅读