首页 > 解决方案 > 如何在 Java 中使用 Apache Beam parDo 函数读取 JSON 文件

问题描述

我是 Apache 梁的新手。根据我们的要求,我需要传递一个包含 5 到 10 个 JSON 记录的 JSON 文件作为输入,并从文件中逐行读取此 JSON 数据并存储到 BigQuery 中。谁能帮我下面的示例代码尝试使用 apache Beam 读取 JSON 数据:

PCollection<String> lines = 
    pipeline
      .apply("ReadMyFile", 
             TextIO.read()
                   .from("C:\\Users\\Desktop\\test.json")); 
if(null!=lines) { 
  PCollection<String> words =
     lines.apply(ParDo.of(new DoFn<String, String>() { 
        @ProcessElement
        public void processElement(ProcessContext c) { 
          String line = c.element();
        }
      })); 
  pipeline.run(); 
}

标签: javajsonapache-beam

解决方案


假设我们在文件中有一个 json 字符串,如下所示,

{"col1":"sample-val-1", "col2":1.0}
{"col1":"sample-val-2", "col2":2.0}
{"col1":"sample-val-3", "col2":3.0}
{"col1":"sample-val-4", "col2":4.0}
{"col1":"sample-val-5", "col2":5.0}

为了通过 DataFlow/Beam 将这些值从文件存储到 BigQuery,您可能必须执行以下步骤,

  • 定义一个 TableReference 来引用 BigQuery 表。

  • 为您希望存储的每一列定义 TableFieldSchema。

  • 使用 TextIO.read() 读取文件。

  • 创建一个 DoFn 将 Json 字符串解析为 TableRow 格式。

  • 使用 BigQueryIO 提交 TableRow 对象。

您可以参考以下有关上述步骤的代码片段,

  • 对于 TableReference 和 TableFieldSchema 创建,

    TableReference tableRef = new TableReference();
    tableRef.setProjectId("project-id");
    tableRef.setDatasetId("dataset-name");
    tableRef.setTableId("table-name");
    
    List<TableFieldSchema> fieldDefs = new ArrayList<>();
    fieldDefs.add(new TableFieldSchema().setName("column1").setType("STRING"));
    fieldDefs.add(new TableFieldSchema().setName("column2").setType("FLOAT"));  
    
  • 对于流水线步骤,

    Pipeline pipeLine = Pipeline.create(options);
    pipeLine
    .apply("ReadMyFile", 
            TextIO.read().from("path-to-json-file")) 
    
    .apply("MapToTableRow", ParDo.of(new DoFn<String, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) { 
            Gson gson = new GsonBuilder().create();
            HashMap<String, Object> parsedMap = gson.fromJson(c.element().toString(), HashMap.class);
    
            TableRow row = new TableRow();
            row.set("column1", parsedMap.get("col1").toString());
            row.set("column2", Double.parseDouble(parsedMap.get("col2").toString()));
            c.output(row);
        }
    }))
    
    .apply("CommitToBQTable", BigQueryIO.writeTableRows()
            .to(tableRef)
            .withSchema(new TableSchema().setFields(fieldDefs))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    
    pipeLine.run(); 
    

BigQuery 表可能如下所示,

在此处输入图像描述


推荐阅读