java - 我应该设置哪些参数来在 Spark 1.6 中处理 100 GB 的 Csv
问题描述
我是 Spark 的新手,我的用例是在 spark 中处理 100 Gb 文件并将其加载到 hive 中。我每个集群有一个 2 节点 128GB 内存。通过处理,我的意思是在我现有的 csv 中添加一个额外的列,其值是在运行时计算的。但是每次我运行 spark-submit 时,它都会抛出以下错误:-
线程“task-result-getter-1”中的异常 java.lang.OutOfMemoryError:超出 GC 开销限制 在 org.apache.spark.unsafe.types.UTF8String.read(UTF8String.java:1205) 在 com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:363) 在 com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:355) 在 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) 在 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) 在 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) 在 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) 在 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) 在 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) 在 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) 在 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) 在 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) 在 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) 在 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311) 在 org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97) 在 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60) 在 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) 在 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51) 在 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1819) 在 org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)
我尝试使用的命令如下:-
火花提交--master纱线客户端\ --executor-memory 8G --total-executor-cores 2 \ --class“com.test.app.Test”\ spark.old-0.0.1-SNAPSHOT-jar-with-dependencies.jar \ 严酷测试数据严酷测试数据\ --jars spark-csv_2.10-1.5.0.jar
笔记:
harshaltestdata
是我在 HDFS 中的 Csv 名称harshaltestdata
是我的表名。
我已经尝试了高达 50 mb 的文件代码并且它工作正常,但是当我尝试超过 100 mb 时它失败了。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.lit
object Test {
def main(args: Array[String]) {
//table csv name as in
val csvName = args(0)
val tableName = args(1)
System.setProperty("SPARK_YARN_MODE", "true");
val sparkConfiguration = new SparkConf();
sparkConfiguration.setMaster("yarn-client");
sparkConfiguration.setAppName("test-spark-job");
sparkConfiguration
.set("spark.executor.memory", "12g")
.set("spark.kryoserializer.buffer.max", "512")
val sparkContext = new SparkContext(sparkConfiguration);
println("started spark job")
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
val hiveContext = new HiveContext(sparkContext)
val data = hiveContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("hdfs_path***" + csvName + ".csv");
//Printing in lines
data.collect().foreach(println)
//Printing in tabular form
data.show()
val newdf = data.withColumn("date", lit("10-04-19"))
newdf.withColumn("date", lit("10-04-19"))
newdf.write.mode("append").saveAsTable(tableName)
val d = hiveContext.sql("select * from " + tableName)
d.show()
}
}
预期结果是文件应该得到处理并在 Hive 中加载
解决方案
collect()
如果你真的不需要它,千万不要使用它,它会导致内存问题,尤其是当你有大的 CSV 文件时。
而且第二行是多余的,可以去掉。
val newdf = data.withColumn("date", lit("10-04-19"))
newdf.withColumn("date", lit("10-04-19")) // It means nothing, you can remove it.
newdf.write.mode("append").saveAsTable(tableName)
推荐阅读
- reactjs - 如何使用 React 测试库测试来自 React Redux 的 Field 组件?
- python - 用 Pandas DataFrame 中最近正常数据点的平均值替换异常值
- cypress - 如何使用柏树从下拉列表中选择值?
- c# - 需要在 C# 中将 simpledateformat(EEE MMM d HH:mm:ss z yyyy) 转换为 DateTime
- excel - 为什么我的代码不适用于我在编写代码后添加的单元格?
- python - 如何更改标签的背景颜色或以其他方式使它们更具可读性?
- ruby-on-rails - 从 YML 文件设置 rails 重定向
- node.js - React 单元测试在管道中失败
- json - I'm looking for a JSON file in a web stream to use with selenieum in python to scrape all the review text
- bash - 给定参数行号和带有特殊字符的替换字符串用新行替换文件中的整行