首页 > 解决方案 > GCP pubsub:为什么快速发布 20 万条消息会导致该主题的 250 万条消息

问题描述

前提条件:

  1. 我们创建了一个空主题,只有一个拉订阅
  2. 没有服务主动订阅订阅
  3. @google/pubsub我们使用该库快速发布大约 20 万条消息

观察:

未确认的消息数

当我们使用下面的等效代码发布消息时,消息量达到了 250 万条。从日志消息中我们看到它认为它发布了 200k 条消息。

第二个小问题是我们使用下面的代码,但将调用与另一个 for 循环分块Promise.all,并且一次只给 pubsub sdk 1000 条消息。

代码:

import {PubSub} from '@google-cloud/pubsub';

const pubsub = new PubSub()
const topic = pubsub.topic("some-topic");

async function publish(message) {
    const dataBuffer = Buffer.from(JSON.stringify(data));
    return topic.publisher.publish(dataBuffer, metadata);
}

async function processThing(thing) {
    const parsed = parseThingToLotsOfThings(thing);

    return (await Promise.all(
        parsed.map(it => topic.publish(it))
    )).length
}

async function processThings(things) {
    let count = 0;

    for (const thing of things) {
        count += await processThing(thing);
    }

    console.log(`published ${count} messages`);
}

通过阅读 nodejs sdk 源代码和查看 API 参考,我不明白这是怎么回事。

我意识到这是至少一次交付的保证,但这要多一个数量级,并且在内部客户端每次发布 rpc 调用仅包含 100 条消息,所以我不明白为什么在我们的代码中对其进行批处理会改变行为。

这是 sdk 中的错误,还是我们应该在调用 sdk 之前进行批处理?

标签: google-cloud-pubsubgoogle-api-nodejs-client

解决方案


我怀疑正在发生的事情是 200K 消息的突然涌入导致客户端资源过载(可能是网络、CPU 或线程池)。结果,消息被发送到服务器,但客户端不堪重负,无法及时处理响应。结果,它最终会尝试再次发送消息,从而导致重复消息并导致客户端需要做更多的工作。

我推荐两种解决方案:

  1. 如果可能,水平缩放。将负载分散到更多的发布者上,以免个别客户不堪重负。

  2. 通过跟踪未完成期货的数量来限制可以同时未完成的发布数量。最简单的方法是使用信号量。一些 Cloud Pub/Sub 客户端库已经支持在库本身中设置这些限制,例如Java。我想这也是最终会出现在 node.js 库中的功能。


推荐阅读