java - Reading a huge CSV file with Spark
Problem description
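I have a 27 GB gzipped CSV file that I am trying to read with Spark. Our largest node has 30 GB of memory.
When I try to read the file, only one executor loads the data (I am monitoring memory and network); the other 4 sit idle.
After a while it runs out of memory and crashes.
Is there a way to read this file in parallel?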
Dataset<Row> result = sparkSession.read()
.option("header","true")
.option("escape", "\"")
.option("multiLine","true")
.format("csv")
.load("s3a://csv-bucket");
Dataset<Row> repartitioned = result.repartition(10);
spark_conf:
  spark.executor.memoryOverhead: "512"
  spark.executor.cores: "5"
driver:
  memory: 10G
executor:
  instances: 5
  memory: 30G
Solution
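When you are dealing with a large amount of data, you need to repartition it: in Spark, the unit of parallelism is the partition, and each partition is processed by one task.
Keep in mind that repartition only redistributes the data after it has been loaded. A single gzip file is not splittable, and the CSV reader cannot split a file when multiLine is enabled either, so the initial read of this file still runs as a single task on one executor.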
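As a rough mental model (a simplification, not Spark's actual scheduler): each partition becomes one task, and each executor core runs one task at a time. With the 5 executors × 5 cores from the question's configuration there are 25 task slots, so a DataFrame with a single partition keeps only one slot busy. The class and method names below are hypothetical.

```java
// Simplified model: one task per partition, one task per core at a time.
final class TaskSlots {

    /** Total number of tasks the cluster can run concurrently. */
    static int totalSlots(int executors, int coresPerExecutor) {
        return executors * coresPerExecutor;
    }

    /** Slots left idle during the first wave of tasks. */
    static int idleSlotsInFirstWave(int partitions, int executors, int coresPerExecutor) {
        int slots = totalSlots(executors, coresPerExecutor);
        return slots - Math.min(partitions, slots);
    }

    /** Number of scheduling waves needed to run all partitions. */
    static int waves(int partitions, int executors, int coresPerExecutor) {
        int slots = totalSlots(executors, coresPerExecutor);
        return (partitions + slots - 1) / slots; // ceiling division
    }

    public static void main(String[] args) {
        // One partition (e.g. one unsplittable file): 24 of 25 slots stay idle.
        System.out.println(idleSlotsInFirstWave(1, 5, 5)); // prints 24
        // 75 partitions keep all 25 slots busy for 3 waves.
        System.out.println(waves(75, 5, 5)); // prints 3
    }
}
```

In this model, adding partitions beyond the slot count does not add concurrency; it only adds waves, which helps smooth out uneven task durations.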
Dataset<Row> result = sparkSession.read()
.option("header","true")
.option("escape", "\"")
.option("multiLine","true")
.format("csv")
.load("s3a://csv-bucket");
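Dataset<Row> repartitioned = result.repartition(5 * 5 * 3);
The partition count here is the number of executors (5) × cores per executor (5) × a factor of 2 to 3, i.e. 25 cores times 2 to 3, or 50 to 75 partitions. A value in this range should give you a roughly uniform distribution of the data. Note that repartition returns a new Dataset, so you have to use the returned value.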
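The rule of thumb above (executors × cores per executor × a factor of 2 to 3) is plain arithmetic. The hypothetical helper below just spells it out for the question's cluster; the returned number would then be passed to repartition.

```java
// Hypothetical helper for the partition-count rule of thumb.
final class PartitionSizing {

    /**
     * executors * coresPerExecutor * factor, where factor is typically 2 or 3
     * so that every core gets several tasks.
     */
    static int suggestedPartitions(int executors, int coresPerExecutor, int factor) {
        if (executors <= 0 || coresPerExecutor <= 0 || factor <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        return executors * coresPerExecutor * factor;
    }

    public static void main(String[] args) {
        System.out.println(suggestedPartitions(5, 5, 2)); // prints 50
        System.out.println(suggestedPartitions(5, 5, 3)); // prints 75
    }
}
```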
Cross-check how many records each partition holds:
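// Scala. Java equivalent: result.groupBy(org.apache.spark.sql.functions.spark_partition_id()).count().show();
import org.apache.spark.sql.functions.spark_partition_id
yourcsvdataframe.groupBy(spark_partition_id).count.show()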
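To turn those per-partition counts into a single "is it uniform?" number, one option is to compare the largest partition with the mean. The sketch below assumes you have already collected the grouped rows into a map from partition id to row count; the class name and the skew-ratio measure are illustrative, not part of Spark's API. Empty partitions never appear in a groupBy(spark_partition_id) result, so the total partition count (for example from df.rdd().getNumPartitions()) is passed in separately.

```java
import java.util.Map;

// Hypothetical helper; works on counts already collected from Spark.
final class PartitionSkew {

    /**
     * Ratio of the largest partition to the mean partition size (1.0 = perfectly even).
     * numPartitions must include empty partitions, which are absent from the map.
     */
    static double skewRatio(Map<Integer, Long> countsByPartition, int numPartitions) {
        if (numPartitions <= 0 || countsByPartition.size() > numPartitions) {
            throw new IllegalArgumentException("numPartitions is too small");
        }
        long total = 0;
        long max = 0;
        for (long count : countsByPartition.values()) {
            total += count;
            max = Math.max(max, count);
        }
        if (total == 0) {
            return 1.0; // no rows at all: treat as trivially even
        }
        double mean = (double) total / numPartitions;
        return max / mean;
    }

    public static void main(String[] args) {
        // Two partitions with 2 rows each: perfectly even.
        System.out.println(skewRatio(Map.of(0, 2L, 1, 2L), 2)); // prints 1.0
        // All 4 rows in partition 0 of 2 (partition 1 is empty, so absent from the map).
        System.out.println(skewRatio(Map.of(0, 4L), 2)); // prints 2.0
    }
}
```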
Example (Scala):
import org.apache.spark.sql.DataFrame
import spark.implicits._ // needed for .toDS
val mycsvdata =
"""
|rank,freq,Infinitiv,Unreg,Trans,"Präsens_ich","Präsens_du","Präsens_er, sie, es","Präteritum_ich","Partizip II","Konjunktiv II_ich","Imperativ Singular","Imperativ Plural",Hilfsverb
|3,3796784,sein,"","",bin,bist,ist,war,gewesen,"wäre",sei,seid,sein
|8,1618550,haben,"","",habe,hast,hat,hatte,gehabt,"hätte",habe,habt,haben
|10,1379496,einen,"","",eine,einst,eint,einte,geeint,einte,eine,eint,haben
|12,948246,werden,"","",werde,wirst,wird,wurde,geworden,"würde",werde,werdet,sein
""".stripMargin.linesIterator.toList.toDS // linesIterator avoids the clash with java.lang.String.lines on JDK 11+
val csvdf: DataFrame = spark.read
.option("header", true)
.csv(mycsvdata)
csvdf.show(false)
println("all the 4 records are in single partition 0 ")
import org.apache.spark.sql.functions.spark_partition_id
csvdf.groupBy(spark_partition_id).count.show()
println( "now divide data... 4 records to 2 per partition")
csvdf.repartition(2).groupBy(spark_partition_id).count.show()
Result:
+----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+
|rank|freq |Infinitiv|Unreg|Trans|Präsens_ich|Präsens_du|Präsens_er, sie, es|Präteritum_ich|Partizip II|Konjunktiv II_ich|Imperativ Singular|Imperativ Plural|Hilfsverb|
+----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+
|3 |3796784|sein |null |null |bin |bist |ist |war |gewesen |wäre |sei |seid |sein |
|8 |1618550|haben |null |null |habe |hast |hat |hatte |gehabt |hätte |habe |habt |haben |
|10 |1379496|einen |null |null |eine |einst |eint |einte |geeint |einte |eine |eint |haben |
|12 |948246 |werden |null |null |werde |wirst |wird |wurde |geworden |würde |werde |werdet |sein |
+----+-------+---------+-----+-----+-----------+----------+-------------------+--------------+-----------+-----------------+------------------+----------------+---------+
all the 4 records are in single partition 0
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
| 0| 4|
+--------------------+-----+
now divide data... 4 records to 2 per partition
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
| 1| 2|
| 0| 2|
+--------------------+-----+
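repartition(n) without partitioning columns spreads rows round-robin across the n new partitions, which is why the 4 rows end up as 2 + 2. The plain-Java model below (hypothetical class RoundRobin) deals rows out in turn starting at partition 0. Spark's real implementation works through a shuffle and differs in details such as the starting partition, so which partition id holds which row may differ, but the counts are balanced in the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of round-robin repartitioning (not Spark's implementation).
final class RoundRobin {

    /** Deals rows out to numPartitions buckets in turn, starting at bucket 0. */
    static <T> List<List<T>> distribute(List<T> rows, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < rows.size(); i++) {
            partitions.get(i % numPartitions).add(rows.get(i));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // The four verbs from the example, initially all in one partition.
        List<String> rows = List.of("sein", "haben", "einen", "werden");
        List<List<String>> parts = distribute(rows, 2);
        for (int p = 0; p < parts.size(); p++) {
            System.out.println(p + " -> " + parts.get(p).size());
        }
        // prints "0 -> 2" and then "1 -> 2"
    }
}
```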