java - 如何在不提取的情况下读取保存在 Apache Beam 中的云存储中的压缩 gzip csv 文件
问题描述
我通过第三方将 GZIP 压缩的 csv 文件上传到云存储中。我正在使用 apache Beam 连续流管道来读取压缩文件元数据(文件名、完整路径)。我还有一个要求来读取这个 csv 文件的第一行和最后一行。我正在使用以下代码来获取所有被添加到云存储桶文件夹的压缩文件。
pipeline.apply("MatchFile(s)", FileIO.match()
.filepattern(zipFilePath)
.continuously(Duration.standardMinutes(1), Watch.Growth.never()))
.apply(Window.<~>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Get Compressed File(s)", ParDo.of(new GetCompressedFile()));
static class GetCompressedFile extends DoFn<MatchResult.Metadata, Void> {
@ProcessElement
public void processElement(ProcessContext context) throws ParseException {
ResourceId inputFile = context.element().resourceId();
String fileName = inputFile.getFilename();
String currentDirectoryPath = inputFile.getCurrentDirectory().toString();
我能够获得压缩文件名和路径,但我无法在不提取 csv 文件的情况下读取它。我尝试了一些谷歌答案来读取压缩文件,但这不是从云存储中读取的。
解决方案
我可以使用以下代码读取压缩文件,而无需提取它。也许它会帮助某人。
pipeline.apply("MatchFile(s)", FileIO.match()
.filepattern(zipFilePath)
.continuously(Duration.standardMinutes(1), Watch.Growth.never()))
.apply(Window.<MatchResult.Metadata>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(FileIO.readMatches().withCompression(GZIP))
.apply("Read Files",ParDo.of(new ReadFilesGZIP()));
pipeline.run();
}
static class ReadsFilesGZIP extends DoFn<FileIO.ReadableFile,String>{
@ProcessElement
public void processElement(ProcessContext context){
FileIO.ReadableFile file = context.element();
ReadableByteChannel readableByteChannel = file.getCompression().readDecompressed(FileSystems.open(file.getMetadata().resourceId()));
try (BufferedReader r = new BufferedReader(new InputStreamReader(Channels.newInputStream(readableByteChannel)))) {
String line;
Stream<String> fileLines = r.lines();
}
推荐阅读
- javascript - 如果添加已编辑的对象,则反应不渲染
- mysql - 如何根据其他 2 列选择 1 列的最大值和最小值?
- javascript - Android应用程序中的React Native Production错误
- c# - Azure Speech To Text - 从口语中提取音素
- excel - VBA 从一个驱动器中删除文件
- bash - 在 macos 上找不到 Bash 命令
- flutter - 如何让 GestureDetector 在 Flutter Web 中检测两指拖动?
- python - 用于连接到 s3 客户端的 aws 配置文件
- php - 为什么我在教程代码中收到解析错误
- sql - 将 oracle sql 密集秩函数转换为 Bigquery