python - Google Cloud PubSub python 订阅者泄漏内存
问题描述
我使用适用于 python 的 Google PubSub 客户端(https://github.com/googleapis/python-pubsub)面临内存密集型回调的严重内存泄漏。
细节:
- Ubuntu 20.04
- 蟒蛇 3.8
只有3个要求:
- 谷歌-云-pubsub==2.4.1
- matplotlib==3.4.1
- 内存分析器==0.58.0
以这个简单的“内存密集型”回调为例。如果我将它作为 python 函数运行 4 次,这就是内存图的样子:
from memory_profiler import profile
@profile
def callback():
# Memory intensive operation
x = [n for n in range(int(1e5))]
if __name__ == '__main__':
callback()
callback()
callback()
callback()
在我看来,这里的一切都很好。
现在,如果我使用与 pubsub 回调相同的“内存密集型”函数并发送 4 条消息,则会发生以下情况:
import os
from google.cloud import pubsub_v1
from memory_profiler import profile
os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', 'path/to/key.json')
os.environ.setdefault('GOOGLE_CLOUD_PROJECT', 'project-id')
os.environ.setdefault('MY_TOPIC_NAME', 'test_memory_leak-sub')
topic_name = 'projects/{project_id}/topics/{topic}'.format(
project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
topic=os.getenv('MY_TOPIC_NAME'), # Set this to something appropriate.
)
subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
sub=os.getenv('MY_TOPIC_NAME'), # Set this to something appropriate.
)
@profile
def callback(message):
# Memory intensive operation
x = [n for n in range(int(1e5))]
message.ack()
print("ack")
with pubsub_v1.SubscriberClient() as subscriber:
future = subscriber.subscribe(subscription_name, callback)
print("Starting subscriber")
try:
print("Listening...")
future.result(timeout=30)
except KeyboardInterrupt:
future.cancel()
在这里,我面临着严重的内存泄漏,这很明显是因为“巨大的”回调内存使用量(?!)
看起来订阅者正在泄漏内存,并且泄漏的内存量与底层回调内存使用量成正比,因为泄漏量基于回调内存使用量。如果回调“消耗”更多内存,泄漏会变得更糟。如果回调“消耗”更少的内存,泄漏会变得更好。内存仅在future.result
超时时释放,正是未来被取消时,所以我使用的实际解决方法是设置一个短的超时并将消息拉取部分包装在 a 中while True
,但这不是官方文档指定使用订阅者的方式并且在超时窗口中拉取 20 或 30 条消息时无法解决问题。
看起来图书馆以某种方式“缓存”了回调,但为什么呢?我怎样才能避免这种行为?
这可能是库本身的错误,但我不确定。
解决方案
推荐阅读
- c# - 记录 ASP.net 核心 Lambda 应用程序日志
- windows - Using git submodules as junctions in Windows
- apache-spark - Azure Data Lake Gen 2 存储中的 Parquet 与 Delta 格式
- python - 升级到最新的 moto 版本(1.3.15 和 1.3.16)后测试中断
- slurm - 作业未获得请求的内存
- reactjs - 未处理的拒绝(TypeError):无法读取未定义的属性“temp”
- mongodb - 在 MongoDB 上使用 max 字段查询对象
- c# - EF Core 使用参数执行 SQL
- powershell - 过滤证书获取日期结果 AddDays NotAfter
- c - 设置 TCP_NODELAY 的正确时间?