scala - 分别处理spark中的多个目录
问题描述
我在 HDFS 中有一个目录列表,每个目录都包含几个文件。我的目标是将一个目录中的所有文件合并到一个文件中,但每个目录分别合并。在火花中最快的方法是什么?依次遍历所有目录太慢了。所以我想并行进行。一种解决方案可能是使用线程池。也许有更好更快更本地的?
谢谢!
解决方案
考虑以下测试目录foo
并bar
包含以下文件:
cat /tmp/foo/0.csv
4
cat /tmp/foo/1.csv
3
cat /tmp/bar/0.csv
7
我们可以使用以下代码段来阅读它们:
val df = spark.read.csv("/tmp/foo", "/tmp/bar")
.withColumn("dir", regexp_extract(input_file_name(), """([^/]*)/[^/]+\.csv$""", 1))
df.show()
/*
+---+---+
|_c0|dir|
+---+---+
|4 |foo|
|3 |foo|
|7 |bar|
+---+---+
*/
函数input_file_name
给出了文件的绝对路径,因此我们可以使用它来获取目录。函数regexp_extract
仅用于转换例如/tmp/foo/1.csv -> foo
。
Spark 写入文件时,每个分区输出一个文件。因此,我们需要按列重新分区dir
以合并每个目录下的所有文件。最后,我们也可以使用partitionBy
获取输出文件结构的目录名。例如
df.repartition($"dir")
.write
.partitionBy("dir")
.csv("/tmp/out")
会产生文件
/tmp/out/._SUCCESS.crc
/tmp/out/dir=bar/.part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
/tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/_SUCCESS
/tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/dir=foo/.part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
其中/tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
包含
7
并/tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
包含
4
3
AFAIK 如果没有自定义的 HadoopFileSystem
类等,就不可能将这些输出文件写入与原始输入相同的目录结构。
推荐阅读
- php - 如何使用 href 链接签入选项?
- sql - 基于下一行的值的行数
- web-crawler - 有没有办法在风暴爬虫中包含站点地图网址而不是 robots.txt 中的站点地图?
- html - 在 CSS 中的导航栏之间添加间距
- python - 2 天后 'Connection aborted.', RemoteDisconnected('Remote end closed connection without response',)
- php - 如何使用“preg_replace”或更短的代码隐藏特定帖子?
- php - Prestashop 1.7 使用 SqlTranslationLoader.php 手动升级错误
- html - 如何使列表组(引导程序)仅占用少量空间但仍保持中心
- java - 可以在android studio中组合图像和字符串
- android - Android 制造商拥有自己的指纹/面部。id SDK