首页 > 解决方案 > Google Pub/Sub 消息大约需要 5 分钟才能发布。这是为什么?

问题描述

我有一个正在发布 Cloud Pub/Sub 消息的 Cloud Function。但是,此测试消息需要大约 5 分钟才能被测试订阅发布和使用。

这是正常的时机吗?我实际上期望它会更快,让我们说更具响应性。当我的 Auth0 挂钩(注册后)触发 Cloud Function 端点时,我想在我的数据库中初始化一个用户。

最好的情况是用户初始化应该发生在用户注册后访问网站之前,因此速度有点关键。

这是功能代码(Java):

public class UserInit implements HttpFunction {
  @Override
  public void service(HttpRequest request, HttpResponse response) throws Exception {
    String projectId = "app-platform";
    String topicId = "user-init";
    final TopicName topicName = TopicName.of(projectId, topicId);
    final Publisher publisher = Publisher.newBuilder(topicName).build();
    System.out.println("project: " + projectId);
    System.out.println("topic: " + topicId);
    final PubsubMessage message = PubsubMessage.newBuilder()
        .putAttributes("test", "test")
        .build();
    final ApiFuture<String> messageIdFuture = publisher.publish(message);
    ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
      public void onSuccess(String messageId) {
        System.out.println("published with message id: " + messageId);
      }

      public void onFailure(Throwable t) {
        System.out.println("failed to publish: " + t);
      }
    }, MoreExecutors.directExecutor());
  }
}

以下是相关日志:

2021-03-20T12:29:47.486387599Zuser-initlo8xln5ztgq3 Function execution started
2021-03-20T12:29:57.947Zuser-initlo8xln5ztgq3 project: app-platform
2021-03-20T12:29:57.949Zuser-initlo8xln5ztgq3 topic: user-init
2021-03-20T12:29:59.551220278Zuser-initlo8xln5ztgq3 Function execution took 12065 ms, finished with status code: 200
2021-03-20T12:35:13.145Zuser-initlo8xln5ztgq3 published with message id: 2139319306781573

第一件事是Function execution took 12065 ms让我感到惊讶。该函数的执行时间为12 秒。恕我直言,那是相当长的一段时间。然而,真正令人讨厌的是函数执行和 Pub/Sub 回调之间的时间

2021-03-20T12:29:59 // execution
2021-03-20T12:35:13 // callback print message

这是大约 5 分钟过去了。我可以通过 Pub/Sub 监控以及我的测试订阅来确认这一点,在我预期消息已经发布后,我多次执行“拉取”。在所有情况下,我都可以确认从函数执行到发现发布/订阅消息的时间已经提到了大约 5 分钟。

所以我想知道这是预期的行为吗?我很惊讶,因为我第一次使用 Pub/Sub,或者我是对的,这里有些奇怪?

这也是我的cloudbuild.yaml设置,因此是我的功能配置:

steps:
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    args:
      - gcloud
      - functions
      - deploy
      - user-init
      - --runtime=java11
      - --region=europe-west3
      - --source=./user-init
      - --entry-point=com.app.functions.UserInit
      - --allow-unauthenticated
      - --trigger-http
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    args:
      - gcloud
      - alpha
      - functions
      - add-iam-policy-binding
      - user-init
      - --region=europe-west3
      - --member=allUsers
      - --role=roles/cloudfunctions.invoker

编辑 1

我尝试在通话.wait()后等待并发任务。publish()

相关更改如下所示:

    final Publisher publisher = Publisher.newBuilder(topicName)
        .setBatchingSettings(Publisher.Builder.getDefaultBatchingSettings())
        .build();
    final PubsubMessage message = PubsubMessage.newBuilder()
        .putAttributes("test", "test")
        .build();
    publisher.publish(message).wait();

然而,这些错误是由于使用wait()

at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:485)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1212)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at java.base/java.lang.Thread.run(Thread.java:834)
at com.google.cloud.functions.invoker.runner.Invoker$NotFoundHandler.handle(Invoker.java:392)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:500)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:547)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:270)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)

标签: javagoogle-cloud-functionsgoogle-cloud-pubsub

解决方案


您的 Cloud Functions 真正在 12 秒内执行完毕,并回复了 HTTP 200 代码。然后 CPU 会受到限制,因为在请求处理之外,您的 Cloud Functions 不需要 CPU。

您的 Cloud Functions 允许使用的 CPU 不到 5%,非常少,而且使用这么少的 CPU 执行回调需要时间。

实际上,您在等待有效消息发布之前回复 HTTP 200。在函数末尾添加这一行

messageIdFuture.wait();

等待并发线程结束。你会发现它会更有效率!


您还需要知道Cloud Functions 的性能(以及定价)取决于内存的数量。默认情况下,您有 256Mb 的内存 -> 400Mhz 的计算处理:仅使用 400Mhz 单核 CPU 启动 Cloud Functions + JVM 确实需要时间(12 秒)。您还可以理解,其中 5% 的时间来处理回调,可能需要几分钟!

因此,增加 Cloud Functions 内存以提高 Cloud Functions 性能


推荐阅读