java - Apache BEAM TextIO 读取带有行号的文件
问题描述
是否可以从 TextIO.read() 获取行号?如果没有,您能否指出一个从 Java 中的 TextIO 继承的自定义 I/O 示例
我的问题是,我需要读取一个大的 CSV 文件(150GB+),并在 30 分钟内生成一个行号。
使用下面的代码FileIO
并打开一个文件然后输入序列号,数据流作业无法进行缩放,它只阻塞了 1 个工作人员,在 Dataflow 上花费的时间约为 5 小时n1-standard-2
@Slf4j
public class ApplyRowIndexFn extends DoFn<FileIO.ReadableFile, KV<Integer, String>> {
protected final PCollectionView<TransformSideInput> transformFnInput;
public ApplyRowIndexFn(PCollectionView<TransformSideInput> transformFnInput) {
this.transformFnInput = transformFnInput;
}
@ProcessElement
public void processElement(ProcessContext context) {
TransformSideInput input = context.sideInput(transformFnInput);
String header = input.getHeader();
try (Reader reader = Channels.newReader(
FileSystems.open(context.element().getMetadata().resourceId()), "UTF-8");
BufferedReader buffer = new BufferedReader(reader)) {
String row;
int counter = 1;
while ((row = buffer.readLine()) != null) {
if (!row.equals(header)) {
context.output(KV.of(counter, row));
counter = counter + 1;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
解决方案
推荐阅读
- firebase - Firebase - 尝试在 Ionic 应用程序中使用 Facebook 登录后出现错误代码 24
- javascript - 如何使用外部 css 和图像复制整个 html 页面
- r - 在ggplot2中的轴上添加密度
- uwp - windows.services 中不存在命名空间“store”
- angular5 - 无法访问angular5中的innerhtml元素
- scons - 在 hp ux 上的 scons 上出现错误以构建简单的 helloworld 应用程序
- python-3.x - 使用单行for循环从列表中提取文件名 - python
- amazon-web-services - AWS API Gateway 的访问权限
- ios - 我是否必须始终删除 Firestore 中的侦听器?
- sql-server - 如何在sql中的pivot列中进行升序