java - 在 Apache Beam 中读取 CSV 文件时跳过标题
问题描述
我想从 CSV 文件中跳过标题行。截至目前,我在将标题加载到谷歌存储之前手动删除它。
下面是我的代码:
PCollection<String> financeobj =p.apply(TextIO.read().from("gs://storage_path/Financials.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr = c.element().split(",");
ClassFinance fin = new ClassFinance();
fin.setBeneficiaryFinance(strArr[0]);
fin.setCatlibCode(strArr[1]);
fin.set_rNR_(Double.valueOf(strArr[2]));
fin.set_rNCS_(Double.valueOf(strArr[3]));
fin.set_rCtb_(Double.valueOf(strArr[4]));
fin.set_rAC_(Double.valueOf(strArr[5]));
c.output(fin);
}
}));
我已经检查了 stackoverflow 中的现有问题,但我认为它没有希望:跳过标题行 - Cloud DataFlow 是否可行?
有什么帮助吗?
编辑:我尝试过类似下面的方法并且它有效:
PCollection<String> financeobj = p.apply(TextIO.read().from("gs://google-bucket/final_input/Financials123.csv"));
PCollection<ClassFinance> pojos5 = financeobj.apply(ParDo.of(new DoFn<String, ClassFinance>() { // converting String into classtype
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String[] strArr2 = c.element().split(",");
String header = Arrays.toString(strArr2);
ClassFinance fin = new ClassFinance();
if(header.contains("Beneficiary"))
System.out.println("Header");
else {
fin.setBeneficiaryFinance(strArr2[0].trim());
fin.setCatlibCode(strArr2[1].trim());
fin.setrNR(Double.valueOf(strArr2[2].trim().replace("", "0")));
fin.setrNCS(Double.valueOf(strArr2[3].trim().replace("", "0")));
fin.setrCtb(Double.valueOf(strArr2[4].trim().replace("", "0")));
fin.setrAC(Double.valueOf(strArr2[5].trim().replace("", "0")));
c.output(fin);
}
}
}));
解决方案
您分享的较早的 Stack Overflow 帖子(跳过标题行 - Cloud DataFlow 是否可行?)确实包含您问题的答案。
此选项目前在 Apache Beam SDK 中不可用,尽管Apache Beam JIRA 问题跟踪器BEAM-123中有一个开放的功能请求。请注意,在撰写本文时,此功能请求仍处于开放状态且未解决,并且已经有 2 年了。但是,从这个意义上说,似乎正在做一些努力,并且该问题的最新更新是从 2018 年 2 月开始的,所以我建议您保持对该 JIRA 问题的更新,因为它最后被移动到组件中,并且它那里可能会受到更多关注。sdk-java-core
考虑到这些信息,我想说您使用的方法(在将文件上传到 GCS 之前删除标题)是您的最佳选择。我会避免手动执行此操作,因为您可以轻松编写脚本并自动化删除标头⟶上传文件过程。
编辑:
我已经能够使用DoFn
. 它可能不是最优雅的解决方案(我自己不是 Apache Beam 专家),但它确实有效,并且您可以根据自己的需要调整它。它要求您事先知道要上传的 CSV 文件的标题(因为它将按元素内容过滤),但同样,将其作为您可以根据需要进行修改的模板:
public class RemoveCSVHeader {
// The Filter class
static class FilterCSVHeaderFn extends DoFn<String, String> {
String headerFilter;
public FilterCSVHeaderFn(String headerFilter) {
this.headerFilter = headerFilter;
}
@ProcessElement
public void processElement(ProcessContext c) {
String row = c.element();
// Filter out elements that match the header
if (!row.equals(this.headerFilter)) {
c.output(row);
}
}
}
// The main class
public static void main(String[] args) throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<String> vals = p.apply(TextIO.read().from("gs://BUCKET/FILE.csv"));
String header = "col1,col2,col3,col4";
vals.apply(ParDo.of(new FilterCSVHeaderFn(header)))
.apply(TextIO.write().to("out"));
p.run().waitUntilFinish();
}
}
推荐阅读
- c - 使用 scanf() 分配内存和读表
- tabs - 如何阻止 Microsoft Edge 在新选项卡中打开 Bing 链接
- java - JavaFX 中的可变字体
- c# - 为什么 AsyncMethodBuilder.Start() 方法需要检查线程上下文是否已更改?
- java - 防止任何使用 Java 的应用程序截取屏幕截图
- neo4j - neo4j 浏览器运行缓慢
- laravel - 左外连接在 laravel 6 中不能按预期工作
- python - 无法验证我对特定网站的查询
- docker - 我无法模拟与 docker 容器的网络断开连接
- c++ - 如何解决多文件 cpp 项目中的链接器错误