python-3.x - 如何使用 python kafka 库处理与 kafka 的连接问题?
问题描述
我有一个无服务器功能,它试图向 kafka 发送一些数据。有时它可以工作,有时连接只是断开并且数据丢失。
原因是 kafka 的库没有引发异常,而是添加了错误日志。所以我不能在try:except
.
这是我在日志中经常遇到的错误:
<BrokerConnection node_id=11 host=... port=9092>: Error receiving network data closing socket
Traceback (most recent call last):
File "/var/task/kafka/conn.py", line 745, in _recv
data = self._sock.recv(SOCK_CHUNK_BYTES)
ConnectionResetError: [Errno 104] Connection reset by peer
上面提到的函数_recv
定义如下:
我仍在寻找解决方案,但在 try:except 中添加代码不起作用。
def _recv(self):
responses = []
SOCK_CHUNK_BYTES = 4096
while True:
try:
data = self._sock.recv(SOCK_CHUNK_BYTES)
# We expect socket.recv to raise an exception if there is not
# enough data to read the full bytes_to_read
# but if the socket is disconnected, we will get empty data
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
self.close(error=Errors.ConnectionError('socket disconnected'))
break
else:
responses.extend(self.receive_bytes(data))
if len(data) < SOCK_CHUNK_BYTES:
break
except SSLWantReadError:
break
except ConnectionError as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
break
log.exception('%s: Error receiving network data'
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
break
except BlockingIOError:
if six.PY3:
break
raise
return responses
我希望当连接丢失时,会引发异常并且应用程序中断,但只有错误日志并且即使 kafka 部分失败,应用程序也会继续。
解决方案
推荐阅读
- mysql - 如何在原始查询 laravel 中创建表?
- java - 请求未到达控制器但仍得到 200 响应
- python - 用取自另外两列的另一个值替换 NaN
- r - 如何在单个窗口中显示多个图表
- angular - 构建 Angular 应用程序后出现“未捕获的 SyntaxError:严格模式代码可能不包含 with 语句”错误
- typescript - 模拟 Node Js 代理
- tensorflow - 训练后量化后无法加载模型
- python - 接收其类对象的python类方法
- javascript - 禁用特定输入字段的特定日期
- macos - 没有使用 rowViewForRow 创建的行对象