首页 > 解决方案 > 使用 Dataflow 将 csv 数据从 Pub/Sub 订阅流式传输到 BigQuery

问题描述

使用 GCP 探索 ETL 过程。我在 Dataflow 中使用 Pub/Sub Subscription to BigQuery 模板。

Pub/Sub Subscription 中的消息数据为 csv 格式,如下所示

53466,06/30/2020,,特立尼达和多巴哥,2020-07-01 04:33:52,130.0,8.0,113.0

这会在加载到 BigQuery 表时出现错误。如何在模板中将 CSV 数据转换为 JSON?

标签: google-cloud-platformgoogle-bigquerygoogle-dataflowgoogle-cloud-pubsublite

解决方案


解决了 !!

在使用 pub/sub 订阅 Bigquery 模板创建作业时,单击查看选项参数。我们可以在其中设置 .js 文件路径和 UDF 函数名称。

在此处输入图像描述

这是用于转换的 JS 代码,即从 CSV 格式到 JSON 格式。

function transform(messages) {
  var values = messages.split(',');

  // Construct output and add transformations
  var obj = new Object();
  obj.SNo = values[0];
  var dateObj = values[1];
  // Date format in file is dd/mm/YYYY
  // Transform the field to Date format required for BigQuery that is YYYY-mm-dd
  obj.ObservationDate = dateObj.replace(/(\d\d)\/(\d\d)\/(\d{4})/, "$3-$1-$2");
  obj.Provision_State = values[2];
  obj.Country_Region = values[3];
  obj.Last_Update = values[4];
  obj.Confirmed = values[5];
  obj.Deaths = values[6];
  obj.Recovered = values[7];
  // add object to JSON
  var jsonString = JSON.stringify(obj);

  return jsonString;
}

推荐阅读