java - 如何在 Java 中使用 Apache Beam parDo 函数读取 JSON 文件
问题描述
我是 Apache 梁的新手。根据我们的要求,我需要传递一个包含 5 到 10 个 JSON 记录的 JSON 文件作为输入,并从文件中逐行读取此 JSON 数据并存储到 BigQuery 中。谁能帮我下面的示例代码尝试使用 apache Beam 读取 JSON 数据:
PCollection<String> lines =
pipeline
.apply("ReadMyFile",
TextIO.read()
.from("C:\\Users\\Desktop\\test.json"));
if(null!=lines) {
PCollection<String> words =
lines.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String line = c.element();
}
}));
pipeline.run();
}
解决方案
假设我们在文件中有一个 json 字符串,如下所示,
{"col1":"sample-val-1", "col2":1.0}
{"col1":"sample-val-2", "col2":2.0}
{"col1":"sample-val-3", "col2":3.0}
{"col1":"sample-val-4", "col2":4.0}
{"col1":"sample-val-5", "col2":5.0}
为了通过 DataFlow/Beam 将这些值从文件存储到 BigQuery,您可能必须执行以下步骤,
定义一个 TableReference 来引用 BigQuery 表。
为您希望存储的每一列定义 TableFieldSchema。
使用 TextIO.read() 读取文件。
创建一个 DoFn 将 Json 字符串解析为 TableRow 格式。
使用 BigQueryIO 提交 TableRow 对象。
您可以参考以下有关上述步骤的代码片段,
对于 TableReference 和 TableFieldSchema 创建,
TableReference tableRef = new TableReference(); tableRef.setProjectId("project-id"); tableRef.setDatasetId("dataset-name"); tableRef.setTableId("table-name"); List<TableFieldSchema> fieldDefs = new ArrayList<>(); fieldDefs.add(new TableFieldSchema().setName("column1").setType("STRING")); fieldDefs.add(new TableFieldSchema().setName("column2").setType("FLOAT"));
对于流水线步骤,
Pipeline pipeLine = Pipeline.create(options); pipeLine .apply("ReadMyFile", TextIO.read().from("path-to-json-file")) .apply("MapToTableRow", ParDo.of(new DoFn<String, TableRow>() { @ProcessElement public void processElement(ProcessContext c) { Gson gson = new GsonBuilder().create(); HashMap<String, Object> parsedMap = gson.fromJson(c.element().toString(), HashMap.class); TableRow row = new TableRow(); row.set("column1", parsedMap.get("col1").toString()); row.set("column2", Double.parseDouble(parsedMap.get("col2").toString())); c.output(row); } })) .apply("CommitToBQTable", BigQueryIO.writeTableRows() .to(tableRef) .withSchema(new TableSchema().setFields(fieldDefs)) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(WriteDisposition.WRITE_APPEND)); pipeLine.run();
BigQuery 表可能如下所示,
推荐阅读
- xcode - 启用不在构建选项中的位码
- go - 获取所有在golang中实现接口的结构
- yarnpkg - 如何在纱线中安装所有依赖包?
- r - 使用 Rvest 进行 Web 抓取 - 如果未找到节点,则返回 NA?
- python - Python如何从打印中删除最后一个逗号(字符串,结束=“,”)
- r - 在 R Leaflet.minicharts 中向图表添加值
- c# - 仅在从代码(c#)调用时返回未经授权的(401)
- c# - .NET 在启动时执行多个静态构造函数
- amazon-web-services - Appsync Resolver UpdateItem 忽略空参数?
- .net - 远程续租有时会失败