首页 > 解决方案 > Spark中csv的多行标题

问题描述

我需要 Spark 将压缩的 csv 文件写入 HDFS,但我需要它以几行版本信息开头。

文件内容示例

version=2
date=2020-01-31

id,name,age
1,Alice,21
2,Bob,23

三种方法的想法

  1. 首先写入 hdfs://data/tmp/file1.csv.gz,然后使用 hadoop fs -cat 将其流式传输到 hdfs://data/real/file1.csv.gz
  2. 将输出数据帧转换为文本格式/RDD[String]并使用额外的标题行联合真实文件
  3. 将第一列名称更改为多行

所以对于方法3:

column1 ="version=2\ndate=2020-01-31\n\nid"

如果您知道更优雅的方法来做到这一点,请告诉我。

标签: csvapache-sparkhadoophdfs

解决方案


我尝试了所有方法。这是简化的代码:

方法一

方法 1 是使用来自 bash 脚本等的 Hadoop 命令。

这可行,但需要双 HDFS 写入和清理。它也不太适合 Scala Spark 项目。

(echo -e "version=2\ndate=2020-01-31\n\nid,name,age" | gzip  -vc ; hadoop fs -cat "$INPUT_DIR/*" ) | hadoop fs -put - "$OUTPUT_PATH"

这里发生的事情是它将

  1. 将多行标题回显到标准输出
  2. 将其通过管道传输到 gzip 和 stdout
  3. 将其他 HDSF 目录通过管道传输到标准输出
  4. 管道进入hadoop fs -put它将结合一切

方法二

代码有点复杂,标题中的引号字符还不错,但标题有时会出现在 csv 部分之后。

import org.apache.hadoop.io.compress.GzipCodec

val heading = """version=2
date=2020-01-31

id,name,age""".split("\n", -1).toSeq

val headingRdd: RDD[String] = sc.parallelize(heading)

val mediamathRdd: RDD[String] = df.rdd.map(row => row.mkString(","))

val combinedResult: RDD[String] = (headingRdd union mediamathRdd)

combinedResult.repartition(1).saveAsTextFile(path, classOf[GzipCodec])

方法 3

最简单的方法,但输出略有偏差

df.repartition(1)
.withColumnRenamed("id", "version=2\ndate=2020-01-31\n\nid")
.option("header", true)
.option("delimiter", ",")
.option("quoteMode", "NONE")
.option("quote", " ")
.option("codec", "gzip")
.csv(path)

结果将如下所示,这可能是可接受的,也可能是不可接受的

 version=2
date=2020-01-31

id ,name,age
1,Alice,21
2,Bob,23

我也尝试过:

.option("quote", "\u0000")

它实际上打印了 ascii 宪章零,虽然这没有出现在我的 HDFS 查看器中,但这不是规范的一部分。

最佳方法

它们都不适合看似非常简单的任务。也许有一个小修复可以使方法 2 完美运行。


推荐阅读