首页 > 解决方案 > 我应该设置哪些参数来在 Spark 1.6 中处理 100 GB 的 Csv

问题描述

我是 Spark 的新手,我的用例是在 spark 中处理 100 Gb 文件并将其加载到 hive 中。我每个集群有一个 2 节点 128GB 内存。通过处理,我的意思是在我现有的 csv 中添加一个额外的列,其值是在运行时计算的。但是每次我运行 spark-submit 时,它都会抛出以下错误:-

线程“task-result-getter-1”中的异常 java.lang.OutOfMemoryError:超出 GC 开销限制
        在 org.apache.spark.unsafe.types.UTF8String.read(UTF8String.java:1205)
        在 com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:363)
        在 com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:355)
        在 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        在 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        在 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        在 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
        在 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        在 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        在 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        在 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        在 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        在 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        在 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
        在 org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
        在 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
        在 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        在 org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        在 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1819)
        在 org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        在 java.lang.Thread.run(Thread.java:748)

我尝试使用的命令如下:-

火花提交--master纱线客户端\
             --executor-memory 8G --total-executor-cores 2 \
             --class“com.test.app.Test”\
             spark.old-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
             严酷测试数据严酷测试数据\
             --jars spark-csv_2.10-1.5.0.jar

笔记:

我已经尝试了高达 50 mb 的文件代码并且它工作正常,但是当我尝试超过 100 mb 时它失败了。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.lit

object Test {
  def main(args: Array[String]) {
    //table csv name as in
    val csvName = args(0)
    val tableName = args(1)
    System.setProperty("SPARK_YARN_MODE", "true");
    val sparkConfiguration = new SparkConf();
    sparkConfiguration.setMaster("yarn-client");
    sparkConfiguration.setAppName("test-spark-job");
    sparkConfiguration
      .set("spark.executor.memory", "12g")
      .set("spark.kryoserializer.buffer.max", "512")

    val sparkContext = new SparkContext(sparkConfiguration);
    println("started spark job")
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    val hiveContext = new HiveContext(sparkContext)

    val data = hiveContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .load("hdfs_path***" + csvName + ".csv");
    //Printing in lines
    data.collect().foreach(println)
    //Printing in tabular form
    data.show()
    val newdf = data.withColumn("date", lit("10-04-19"))
    newdf.withColumn("date", lit("10-04-19"))
    newdf.write.mode("append").saveAsTable(tableName)

    val d = hiveContext.sql("select * from " + tableName)
    d.show()
    }
    }

预期结果是文件应该得到处理并在 Hive 中加载

标签: javapythonscalaapache-spark

解决方案


collect()如果你真的不需要它,千万不要使用它,它会导致内存问题,尤其是当你有大的 CSV 文件时。

而且第二行是多余的,可以去掉。

val newdf = data.withColumn("date", lit("10-04-19"))
newdf.withColumn("date", lit("10-04-19")) // It means nothing, you can remove it.
newdf.write.mode("append").saveAsTable(tableName)

推荐阅读