首页 > 解决方案 > 使用 Pub/Sub 将 .csv 文件流式传输到 Cloud Storage

问题描述

一般问题是否有人可以在可能的情况下以正确的方式指出我,使用 pub/sub 大规模将传入的流式 .csv 文件导入 BigQuery(使用数据流应用一些转换)的最佳方法是什么?...因为我考虑使用 pub/sub 处理传入 .csv 文件的多个大型原始流

例如我正在考虑的方法是:

1.incoming raw.csv 文件 > 2. pub/sub > 3. 云存储 > 4. 云函数(触发数据流) > 5. DataFlow(转换) > 5. BigQuery

让我知道这种方法是否存在大规模问题或更好的选择?

如果这是一个好方法,我如何让 pub /sub 拾取 .csv 文件 / 以及如何构建它?

谢谢

在此处输入图像描述

标签: google-cloud-dataflowgoogle-cloud-pubsub

解决方案


有几种不同的方法可以解决此问题,但您的大部分用例都可以使用 Google 提供的 Dataflow模板来解决。使用模板时,可以在 JavaScript UDF 中完成灯光转换。这使您无需维护整个管道,而只需编写传入数据所需的转换。

如果您接受许多文件作为流输入到 Cloud Pub/Sub,请记住 Cloud Pub/Sub 不保证排序,因此来自不同文件的记录可能会在输出中混合。如果您希望按原样捕获整个文件,直接上传到 GCS 会是更好的方法。

使用Cloud Pub/Sub 到 BigQueryGCS 到 BigQuery提供的模板,您可以利用简单的 UDF 将数据从 CSV 格式转换为与 BigQuery 输出表架构匹配的 JSON 格式。

例如,如果您有 CSV 记录,例如:

transactionDate,product,retailPrice,cost,paymentType
2018-01-08,Product1,99.99,79.99,Visa

您可以编写一个 UDF 将该数据转换为您的输出模式,如下所示:

function transform(line) {
  var values = line.split(',');

  // Construct output and add transformations
  var obj = new Object();
  obj.transactionDate = values[0];
  obj.product = values[1];
  obj.retailPrice = values[2];
  obj.cost = values[3];
  obj.marginPct = (obj.retailPrice - obj.cost) / obj.retailPrice;
  obj.paymentType = values[4];
  var jsonString = JSON.stringify(obj);

  return jsonString;
}

推荐阅读