google-cloud-platform - 无法从数据流中的 GCS 读取我的配置文本文件(列名)
问题描述
我在 GCS 中有一个源 CSV 文件(没有标题)以及标题配置 CSV 文件(仅包含列名)。我在 Bigquery 中也有静态表。我想通过使用列标题映射(配置文件)将源文件加载到静态表中。
我之前尝试过使用不同的方法(我在同一文件中维护包含标题和数据的源文件,然后尝试从源文件中拆分标题,然后使用标题列映射将这些数据插入 Bigquery。我注意到这种方法是不可能的,因为数据流将数据洗牌到多个工作节点中。所以我放弃了这种方法。
下面的代码我使用了硬编码的列名。我正在寻找从外部配置文件中读取列名的方法(我想让我的代码成为动态的)。
package com.coe.cog;
import java.io.BufferedReader;
import java.util.*;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
public class SampleTest {
private static final Logger LOG = LoggerFactory.getLogger(SampleTest.class);
public static TableReference getGCDSTableReference() {
TableReference ref = new TableReference();
ref.setProjectId("myownproject");
ref.setDatasetId("DS_Employee");
ref.setTableId("tLoad14");
return ref;
}
static class TransformToTable extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
String csvSplitBy = ",";
String lineHeader = "ID,NAME,AGE,SEX"; // Hard code column name but i want to read these header from GCS file.
String[] colmnsHeader = lineHeader.split(csvSplitBy); //Only Header array
String[] split = c.element().split(csvSplitBy); //Data section
TableRow row = new TableRow();
for (int i = 0; i < split.length; i++) {
row.set(colmnsHeader[i], split[i]);
}
c.output(row);
// }
}
}
public interface MyOptions extends PipelineOptions {
/*
* Param
*
*/
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
options.setTempLocation("gs://demo-bucket-data/temp");
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply("Read From Storage", TextIO.read().from("gs://demo-bucket-data/Demo/Test/SourceFile_WithOutHeader.csv"));
PCollection<TableRow> rows = lines.apply("Transform To Table",ParDo.of(new TransformToTable()));
rows.apply("Write To Table",BigQueryIO.writeTableRows().to(getGCDSTableReference())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
p.run();
}
}
源文件:
1,John,25,M
2,Smith,30,M
3,Josephine,20,F
配置文件(仅限标题):
ID,NAME,AGE,SEX
解决方案
你有几个选择:
- 使用 Dataflow/Beam
side input
将配置/头文件读入某种集合,例如 aaArrayList
。它将可供集群中的所有工作人员使用。然后,您可以使用side input
将架构动态分配给 BigQuery 表DynamicDestinations
。 - 在进入您的 Dataflow 管道之前,直接调用 GCS api 来获取您的配置/头文件,对其进行解析,然后将结果设置为您的管道。
推荐阅读
- sql - 具有相同列但不匹配记录的 SQL Server 表
- c# - 将鼠标光标移动到特定元素的问题
- ios - Nativescript iOS 6.4.2 网格布局抛出错误:未定义不是对象(评估'this.rows [measureSpec.getRowIndex()].children')
- python - 如何在 Python 中使用 Google Drive API v3 更改所有者?
- javascript - 如何在另一个组件中运行 React 函数
- assembly - 为什么反汇编代码有不同的指令大小?CPU如何知道要加载多少字节?
- php - array_key_exists - 键确实存在但它什么也不返回,完全难倒
- python - 我怎样才能得到元组Python中每个元素的总和
- ios - Azure AMS:如何获取 Sidecar WebVTT 以在 iOS 原生播放器中显示字幕/字幕?
- azure-devops - 在发布到生产之前检查代码是否在主分支上