python-3.x - 如何防止 Python Kafka Producer 由于使用日志记录而失败
问题描述
由于某种原因,当我在使用 Kafka 生产者之前添加任何日志记录代码时,连接失败。我花了一些时间来隔离问题,因为错误日志与连接有关。
如果没有这两个日志记录行,代码就可以正常工作:
logging.basicConfig(level=logging.INFO)
这是错误的代码:
#!/usr/bin/env python
import traceback
import logging
from kafka import KafkaProducer
from kafka.errors import KafkaError
def produce_test(listen=False):
topics = ['topic1', 'topic2']
messages = [['key1', 'logging hell'], ['key1', 'silly defaults']]
encoded = []
for m in messages:
encoded.append((str(m[0]).encode('utf-8'), str(m[1]).encode('utf-8')))
logging.info(f"something I want to log")
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
max_request_size=15048576,
)
for t in topics:
for m in encoded:
# pt = producer.partitions_for(t)
# print(f'partitions for {t}: {pt}')
print(f"sending: {m} to topic: {t}")
future = producer.send(topic=t, key=m[0], value=m[1])
producer.flush()
if listen:
try:
record_metadata = future.get(timeout=10)
# print(f'record_metadata: {record_metadata}')
except KafkaError:
# Decide what to do if produce request failed...
print(traceback.format_exc())
result = 'Fail'
finally:
producer.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
print('in producer calzone di Napoli')
produce_test()
这是错误日志:
in producer calzone di Napoli
INFO:root:something I want to log
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error 61. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. KafkaConnectionError: 61 ECONNREFUSED
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error 61. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. KafkaConnectionError: 61 ECONNREFUSED
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
sending: (b'key1', b'logging hell') to topic: topic1
INFO:kafka.conn:Broker version identifed as 1.0.0
INFO:kafka.conn:Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]> returned error 61. Disconnecting.
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. KafkaConnectionError: 61 ECONNREFUSED
WARNING:kafka.client:Node 0 connection failed -- refreshing metadata
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]>: Closing connection.
sending: (b'key1', b'silly defaults') to topic: topic1
sending: (b'key1', b'logging hell') to topic: topic2
sending: (b'key1', b'silly defaults') to topic: topic2
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connected> [IPv4 ('127.0.0.1', 9092)]>: Closing connection.
Process finished with exit code 0
我猜记录器会干扰 Kafka 库记录器,但我只是切换到 confluent_kafka 并绕过了这个问题。
解决方案
您粘贴的日志显示生产者成功发送消息。您的应用程序正在运行!
当您禁用日志记录时,这两个错误也可能发生,您只是看不到它们。启用日志记录不会破坏您的应用程序。
发生这些错误是因为您将localhost
其用作引导服务器。Kafka 客户端必须将该名称解析为 IP 才能连接到它。在您的机器上,它看起来localhost
解析为 2 个 IP:
::1
在 IPv6 上:客户端尝试连接并失败127.0.0.1
在 IPv4 上:客户端尝试连接并成功
推荐阅读
- dotvvm - DotVVM 在运行时从 Panel 组件更改 CssStyle 属性不起作用
- javascript - 使用 React.js 和 Material-UI 简化样式化的组件媒体查询
- javascript - 为什么每次渲染后都会执行useEffect?
- sql - Snowflake 无法通过连接键识别 CAST 或 TO_VARCHAR 函数
- mysql - 插入行后右外连接不起作用
- python - SVM 模型预测不变
- machine-learning - 提供的模型不是 YellowBrick 中的聚类估计器
- php - 需要编写不简单的正则表达式
- excel - 在新工作簿中动态匹配和复制/粘贴
- macros - 具有生物格式的 imagej/fiji 宏处理