首页 > 解决方案 > 如何在 GCP 上执行数据非规范化?

问题描述

我正在尝试在 Bigquery 中非规范化和连接三个名为 order、order_item 和 user 的表。我已经有一个管道,用于使用 pubsub 和数据流更新这些表。现在我想创建一个管道,只要在提到的三个表中的任何一个发生更新或插入时,它就会更新一个新的非规范化表。order 和 order_item 的这些更新必须同步,并且只有在 order 发生插入时才应加入用户(以便在创建新订单时显示用户状态)。

到目前为止,我想出了两个解决方案。

命令

id(主键) last_updated_at created_at user_id(外键)

======================

订单项

id(主键) last_updated_at created_at order_id(外键)

======================

用户

id(主键) last_updated_at created_at

我对数据流不是很熟悉,也找不到任何说明如何使用它完成工作的教程或示例(尽管有示例代码显示了如何完成 ETL 操作,但没有解决同步问题的方法)。有没有我可以研究的例子,哪种替代方案似乎更有效?

标签: google-bigqueryetlgoogle-cloud-dataflowapache-beam

解决方案


我想以此作为开头,BigQuery 不是事务数据库,因此在事后尝试保持一致性将非常困难。在这种情况下,我建议使用 Cloud Spanner 或 Cloud SQL(有关两者之间的区别,请参阅这篇Quora 帖子)。例如,这在 Cloud Spanner 中非常简单。在事务的概念中,您可以在任何给定时间使非规范化表与其他表完全同步。

从好的方面来说,如果您对非规范化表可能与其他表不同步感到满意,那么那里有更简单的解决方案。

在这种情况下,我假设切换到另一种存储产品的成本过高,并且表可能不同步是可以的。如果您需要进行批处理或流式数据分析,Cloud Dataflow 是一款很棒的产品,但是在像您这样的用例(基于事件的处理)中使用 API 会变得很尴尬。如果您想继续使用 Dataflow,您的第一个解决方案似乎是最好的,但我实际上建议使用Cloud Functions之类的东西。

设置如下所示:

  1. Pub/Sub 写入数据流
  2. Dataflow 将更新的行写入 BigQuery
  3. 数据流写入包含增量的 Pub/Sub 消息(例如,将行 X 插入 order,更新 order_item 中的 Y 行)。
  4. 有一个在 Pub/Sub 订阅上触发的云函数,该订阅具有您指定的逻辑,可以从规范化表中读取正确的行,然后写入非规范化表。

您的云函数可能类似于以下内容(用 Javascript 编写),灵感来自herehere。:

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
const dataset = bigquery.dataset('my-dataset');
const orders = dataset.table('orders');
const order_items = dataset.table('order_items');
const users = dataset.table('users');
const denorm = dataset.table('denormalized');

function GetOrder(order_id) {
  let [order] = await orders.row(order_id);
  return order;
}

function GetOrderItem(...) { ... }
function GetUser(...) { ... }

/**
 * HTTP Cloud Function.
 *
 * @param {Object} req Cloud Function request context.
 * @param {Object} res Cloud Function response context.
 */
exports.get = (req, res) => {
  const method = req.params.method;
  const table = req.params.table;

  let query = '';
  if (method === 'insert' && table === 'order') {
    let order = GetOrder(req.params.order_id);
    let order_item = GetOrderItem(order.id);
    let user = GetUser(order.user_id);
    denorm.insert({
      ORDER: order.my_data,
      ORDER_ITEM: order_item.my_data,
      USER: user.my_data
    });
  } else if ( ... ) { ... }
}

如何插入 如何查询


推荐阅读