java - Google Pub/Sub 消息大约需要 5 分钟才能发布。这是为什么?
问题描述
我有一个正在发布 Cloud Pub/Sub 消息的 Cloud Function。但是,此测试消息需要大约 5 分钟才能被测试订阅发布和使用。
这是正常的时机吗?我实际上期望它会更快,让我们说更具响应性。当我的 Auth0 挂钩(注册后)触发 Cloud Function 端点时,我想在我的数据库中初始化一个用户。
最好的情况是用户初始化应该发生在用户注册后访问网站之前,因此速度有点关键。
这是功能代码(Java):
public class UserInit implements HttpFunction {
@Override
public void service(HttpRequest request, HttpResponse response) throws Exception {
String projectId = "app-platform";
String topicId = "user-init";
final TopicName topicName = TopicName.of(projectId, topicId);
final Publisher publisher = Publisher.newBuilder(topicName).build();
System.out.println("project: " + projectId);
System.out.println("topic: " + topicId);
final PubsubMessage message = PubsubMessage.newBuilder()
.putAttributes("test", "test")
.build();
final ApiFuture<String> messageIdFuture = publisher.publish(message);
ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
public void onSuccess(String messageId) {
System.out.println("published with message id: " + messageId);
}
public void onFailure(Throwable t) {
System.out.println("failed to publish: " + t);
}
}, MoreExecutors.directExecutor());
}
}
以下是相关日志:
2021-03-20T12:29:47.486387599Zuser-initlo8xln5ztgq3 Function execution started
2021-03-20T12:29:57.947Zuser-initlo8xln5ztgq3 project: app-platform
2021-03-20T12:29:57.949Zuser-initlo8xln5ztgq3 topic: user-init
2021-03-20T12:29:59.551220278Zuser-initlo8xln5ztgq3 Function execution took 12065 ms, finished with status code: 200
2021-03-20T12:35:13.145Zuser-initlo8xln5ztgq3 published with message id: 2139319306781573
第一件事是Function execution took 12065 ms
让我感到惊讶。该函数的执行时间为12 秒。恕我直言,那是相当长的一段时间。然而,真正令人讨厌的是函数执行和 Pub/Sub 回调之间的时间
2021-03-20T12:29:59 // execution
2021-03-20T12:35:13 // callback print message
这是大约 5 分钟过去了。我可以通过 Pub/Sub 监控以及我的测试订阅来确认这一点,在我预期消息已经发布后,我多次执行“拉取”。在所有情况下,我都可以确认从函数执行到发现发布/订阅消息的时间已经提到了大约 5 分钟。
所以我想知道这是预期的行为吗?我很惊讶,因为我第一次使用 Pub/Sub,或者我是对的,这里有些奇怪?
这也是我的cloudbuild.yaml
设置,因此是我的功能配置:
steps:
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
args:
- gcloud
- functions
- deploy
- user-init
- --runtime=java11
- --region=europe-west3
- --source=./user-init
- --entry-point=com.app.functions.UserInit
- --allow-unauthenticated
- --trigger-http
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
args:
- gcloud
- alpha
- functions
- add-iam-policy-binding
- user-init
- --region=europe-west3
- --member=allUsers
- --role=roles/cloudfunctions.invoker
编辑 1
我尝试在通话.wait()
后等待并发任务。publish()
相关更改如下所示:
final Publisher publisher = Publisher.newBuilder(topicName)
.setBatchingSettings(Publisher.Builder.getDefaultBatchingSettings())
.build();
final PubsubMessage message = PubsubMessage.newBuilder()
.putAttributes("test", "test")
.build();
publisher.publish(message).wait();
然而,这些错误是由于使用wait()
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:485)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1212)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at java.base/java.lang.Thread.run(Thread.java:834)
at com.google.cloud.functions.invoker.runner.Invoker$NotFoundHandler.handle(Invoker.java:392)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:500)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:547)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:270)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:135)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
解决方案
您的 Cloud Functions 真正在 12 秒内执行完毕,并回复了 HTTP 200 代码。然后 CPU 会受到限制,因为在请求处理之外,您的 Cloud Functions 不需要 CPU。
您的 Cloud Functions 允许使用的 CPU 不到 5%,非常少,而且使用这么少的 CPU 执行回调需要时间。
实际上,您在等待有效消息发布之前回复 HTTP 200。在函数末尾添加这一行
messageIdFuture.wait();
等待并发线程结束。你会发现它会更有效率!
您还需要知道Cloud Functions 的性能(以及定价)取决于内存的数量。默认情况下,您有 256Mb 的内存 -> 400Mhz 的计算处理:仅使用 400Mhz 单核 CPU 启动 Cloud Functions + JVM 确实需要时间(12 秒)。您还可以理解,其中 5% 的时间来处理回调,可能需要几分钟!
因此,增加 Cloud Functions 内存以提高 Cloud Functions 性能
推荐阅读
- mysql - 时间戳类型的Mysql数据插入问题
- angular6 - 如何通过另一个组件的按钮进行切换?
- java - Spring Security 获取自定义标头
- android-studio - 丢失 Android Studio 密钥库
- java - 如何从 NetCDF 文件中获取恒定时间和深度的单层纬度和经度?
- json - 解析 url 期间的 Http 失败
- oracle - 在 plsql 中使用表单数据上传后文件损坏
- android - 更改 android 中编辑文本的 addTextChangedListener 侦听器中的文本会出错
- android - 带有输入文本的浮动标签-标签在使用本机反应的android中触及输入的底部边框
- raspbian - 如何将节点红色日志存储在本地存储中?