首页 > 解决方案 > Google Cloud PubSub python 订阅者泄漏内存

问题描述

我使用适用于 python 的 Google PubSub 客户端(https://github.com/googleapis/python-pubsub)面临内存密集型回调的严重内存泄漏。

细节:

只有3个要求:

以这个简单的“内存密集型”回调为例。如果我将它作为 python 函数运行 4 次,这就是内存图的样子:

from memory_profiler import profile

@profile
def callback():
    # Memory intensive operation
    x = [n for n in range(int(1e5))]


if __name__ == '__main__':
    callback()
    callback()
    callback()
    callback()

这是 memory_profiler 绘制的图表 在此处输入图像描述

在我看来,这里的一切都很好。

现在,如果我使用与 pubsub 回调相同的“内存密集型”函数并发送 4 条消息,则会发生以下情况:

import os
from google.cloud import pubsub_v1
from memory_profiler import profile

os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', 'path/to/key.json')
os.environ.setdefault('GOOGLE_CLOUD_PROJECT', 'project-id')
os.environ.setdefault('MY_TOPIC_NAME', 'test_memory_leak-sub')

topic_name = 'projects/{project_id}/topics/{topic}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    topic=os.getenv('MY_TOPIC_NAME'),  # Set this to something appropriate.
)

subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    sub=os.getenv('MY_TOPIC_NAME'),  # Set this to something appropriate.
)


@profile
def callback(message):
    # Memory intensive operation
    x = [n for n in range(int(1e5))]
    message.ack()
    print("ack")


with pubsub_v1.SubscriberClient() as subscriber:
    future = subscriber.subscribe(subscription_name, callback)
    print("Starting subscriber")
    try:
        print("Listening...")
        future.result(timeout=30)
    except KeyboardInterrupt:
        future.cancel()

在此处输入图像描述 在这里,我面临着严重的内存泄漏,这很明显是因为“巨大的”回调内存使用量(?!)

看起来订阅者正在泄漏内存,并且泄漏的内存量与底层回调内存使用量成正比,因为泄漏量基于回调内存使用量。如果回调“消耗”更多内存,泄漏会变得更糟。如果回调“消耗”更少的内存,泄漏会变得更好。内存仅在future.result超时时释放,正是未来被取消时,所以我使用的实际解决方法是设置一个短的超时并将消息拉取部分包装在 a 中while True,但这不是官方文档指定使用订阅者的方式并且在超时窗口中拉取 20 或 30 条消息时无法解决问题。

看起来图书馆以某种方式“缓存”了回调,但为什么呢?我怎样才能避免这种行为?

这可能是库本身的错误,但我不确定。

标签: pythongoogle-cloud-platformmemory-leaksgoogle-cloud-pubsub

解决方案


推荐阅读