首页 > 解决方案 > Google PubSub 异步速率限制未按预期工作

问题描述

我们在 prod 中使用 PubSub 并看到一个问题,即有更多 VM 处理我们期望拥有的 PubSub 消息。

我已经使用 PubSub 在一夜之间运行了简单的测试,并且似乎某些事情并不像我们对速率限制机制所期望的那样顺利。

这是测试:

  1. 使用 Pull Subscription 将一些消息发布到主题中。在实验中,大约有 2,7k 条消息(大约在晚上 9 点开始)
  2. 使用 StreamingPull 连接和 FlowControl 设置为 2 配置一个异步客户端。
  3. 通过将执行移至计时器并仅在计时器完成时确认消息来模拟每个传入消息的处理需要 5 秒。

预期结果:来自 PubSub 的消息以相同的速度被消费,每 5 秒一次收到 2 条消息。由于所有网络和处理费用,在确认消息和拉取新消息之间会有一个小的超时。

实际结果:PubSub 开始节流,或类似的东西,超时时间很长。那时没有消息到达。超时取决于订阅中未确认消息的数量。

FlowControl docs似乎并不清楚。

PubSub 订阅未确认消息计数

这是消费者(客户端)的代码:

var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
    var flowSettings = FlowControlSettings.newBuilder()
      .setMaxOutstandingElementCount(concurrentFlowsNumber)
      .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
      .build();

    var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
      .setCredentialsProvider(() -> serviceAccountCredentials)
      .setFlowControlSettings(flowSettings)
      .build();

    subscriber.addListener(
      new Subscriber.Listener() {
        @Override
        public void failed(ApiService.State from, Throwable failure) {
          logger.error(failure);
        }
      },
      MoreExecutors.directExecutor());

    var apiService = subscriber.startAsync();
    apiService.addListener(new ApiService.Listener() {
      @Override
      public void running() {
         logger.info("Pubsub started");
      }

      @Override
      public void failed(ApiService.State from, Throwable failure) {
        logger.error("Pubsub failed on step: {}", from);
      }
    }, Runnable::run);

消息处理程序是:

private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
    new Timer().schedule(new TimerTask() {
      @Override
      public void run() {
               consumer.ack();
      }
    }, (long) 3000 + rand.nextInt(5000));
  }

那么,有谁知道如何让客户端(许多虚拟机)使用具有并发处理限制的消息(最多 4 个并发消息)而不会因超时而中断?

Ps 这几个问题差不多,但又不一样: google pubsub flow control pubsub 动态限速 Cloud pubsub slow poll rate

标签: javakotlingoogle-cloud-platformconcurrencygoogle-cloud-pubsub

解决方案


由于您有积压工作,您可能会遇到此问题:https ://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages

您未传递的消息将在 Pub/Sub 服务和客户端库之间缓冲。如果超过 ackDeadline,消息可能会卡在单个客户端的缓冲区中,或者重新传递到同一个客户端。

您可以按照建议尝试使用同步拉动。


推荐阅读