csv - Spark中csv的多行标题
问题描述
我需要 Spark 将压缩的 csv 文件写入 HDFS,但我需要它以几行版本信息开头。
文件内容示例
version=2
date=2020-01-31
id,name,age
1,Alice,21
2,Bob,23
三种方法的想法
- 首先写入 hdfs://data/tmp/file1.csv.gz,然后使用 hadoop fs -cat 将其流式传输到 hdfs://data/real/file1.csv.gz
- 将输出数据帧转换为文本格式/
RDD[String]
并使用额外的标题行联合真实文件 - 将第一列名称更改为多行
所以对于方法3:
column1 ="version=2\ndate=2020-01-31\n\nid"
如果您知道更优雅的方法来做到这一点,请告诉我。
解决方案
我尝试了所有方法。这是简化的代码:
方法一
方法 1 是使用来自 bash 脚本等的 Hadoop 命令。
这可行,但需要双 HDFS 写入和清理。它也不太适合 Scala Spark 项目。
(echo -e "version=2\ndate=2020-01-31\n\nid,name,age" | gzip -vc ; hadoop fs -cat "$INPUT_DIR/*" ) | hadoop fs -put - "$OUTPUT_PATH"
这里发生的事情是它将
- 将多行标题回显到标准输出
- 将其通过管道传输到 gzip 和 stdout
- 将其他 HDSF 目录通过管道传输到标准输出
- 管道进入
hadoop fs -put
它将结合一切
方法二
代码有点复杂,标题中的引号字符还不错,但标题有时会出现在 csv 部分之后。
import org.apache.hadoop.io.compress.GzipCodec
val heading = """version=2
date=2020-01-31
id,name,age""".split("\n", -1).toSeq
val headingRdd: RDD[String] = sc.parallelize(heading)
val mediamathRdd: RDD[String] = df.rdd.map(row => row.mkString(","))
val combinedResult: RDD[String] = (headingRdd union mediamathRdd)
combinedResult.repartition(1).saveAsTextFile(path, classOf[GzipCodec])
方法 3
最简单的方法,但输出略有偏差
df.repartition(1)
.withColumnRenamed("id", "version=2\ndate=2020-01-31\n\nid")
.option("header", true)
.option("delimiter", ",")
.option("quoteMode", "NONE")
.option("quote", " ")
.option("codec", "gzip")
.csv(path)
结果将如下所示,这可能是可接受的,也可能是不可接受的
version=2
date=2020-01-31
id ,name,age
1,Alice,21
2,Bob,23
我也尝试过:
.option("quote", "\u0000")
它实际上打印了 ascii 宪章零,虽然这没有出现在我的 HDFS 查看器中,但这不是规范的一部分。
最佳方法
它们都不适合看似非常简单的任务。也许有一个小修复可以使方法 2 完美运行。
推荐阅读
- c++ - 如何通过函数的 void 指针参数传递 int 值并将其转换回 int 值?
- javascript - 事件和事件处理是 JavaScript 语言本身的固有部分吗?
- sql-server - 将变量传递给另一个案例
- python - 为什么我的 KernelReg.bw 为负数?它不应该是积极的吗?我的代码有什么问题?
- c# - 仅当执行参数相同时,如何阻止 Web 应用程序中的进程?
- fabricjs - 擦除 Fabric JS 上的线条
- javascript - 如何将 HTTP get 请求的响应保存到 Angular 7 中的 json 文件中?
- sql-server - 总销售额,每个日期的 MTD 销售额
- microservices - 如何使用 vert.x 对运行在不同服务器上的两个微服务进行集群?
- java - Java ResourceBundle.getBundle 如何加载属性文件?