首页 > 解决方案 > Apache Beam - 读取 JSON 和 Stream

问题描述

我正在编写 Apache 梁代码,我必须在其中读取已放置在项目文件夹中的 JSON 文件,并读取数据并将其流式传输。

这是读取 JSON 的示例代码。这是正确的做法吗?

PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(SparkRunner.class);

Pipeline p = Pipeline.create(options);

PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("/Users/xyz/eclipse-workspace/beam-prototype/test.json"));
System.out.println("lines: " + lines);

或者我应该使用,

p.apply(FileIO.match().filepattern("/Users/xyz/eclipse-workspace/beam-prototype/test.json"))

我只需要阅读下面的 json 文件。testdata从此文件中读取完整内容,然后将其流式传输。

{
“testdata":{
“siteOwner”:”xxx”,
“siteInfo”:{
“siteID”:”id_member",
"siteplatform”:”web”,
"siteType”:”soap”,
"siteURL”:”www”,
}
}
}

上面的代码没有读取 json 文件,它正在打印

lines: ReadMyFile/Read.out [PCollection]

,你能指导我提供样品参考吗?

标签: javaapache-beam

解决方案


这是读取 JSON 的示例代码。这是正确的做法吗?

为了快速回答您的问题,是的。您的示例代码是读取包含 JSON 的文件的正确方法,其中文件的每一行都包含一个 JSON 元素。TextIO输入转换逐行读取文件,因此如果单个 JSON 元素跨越多行,则将无法解析。

第二个代码示例具有相同的效果。

上面的代码没有读取 json 文件,它正在打印

打印结果是预期的。该变量lines实际上并不包含文件中的 JSON 字符串。lines是sPCollection的一个String;它只是表示应用转换后管道的状态。可以通过应用后续转换来访问管道中的元素。可以在转换的实现中访问实际的 JSON 字符串。


推荐阅读