java - Apache Beam - 读取 JSON 和 Stream
问题描述
我正在编写 Apache 梁代码,我必须在其中读取已放置在项目文件夹中的 JSON 文件,并读取数据并将其流式传输。
这是读取 JSON 的示例代码。这是正确的做法吗?
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("/Users/xyz/eclipse-workspace/beam-prototype/test.json"));
System.out.println("lines: " + lines);
或者我应该使用,
p.apply(FileIO.match().filepattern("/Users/xyz/eclipse-workspace/beam-prototype/test.json"))
我只需要阅读下面的 json 文件。testdata
从此文件中读取完整内容,然后将其流式传输。
{
“testdata":{
“siteOwner”:”xxx”,
“siteInfo”:{
“siteID”:”id_member",
"siteplatform”:”web”,
"siteType”:”soap”,
"siteURL”:”www”,
}
}
}
上面的代码没有读取 json 文件,它正在打印
lines: ReadMyFile/Read.out [PCollection]
,你能指导我提供样品参考吗?
解决方案
这是读取 JSON 的示例代码。这是正确的做法吗?
为了快速回答您的问题,是的。您的示例代码是读取包含 JSON 的文件的正确方法,其中文件的每一行都包含一个 JSON 元素。TextIO
输入转换逐行读取文件,因此如果单个 JSON 元素跨越多行,则将无法解析。
第二个代码示例具有相同的效果。
上面的代码没有读取 json 文件,它正在打印
打印结果是预期的。该变量lines
实际上并不包含文件中的 JSON 字符串。lines
是sPCollection
的一个String
;它只是表示应用转换后管道的状态。可以通过应用后续转换来访问管道中的元素。可以在转换的实现中访问实际的 JSON 字符串。
推荐阅读
- java - 使用 Map 时不可变不生成正确的代码
- html - Css - 响应式分屏布局
- java - Spring Boot - 包含 AutoWired 依赖项的类的自动配置
- javascript - 要求方法在 React 中不适用于动态内容
- android - Rx 如何根据错误类型返回 Throwable 或 Object
- php - FedEx 费率请求返回“费率暂时不可用,请稍后再试。”
- javascript - CSS 在另一个元素处于活动状态时显示元素
- json - Why Postman return unminified JSON Response?
- apache-kafka - 设置多数据中心 Kafka 集群
- javascript - Javascript:打开汉堡菜单+标题在向下滚动时隐藏并在向上滚动时显示的功能