google-bigquery - GCP - 从 PubSub 到 BigQuery 的消息
问题描述
我需要从我的 pubsub 消息中获取数据并插入到 bigquery 中。
我有的:
const topicName = "-----topic-name-----";
const data = JSON.stringify({ foo: "bar" });
// Imports the Google Cloud client library
const { PubSub } = require("@google-cloud/pubsub");
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishMessageWithCustomAttributes() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
origin: "nodejs-sample",
username: "gcp",
};
const messageId = await pubSubClient
.topic(topicName)
.publish(dataBuffer, customAttributes);
console.log(`Message ${messageId} published.`);
}
publishMessageWithCustomAttributes().catch(console.error);
我需要从此消息中获取数据/属性并在 BigQuery 中查询,有人可以帮助我吗?
提前谢谢!
解决方案
事实上,有两种消费消息的解决方案:每条消息一条消息,或者批量。
首先,在详细介绍之前,由于您将执行 BigQuery 调用(或 Facebook API 调用),您将花费大量处理时间来等待 API 响应。
- 每条消息的消息 如果您有可接受的消息量,则可以对每条消息执行一条消息处理。您在这里有 2 个解决方案:
- 您可以使用 Cloud Functions 处理每条消息。为函数设置最小内存量 (128Mb) 以限制 CPU 成本,从而限制全局成本。确实,因为你会等待很多,所以不要花费昂贵的 CPU 成本无所事事!好的,当数据存在时,您将慢慢处理数据,但这是一种权衡。
在主题上创建云函数,或推送订阅以调用HTTP 触发的云函数
- 您还可以使用 Cloud Run 同时处理请求。Cloud Run 最多可以同时处理 250 个请求(预览版),因为您会等待很多,所以它非常适合。如果您需要更多 CPU 和内存,可以将这些值增加到 4CPU 和 8Gb 内存。这是我的首选解决方案。
- 如果您能够轻松管理多 CPU 多(轻)线程开发,则可以进行批量处理。在 Go 中很容易。Node 中的并发也很容易(等待/异步),但我不知道它是支持多 CPU 还是只有单 CPU。反正原理如下
- 在 PubSub 主题上创建请求订阅
- 创建一个 Cloud Run(更适合多 CPU,但也可以与 App Engine 或 Cloud Functions 一起使用),它将监听拉取订阅一段时间(比如说 10 分钟)
- 对于提取的每条消息,都会执行一个异步过程:获取数据/属性,调用 BigQuery,确认消息
- pull connexion超时后,关闭消息监听,完成当前消息处理,优雅退出(返回200 HTTP码)
- 创建一个 Cloud Scheduler,每 10 分钟调用一次 Cloud Run 服务。将超时设置为 15 分钟并放弃重试。
- 部署 Cloud Run 服务,超时时间为 15 分钟。
此解决方案提供了更好的消息吞吐量处理(每个 Cloud Run 服务可以处理超过 250 条消息),但没有真正的优势,因为您受到 API 调用延迟的限制。
编辑 1
代码示例
// For pubsunb triggered function
exports.logMessageTopic = (message, context) => {
console.log("Message Content")
console.log(Buffer.from(message.data, 'base64').toString())
console.log("Attribute list")
for (let key in message.attributes) {
console.log(key + " -> " + message.attributes[key]);
};
};
// For push subscription
exports.logMessagePush = (req, res) => {
console.log("Message Content")
console.log(Buffer.from(req.body.message.data, 'base64').toString())
console.log("Attribute list")
for (let key in req.body.message.attributes) {
console.log(key + " -> " + req.body.message.attributes[key]);
};
};
推荐阅读
- python - 无法在 django 中创建自定义用户模型
- angular - PrimeNG ConfirmDialog 版本更改后未弹出
- reactjs - 当我在 NormalPeoplePicker 中使用 onRenderItem 时,角色样式被破坏
- c# - 我无法将频道设为私有
- java - 使用 JNA 实现 IContextMenu COM 接口
- html - Html - 显示whatsapp分享按钮内的链接文本
- reactjs - 获取选项后反应钩子未设置选择值
- html - 循环插值 SVG 精灵
- github - 在私有存储库的 GItHub 工作流中使用机密
- java - 数据绑定,MaterialCardView 应该像 Radiogroup