java - Google PubSub 异步速率限制未按预期工作
问题描述
我们在 prod 中使用 PubSub 并看到一个问题,即有更多 VM 处理我们期望拥有的 PubSub 消息。
我已经使用 PubSub 在一夜之间运行了简单的测试,并且似乎某些事情并不像我们对速率限制机制所期望的那样顺利。
这是测试:
- 使用 Pull Subscription 将一些消息发布到主题中。在实验中,大约有 2,7k 条消息(大约在晚上 9 点开始)
- 使用 StreamingPull 连接和 FlowControl 设置为 2 配置一个异步客户端。
- 通过将执行移至计时器并仅在计时器完成时确认消息来模拟每个传入消息的处理需要 5 秒。
预期结果:来自 PubSub 的消息以相同的速度被消费,每 5 秒一次收到 2 条消息。由于所有网络和处理费用,在确认消息和拉取新消息之间会有一个小的超时。
实际结果:PubSub 开始节流,或类似的东西,超时时间很长。那时没有消息到达。超时取决于订阅中未确认消息的数量。
FlowControl docs似乎并不清楚。
这是消费者(客户端)的代码:
var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
var flowSettings = FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(concurrentFlowsNumber)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build();
var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setCredentialsProvider(() -> serviceAccountCredentials)
.setFlowControlSettings(flowSettings)
.build();
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(ApiService.State from, Throwable failure) {
logger.error(failure);
}
},
MoreExecutors.directExecutor());
var apiService = subscriber.startAsync();
apiService.addListener(new ApiService.Listener() {
@Override
public void running() {
logger.info("Pubsub started");
}
@Override
public void failed(ApiService.State from, Throwable failure) {
logger.error("Pubsub failed on step: {}", from);
}
}, Runnable::run);
消息处理程序是:
private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
consumer.ack();
}
}, (long) 3000 + rand.nextInt(5000));
}
那么,有谁知道如何让客户端(许多虚拟机)使用具有并发处理限制的消息(最多 4 个并发消息)而不会因超时而中断?
Ps 这几个问题差不多,但又不一样: google pubsub flow control pubsub 动态限速 Cloud pubsub slow poll rate
解决方案
由于您有积压工作,您可能会遇到此问题:https ://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages
您未传递的消息将在 Pub/Sub 服务和客户端库之间缓冲。如果超过 ackDeadline,消息可能会卡在单个客户端的缓冲区中,或者重新传递到同一个客户端。
您可以按照建议尝试使用同步拉动。
推荐阅读
- python - python - 如何在python中具有不同键的两个字典之间执行方差操作?
- excel - Python与excel中的标准和输出组合
- r - 在 dplyr 的 group_by 中有条件地删除重复行
- javascript - 使用动态创建的 HTML 从 angularjs 控制器调用函数
- html - 如何将下拉列表添加到浮动按钮
- node.js - 不知道如何使用 redux 表单发布,无法将状态映射到 PostNew
- google-apps-script - 使用 GAS 登录 web 应用程序后,我无法使用 GAS 导航到网站上的其他页面 (.aspx)。我该怎么做呢?
- javascript - Perl 哈希表比较的 Javascript 等价物
- latex - 如何使用 LaTeX(背页)输入梵文?
- laravel - 避免 Blade 模板输出中的换行符