google-bigquery - 如何在 GCP 上执行数据非规范化?
问题描述
我正在尝试在 Bigquery 中非规范化和连接三个名为 order、order_item 和 user 的表。我已经有一个管道,用于使用 pubsub 和数据流更新这些表。现在我想创建一个管道,只要在提到的三个表中的任何一个发生更新或插入时,它就会更新一个新的非规范化表。order 和 order_item 的这些更新必须同步,并且只有在 order 发生插入时才应加入用户(以便在创建新订单时显示用户状态)。
到目前为止,我想出了两个解决方案。
- 一种是在通过数据流读取来自 pubsub 的消息时捕获每个表上的更改,然后使用非规范化表中的最新相应记录进行丰富。最后,旧记录被新记录替换。
- 另一种是查询 order 和 order_item 表以获取更新或新插入的行,然后使用 bigquery sql 将它们连接起来,从而将结果附加到非规范化表中。这项工作在 Airflow 的帮助下定期运行。
命令
id(主键) last_updated_at created_at user_id(外键)
======================
订单项
id(主键) last_updated_at created_at order_id(外键)
======================
用户
id(主键) last_updated_at created_at
我对数据流不是很熟悉,也找不到任何说明如何使用它完成工作的教程或示例(尽管有示例代码显示了如何完成 ETL 操作,但没有解决同步问题的方法)。有没有我可以研究的例子,哪种替代方案似乎更有效?
解决方案
我想以此作为开头,BigQuery 不是事务数据库,因此在事后尝试保持一致性将非常困难。在这种情况下,我建议使用 Cloud Spanner 或 Cloud SQL(有关两者之间的区别,请参阅这篇Quora 帖子)。例如,这在 Cloud Spanner 中非常简单。在事务的概念中,您可以在任何给定时间使非规范化表与其他表完全同步。
从好的方面来说,如果您对非规范化表可能与其他表不同步感到满意,那么那里有更简单的解决方案。
在这种情况下,我假设切换到另一种存储产品的成本过高,并且表可能不同步是可以的。如果您需要进行批处理或流式数据分析,Cloud Dataflow 是一款很棒的产品,但是在像您这样的用例(基于事件的处理)中使用 API 会变得很尴尬。如果您想继续使用 Dataflow,您的第一个解决方案似乎是最好的,但我实际上建议使用Cloud Functions之类的东西。
设置如下所示:
- Pub/Sub 写入数据流
- Dataflow 将更新的行写入 BigQuery
- 数据流写入包含增量的 Pub/Sub 消息(例如,将行 X 插入 order,更新 order_item 中的 Y 行)。
- 有一个在 Pub/Sub 订阅上触发的云函数,该订阅具有您指定的逻辑,可以从规范化表中读取正确的行,然后写入非规范化表。
您的云函数可能类似于以下内容(用 Javascript 编写),灵感来自here和here。:
// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
const dataset = bigquery.dataset('my-dataset');
const orders = dataset.table('orders');
const order_items = dataset.table('order_items');
const users = dataset.table('users');
const denorm = dataset.table('denormalized');
function GetOrder(order_id) {
let [order] = await orders.row(order_id);
return order;
}
function GetOrderItem(...) { ... }
function GetUser(...) { ... }
/**
* HTTP Cloud Function.
*
* @param {Object} req Cloud Function request context.
* @param {Object} res Cloud Function response context.
*/
exports.get = (req, res) => {
const method = req.params.method;
const table = req.params.table;
let query = '';
if (method === 'insert' && table === 'order') {
let order = GetOrder(req.params.order_id);
let order_item = GetOrderItem(order.id);
let user = GetUser(order.user_id);
denorm.insert({
ORDER: order.my_data,
ORDER_ITEM: order_item.my_data,
USER: user.my_data
});
} else if ( ... ) { ... }
}
推荐阅读
- akka - 出站关联没有来自远程的响应。[15000 ms] Akka 聚类错误后握手超时
- spring - jhipster v4.14.4 - 构建可执行的 WAR 文件
- pdf - 处理 PDF 文件并拆分为多个文件
- node.js - NodeJs,从远程 .jpeg 块中获取 b64
- angular - 无法读取角度 6 中未定义的推送
- node.js - 浏览器 Fetch() API 未将正文发布到后端节点服务器
- c# - 基于界面的强制输入
- python - 波动率表面的 3D 图
- elasticsearch - 如何为谷歌数据流实例分配 IP 范围?
- ruby-on-rails - ruby 中的私有类方法链接