首页 > 解决方案 > 在 Apache Beam 中读取 CSV 文件时跳过标题

问题描述

我想从 CSV 文件中跳过标题行。截至目前,我在将标题加载到谷歌存储之前手动删除它。

下面是我的代码:

PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));        
    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] strArr = c.element().split(",");
            ClassFinance fin = new ClassFinance();
            fin.setBeneficiaryFinance(strArr[0]);
            fin.setCatlibCode(strArr[1]);
            fin.set_rNR_(Double.valueOf(strArr[2]));
            fin.set_rNCS_(Double.valueOf(strArr[3]));
            fin.set_rCtb_(Double.valueOf(strArr[4]));
            fin.set_rAC_(Double.valueOf(strArr[5]));
            c.output(fin);
        }
    }));

我已经检查了 stackoverflow 中的现有问题,但我认为它没有希望:跳过标题行 - Cloud DataFlow 是否可行?

有什么帮助吗?

编辑:我尝试过类似下面的方法并且它有效:

PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));       

    PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype

        private static final long serialVersionUID = 1L;
        @ProcessElement
        public void processElement(ProcessContext c) {  
            String[] strArr2 = c.element().split(",");
            String header = Arrays.toString(strArr2);
            ClassFinance fin = new ClassFinance();

                if(header.contains("Beneficiary"))
                System.out.println("Header");
                else {
            fin.setBeneficiaryFinance(strArr2[0].trim());
            fin.setCatlibCode(strArr2[1].trim());
            fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
            fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
            fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
            fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
            c.output(fin);
            }
        }
    }));

标签: javagoogle-cloud-platformgoogle-cloud-dataflowapache-beam

解决方案


您分享的较早的 Stack Overflow 帖子(跳过标题行 - Cloud DataFlow 是否可行?)确实包含您问题的答案。

此选项目前在 Apache Beam SDK 中不可用,尽管Apache Beam JIRA 问题跟踪器BEAM-123中有一个开放的功能请求。请注意,在撰写本文时,此功能请求仍处于开放状态且未解决,并且已经有 2 年了。但是,从这个意义上说,似乎正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您保持对该 JIRA 问题的更新,因为它最后被移动到组件中,并且它那里可能会受到更多关注。sdk-java-core

考虑到这些信息,我想说您使用的方法(在将文件上传到 GCS 之前删除标题)是您的最佳选择。我会避免手动执行此操作,因为您可以轻松编写脚本并自动化删除标头上传文件过程。


编辑:

我已经能够使用DoFn. 它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,并且您可以根据自己的需要调整它。它要求您事先知道要上传的 CSV 文件的标题(因为它将按元素内容过滤),但同样,将其作为您可以根据需要进行修改的模板:

public class RemoveCSVHeader {
  // The Filter class
  static class FilterCSVHeaderFn extends DoFn<String, String> {
    String headerFilter;

    public FilterCSVHeaderFn(String headerFilter) {
      this.headerFilter = headerFilter;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String row = c.element();
      // Filter out elements that match the header
      if (!row.equals(this.headerFilter)) {
        c.output(row);
      }
    }
  }

  // The main class
  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));

    String header = "col1,col2,col3,col4";

    vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
        .apply(TextIO.write().to("out"));

    p.run().waitUntilFinish();
  }
}

推荐阅读