google-cloud-dataflow - 使用 Pub/Sub 将 .csv 文件流式传输到 Cloud Storage
问题描述
一般问题是否有人可以在可能的情况下以正确的方式指出我,使用 pub/sub 大规模将传入的流式 .csv 文件导入 BigQuery(使用数据流应用一些转换)的最佳方法是什么?...因为我考虑使用 pub/sub 处理传入 .csv 文件的多个大型原始流
例如我正在考虑的方法是:
1.incoming raw.csv 文件 > 2. pub/sub > 3. 云存储 > 4. 云函数(触发数据流) > 5. DataFlow(转换) > 5. BigQuery
让我知道这种方法是否存在大规模问题或更好的选择?
如果这是一个好方法,我如何让 pub /sub 拾取 .csv 文件 / 以及如何构建它?
谢谢
本
解决方案
有几种不同的方法可以解决此问题,但您的大部分用例都可以使用 Google 提供的 Dataflow模板来解决。使用模板时,可以在 JavaScript UDF 中完成灯光转换。这使您无需维护整个管道,而只需编写传入数据所需的转换。
如果您接受许多文件作为流输入到 Cloud Pub/Sub,请记住 Cloud Pub/Sub 不保证排序,因此来自不同文件的记录可能会在输出中混合。如果您希望按原样捕获整个文件,直接上传到 GCS 会是更好的方法。
使用Cloud Pub/Sub 到 BigQuery或GCS 到 BigQuery提供的模板,您可以利用简单的 UDF 将数据从 CSV 格式转换为与 BigQuery 输出表架构匹配的 JSON 格式。
例如,如果您有 CSV 记录,例如:
transactionDate,product,retailPrice,cost,paymentType
2018-01-08,Product1,99.99,79.99,Visa
您可以编写一个 UDF 将该数据转换为您的输出模式,如下所示:
function transform(line) {
var values = line.split(',');
// Construct output and add transformations
var obj = new Object();
obj.transactionDate = values[0];
obj.product = values[1];
obj.retailPrice = values[2];
obj.cost = values[3];
obj.marginPct = (obj.retailPrice - obj.cost) / obj.retailPrice;
obj.paymentType = values[4];
var jsonString = JSON.stringify(obj);
return jsonString;
}
推荐阅读
- csv - 逐行读取 CSV 文件并生成带有随机延迟发送的某些行的流
- python - Pandas:查找不在另一个 DataFrame 中的 DataFrame 行
- regex - 如何使用 RegexSet 执行替换?
- python - Python 多处理比简单的 pandas 慢
- perl - 如何使用 Perl 在 LDAP 服务器上创建用户以进行 Unix 身份验证?
- r - 如何使用 dplyr 迭代计算汇总变量
- go - 传达简单测试失败
- matlab - 如何修复“索引超过数组元素数(6)”?
- python - 在非正组件中使用不同于 Cpp 中的特征库的 python 计算特征值
- google-apps-script - 如何创建 Google 表单版本,以便我们可以在不中断生产的情况下进行开发/测试?