首页 > 解决方案 > apache spark数据框导致内存不足

问题描述

我的应用程序在微小对象的大型数据集上运行良好。但是随着对数据帧的对象操作越来越多,会遇到内存不足的异常。基本上应用程序读取一个序列文件并将键/值转换为具有一些列的行对象,进行一些排序,最后转换回 rdd 以写入一个序列文件。

如果行对象很小 - 大约 20kbytes 的数据 - 一切都很好。在第二次运行中,行对象包含大约 2mb 的数据,并且 spark 遇到内存不足的问题。我测试了几个选项,更改分区大小和计数,但应用程序运行不稳定。

为了重现此问题,我创建了以下示例代码。一个 10000 个 int 对象的 rdd 映射到一个 2mb 长度的字符串(假设每个字符 16 位,大概是 4mb)。10.000 乘以 2mb 大约是 20gb 的数据。操作 show() 排序和 show() 再次按预期工作。日志表明 spark 正在存储到磁盘。但是当将结果写入 csv 文件、hadoop 文件或只是调用 toJavaRDD() 时,应用程序以内存不足结束:java 堆空间似乎 spark 仅在内存中保存数据。

// create spark session
SparkSession spark = SparkSession.builder()
        .appName("example1")
        .master("local[2]")
        .config("spark.driver.maxResultSize","0")
        .getOrCreate();

JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

// base to generate huge data
List<Integer> list = new ArrayList<>();
for (int val = 1; val < 10000; val++) {
    int valueOf = Integer.valueOf(val);
    list.add(valueOf);
}
// create simple rdd of int
JavaRDD<Integer> rdd = sc.parallelize(list,200);
// use map to create large object per row
JavaRDD<Row> rowRDD =
        rdd.map(value -> RowFactory.create(String.valueOf(value), createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
        ;

StructType type = new StructType();
type = type
        .add("c1", DataTypes.StringType)
        .add( "c2", DataTypes.StringType );

Dataset<Row> df = spark.createDataFrame(rowRDD, type);
// works
df.show();
df = df.sort( col("c2").asc() );
// takes a lot of time but works
df.show();
// OutOfMemoryError: java heap space
df.write().csv("d:/temp/my.csv");
// OutOfMemoryError: java heap space
df
        .toJavaRDD()
        .mapToPair(row -> new Tuple2(new Text(row.getString(0)), new Text( row.getString(1))))
        .saveAsHadoopFile("d:\\temp\\foo", Text.class, Text.class, SequenceFileOutputFormat.class );

操作 createLongText() 创建一个长字符串。

private static String createLongText( String text, int minLength ) {
    String longText = text;
    while( longText.length() < minLength ) {
        longText = longText + longText;
    }
    return longText;
}

JVM 在 Java 1.8u171 中使用 4GB 堆 (-Xms4g -Xmx4g) 运行,默认 GC (GC1) 和可选 -XX:+UseParallelGC 在本地模式下。堆大小向上限增加,gc 活动增加。在大多数情况下,oom 会导致驱动程序崩溃,在某些情况下,高 gc-activity 会导致驱动程序超时。这是 spark.driver.maxResultSize 第一次设置为 0(无限制)。当使用较小的对象时(几乎相同数量的数据,更多的对象,更少的数据),任何工作都很好。似乎从 dataFrame 到 rdd 的任何转换都会绑定许多内存资源并强制将所有结果发送回驱动程序。我在崩溃之前创建了一个堆转储,并看到了一个大型的 scala 数组数组。内部数组包含大约 32mb(分区大小!?),内部数组的总和大约是分区数(115)

8/07/26 21:58:31 INFO MemoryStore: Block taskresult_144 stored as bytes in memory (estimated size 47.7 MB, free 86.3 MB)
18/07/26 21:58:31 INFO BlockManagerInfo: Added taskresult_144 in memory on blackhawk:61185 (size: 47.7 MB, free: 86.4 MB)
18/07/26 21:58:31 INFO Executor: Finished task 143.0 in stage 1.0 (TID 144). 50033768 bytes result sent via BlockManager)
18/07/26 21:58:31 INFO TaskSetManager: Starting task 145.0 in stage 1.0 (TID 146, localhost, executor driver, partition 145, PROCESS_LOCAL, 8355 bytes)
18/07/26 21:58:31 INFO Executor: Running task 145.0 in stage 1.0 (TID 146)
18/07/26 21:58:35 INFO TaskSetManager: Finished task 104.0 in stage 1.0 (TID 105) in 36333 ms on localhost (executor driver) (105/200)
18/07/26 21:58:35 ERROR Executor: Exception in task 144.0 in stage 1.0 (TID 145)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.lang.StringCoding.safeTrim(StringCoding.java:79)
    at java.lang.StringCoding.encode(StringCoding.java:365)
    at java.lang.String.getBytes(String.java:941)
    at org.apache.spark.unsafe.types.UTF8String.fromString(UTF8String.java:141)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:592)
    at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:592)

知道这里有什么问题吗?还是 spark 2.3.1 中的错误?

标签: javaapache-sparkapache-spark-sqlout-of-memory

解决方案


尽量不要使用 GC1 垃圾器,这可能是连续内存分配和 GC1 32mb 区域的错误。

请阅读http://labs.criteo.com/2018/01/spark-out-of-memory/,可能是这样吗?


推荐阅读