How do I make Google Dataflow write the BigQuery table name based on the input data?

Problem Description

I'm new to Dataflow/Beam. I'm trying to write some data to BigQuery, and I want the destination table name to come from the previous stage: each element is a map with an entry keyed "table". But I don't know how to pass this table name through the pipeline to BigQuery. This is where I'm stuck. Any ideas on what to do next?

pipeline
// ...
//////// I guess I shouldn't output TableRow here?
.apply("ToBQRow", ParDo.of(new DoFn<Map<String, String>, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        ////////// WHAT DO I DO WITH "table"?
        String table = c.element().get("table");
        TableRow row = new TableRow();
        // ... set some records
        c.output(row);
    }
}))
.apply(BigQueryIO.writeTableRows().to(/* ///// WHAT DO I WRITE HERE?? */)
    .withSchema(schema)
    .withWriteDisposition(
        BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
));

Tags: google-bigquery, google-cloud-dataflow, apache-beam

Solution


You can use DynamicDestinations for this.

As an example, I created some dummy data and will use the last word of each string as the key:

p.apply("Create Data", Create.of("this should go to table one",
                                 "I would like to go to table one",
                                 "please, table one",
                                 "I prefer table two",
                                 "Back to one",
                                 "My fave is one",
                                 "Rooting for two"))
.apply("Create Keys", ParDo.of(new DoFn<String, KV<String,String>>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      String[] splitBySpaces = c.element().split(" ");
      c.output(KV.of(splitBySpaces[splitBySpaces.length - 1],c.element()));
    }
  }))
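The routing key above is simply the last whitespace-separated word of each line. As plain Java (class and method names are mine, for illustration only), the extraction logic is:

```java
public class KeyExtraction {
    // Returns the last whitespace-separated token of a line,
    // which the pipeline uses as the destination-table key.
    static String destinationKey(String line) {
        String[] parts = line.split(" ");
        return parts[parts.length - 1];
    }

    public static void main(String[] args) {
        System.out.println(destinationKey("I prefer table two")); // two
        System.out.println(destinationKey("please, table one"));  // one
    }
}
```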

Then, in getDestination we control how each element is routed to a different table based on its key, and in getTable we build the fully-qualified table name (prepending a prefix). We could use getSchema if the different tables had different schemas. Finally, we control what gets written into each table with withFormatFunction:

.apply(BigQueryIO.<KV<String, String>>write()
    .to(new DynamicDestinations<KV<String, String>, String>() {
        @Override
        public String getDestination(ValueInSingleWindow<KV<String, String>> element) {
            return element.getValue().getKey();
        }
        @Override
        public TableDestination getTable(String name) {
            String tableSpec = output + name;
            return new TableDestination(tableSpec, "Table for type " + name);
        }
        @Override
        public TableSchema getSchema(String name) {
            List<TableFieldSchema> fields = new ArrayList<>();
            fields.add(new TableFieldSchema().setName("Text").setType("STRING"));
            TableSchema ts = new TableSchema();
            ts.setFields(fields);
            return ts;
        }
    })
    .withFormatFunction(new SerializableFunction<KV<String, String>, TableRow>() {
        @Override
        public TableRow apply(KV<String, String> row) {
            TableRow tr = new TableRow();
            tr.set("Text", row.getValue());
            return tr;
        }
    })
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
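Applied back to the question's Map<String, String> elements, the same pattern might look like the following sketch (not compilable on its own; the project/dataset prefix and the row-building logic are placeholders you would fill in from your own pipeline):

```java
// Sketch only: route each Map element by its "table" entry.
.apply(BigQueryIO.<Map<String, String>>write()
    .to(new DynamicDestinations<Map<String, String>, String>() {
        @Override
        public String getDestination(ValueInSingleWindow<Map<String, String>> element) {
            // The destination is the value carried in the "table" entry.
            return element.getValue().get("table");
        }
        @Override
        public TableDestination getTable(String name) {
            // "my-project:my_dataset." is a placeholder prefix.
            return new TableDestination("my-project:my_dataset." + name,
                                        "Table " + name);
        }
        @Override
        public TableSchema getSchema(String name) {
            return schema; // reuse the schema from the question
        }
    })
    .withFormatFunction(element -> {
        TableRow row = new TableRow();
        // ... set fields from the Map, as in the question
        return row;
    })
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```

Note that the TableRow is no longer built in a separate ParDo; withFormatFunction converts each element right at the sink, so the table name can travel alongside the data all the way to the write.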

To fully test this, I created the following tables:

bq mk dynamic_key
bq mk -f dynamic_key.dynamic_one Text:STRING
bq mk -f dynamic_key.dynamic_two Text:STRING

And, after setting the $PROJECT, $BUCKET and $TABLE_PREFIX (in my case PROJECT_ID:dynamic_key.dynamic_) variables, I ran the job with:

mvn -Pdataflow-runner compile -e exec:java \
 -Dexec.mainClass=com.dataflow.samples.DynamicTableFromKey \
      -Dexec.args="--project=$PROJECT \
      --stagingLocation=gs://$BUCKET/staging/ \
      --tempLocation=gs://$BUCKET/temp/ \
      --output=$TABLE_PREFIX \
      --runner=DataflowRunner"

We can verify that each element went to the correct table:

$ bq query "SELECT * FROM dynamic_key.dynamic_one"
+---------------------------------+
|              Text               |
+---------------------------------+
| please, table one               |
| Back to one                     |
| My fave is one                  |
| this should go to table one     |
| I would like to go to table one |
+---------------------------------+
$ bq query "SELECT * FROM dynamic_key.dynamic_two"
+--------------------+
|        Text        |
+--------------------+
| I prefer table two |
| Rooting for two    |
+--------------------+

Full code here.

