首页 > 解决方案 > Apache BEAM TextIO 读取带有行号的文件

问题描述

是否可以从 TextIO.read() 获取行号?如果没有,您能否指出一个从 Java 中的 TextIO 继承的自定义 I/O 示例

我的问题是,我需要读取一个大的 CSV 文件(150GB+),并在 30 分钟内生成一个行号。

使用下面的代码FileIO并打开一个文件然后输入序列号,数据流作业无法进行缩放,它只阻塞了 1 个工作人员,在 Dataflow 上花费的时间约为 5 小时n1-standard-2

@Slf4j
public class ApplyRowIndexFn extends DoFn<FileIO.ReadableFile, KV<Integer, String>> {

    protected final PCollectionView<TransformSideInput> transformFnInput;

    public ApplyRowIndexFn(PCollectionView<TransformSideInput> transformFnInput) {
        this.transformFnInput = transformFnInput;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
        TransformSideInput input = context.sideInput(transformFnInput);
        String header = input.getHeader();
        try (Reader reader = Channels.newReader(
                FileSystems.open(context.element().getMetadata().resourceId()), "UTF-8");
             BufferedReader buffer = new BufferedReader(reader)) {
            String row;
            int counter = 1;
            while ((row = buffer.readLine()) != null) {
                if (!row.equals(header)) {
                    context.output(KV.of(counter, row));
                    counter = counter + 1;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

标签: javaapache-beam

解决方案


推荐阅读