google-cloud-platform - [已解决]Pubsub 推送订阅不确认消息
问题描述
这是我的设置。
订阅 A 是一种推送订阅,可将消息发布到云运行部署。
该部署公开了一个 HTTP 端点,处理消息,将结果发布到主题 B,并对订阅 A 的 POST 请求响应 200。整个过程大约需要 1.5 秒。
因此,对于订阅 A 中的每条消息,我应该在主题 B 中得到 1 条消息。这就是我的代码的样子
我的应用启动了 Express 服务器
const express = require('express');
const bodyParser = require('body-parser');
const _ = require('lodash');
const startBrowser = require('./startBrowser');
const tab = require('./tab');
const createMessage = require('./publishMessage');
const domain = 'https://example.com';
require('dotenv').config();
const app = express();
app.use(bodyParser.json());
const port = process.env.PORT || 8080;
app.listen(port, async () => {
console.log('Listening on port', port);
});
所有魔法发生的终点
app.post('/', async (req, res) => {
// Define the success and fail functions, that respond status 200 and 500 respectively
const failed = () => res.status(500).send();
const completed = async () => {
const response = await res.status(200).send();
if (response && res.writableEnded) {
console.log('successfully responded 200');
}
};
//Process the data coming from Subscription A
let pubsubMessage = decodeBase64Json(req.body.message.data);
let parsed = await processor(pubsubMessage);
//Post the processed data to topic B
let messageId = await postParsedData(parsed);
if (messageId) {
// ACK the message once the data has been processed and posted to topic B.
completed();
} else {
console.log('Didnt get a message id');
// failed();
}
});
//define the functions that post data to Topic B
const postParsedData = async (parsed) => {
if (!_.isEmpty(parsed)) {
const topicName = 'topic-B';
const messageIdInternal = await createMessage(parsed, topicName);
};
return messageId;
} else {
console.log('Parsed is Empty');
return null;
}
};
function decodeBase64Json(data) {
return JSON.parse(Buffer.from(data, 'base64').toString());
}
执行时间大约需要 1.5 秒,我可以看到记录在 Cloud 上的成功响应每 1.5 秒运行一次。总计约 2400 条消息/小时(每个云运行实例)。
主题 B 正在以 ~2400 条消息/小时的速度接收新消息,订阅 A 的确认率为 ~200 条消息/小时,这导致消息被多次重新传递。
订阅 A 的确认截止日期为 600 秒。Cloud run 中的请求超时时间为 300 秒。
在将消息发布到主题 B 之前,甚至在解析之前,我已经尝试 ACKing 消息,但我得到了相同的结果。
编辑:添加待处理消息和已处理消息的屏幕截图。处理的消息比 ACKed 未决消息多得多。应该是 1:1
谢谢你的帮助
解决方案GCP 支持无法重现此错误。大量 Cloud Run 虚拟机不会发生这种情况。解决方案只是增加工作实例的数量
解决方案
你需要await
你的complete();
函数调用。像这样
....
if (messageId) {
// ACK the message once the data has been processed and posted to topic B.
await completed();
} else {
console.log('Didnt get a message id');
// failed();
}
推荐阅读
- python - 在 Pandas 中创建列并应用导致警告的值
- vue.js - 在vuejs中使用for循环在选择选项中显示数据?
- http-method - 关于安全 http 方法的说明
- python - 按键仅在光标移动时用作输入
- python - 在 Pandas Dataframe(python)中按时间戳对数据系列进行分组
- android - MissingResourceException:找不到捆绑包 com/ibm/icu/impl/data/icudt57b/en_EN'
- dialogflow-es - Dialogflow - webhook 执行时的 TIMEOUT_DNSLOOKUP
- vue.js - 如何解决 Module parse failed: Unexpected token (3:27) in Vue 3 project
- reactjs - 在另一个映射中映射对象反应原生
- git - Git SNI 或证书检查失败:SEC_E_WRONG_PRINCIPAL (0x80090322) - 目标主体名称不正确