首页 > 解决方案 > 如何防止 Python Kafka Producer 由于使用日志记录而失败

问题描述

由于某种原因,当我在使用 Kafka 生产者之前添加任何日志记录代码时,连接失败。我花了一些时间来隔离问题,因为错误日志与连接有关。

如果没有这两个日志记录行,代码就可以正常工作:

logging.basicConfig(level=logging.INFO)

这是错误的代码:

#!/usr/bin/env python
import traceback
import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError


def produce_test(listen=False):

    topics = ['topic1', 'topic2']

    messages = [['key1', 'logging hell'], ['key1', 'silly defaults']]
    encoded = []

    for m in messages:
        encoded.append((str(m[0]).encode('utf-8'), str(m[1]).encode('utf-8')))

    logging.info(f"something I want to log")

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        max_request_size=15048576,
    )

    for t in topics:

        for m in encoded:
            # pt = producer.partitions_for(t)
            # print(f'partitions for {t}: {pt}')

            print(f"sending: {m} to topic: {t}")
            future = producer.send(topic=t, key=m[0], value=m[1])

            producer.flush()

            if listen:
                try:
                    record_metadata = future.get(timeout=10)

                    # print(f'record_metadata: {record_metadata}')
                except KafkaError:
                    # Decide what to do if produce request failed...
                    print(traceback.format_exc())
                    result = 'Fail'
                finally:
                    producer.close()


if __name__ == "__main__":

    logging.basicConfig(level=logging.INFO)
    print('in producer calzone di Napoli')
    produce_test()

这是错误日志:

in producer calzone di Napoli
INFO:root:something I want to log
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error 61. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. KafkaConnectionError: 61 ECONNREFUSED
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error 61. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. KafkaConnectionError: 61 ECONNREFUSED
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
sending: (b'key1', b'logging hell') to topic: topic1
INFO:kafka.conn:Broker version identifed as 1.0.0
INFO:kafka.conn:Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error 61. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. KafkaConnectionError: 61 ECONNREFUSED
WARNING:kafka.client:Node 0 connection failed -- refreshing metadata
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. 
sending: (b'key1', b'silly defaults') to topic: topic1
sending: (b'key1', b'logging hell') to topic: topic2
sending: (b'key1', b'silly defaults') to topic: topic2
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. 

Process finished with exit code 0

我猜记录器会干扰 Kafka 库记录器,但我只是切换到 confluent_kafka 并绕过了这个问题。

标签: python-3.xloggingapache-kafka

解决方案


您粘贴的日志显示生产者成功发送消息。您的应用程序正在运行

当您禁用日志记录时,这两个错误也可能发生,您只是看不到它们。启用日志记录不会破坏您的应用程序。

发生这些错误是因为您将localhost其用作引导服务器。Kafka 客户端必须将该名称解析为 IP 才能连接到它。在您的机器上,它看起来localhost解析为 2 个 IP:

  • ::1在 IPv6 上:客户端尝试连接并失败
  • 127.0.0.1在 IPv4 上:客户端尝试连接并成功

推荐阅读