首页 > 解决方案 > 调用超过 1,000 列的 stddev 时,SparkSQL 作业失败

问题描述

我在使用 Spark 2.2.1 和 Scala 2.11 的 DataBricks。我正在尝试运行如下所示的 SQL 查询。

select stddev(col1), stddev(col2), ..., stddev(col1300)
from mydb.mytable

然后我执行如下代码。

myRdd = sqlContext.sql(sql)

但是,我看到引发了以下异常。

作业因阶段失败而中止:阶段 16.0 中的任务 24 失败 4 次,最近一次失败:阶段 16.0 中丢失任务 24.3(TID 1946、10.184.163.105、执行程序 3):org.codehaus.janino.JaninoRuntimeException:编译失败: org.codehaus.janino.JaninoRuntimeException:类 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection 的常量池已超过 0xFFFF 的 JVM 限制
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */ return new SpecificMutableProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ 类 SpecificMutableProjection 扩展 org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
/* 006 */
/* 007 */ 私有对象[] 引用;
/* 008 */ private InternalRow mutableRow;
/* 009 */ 私有布尔 evalExprIsNull;
/* 010 */ 私有布尔 evalExprValue;
/* 011 */ 私有布尔值 evalExpr1IsNull;
/* 012 */ 私有布尔 evalExpr1Value;
/* 013 */ 私有布尔值 evalExpr2IsNull;
/* 014 */ 私有布尔 evalExpr2Value;
/* 015 */ 私有布尔 evalExpr3IsNull;
/* 016 */ 私有布尔 evalExpr3Value;
/* 017 */ 私有布尔值 evalExpr4IsNull;
/* 018 */ 私有布尔 evalExpr4Value;
/* 019 */ 私有布尔值 evalExpr5IsNull;
/* 020 */ 私有布尔 evalExpr5Value;
/* 021 */ 私有布尔 evalExpr6IsNull;

堆栈跟踪一直在继续,甚至 Databricks 笔记本也会因为冗长而崩溃。有人见过这个吗?

此外,我有以下 2 个 SQL 语句来获得我执行的平均值和中位数,没有任何问题。

select avg(col1), ..., avg(col1300) from mydb.mytable
select percentile_approx(col1, 0.5), ..., percentile_approx(col1300, 0.5) from mydb.mytable

问题似乎与stddev但例外没有帮助。关于发生了什么的任何想法?是否有另一种方法可以轻松计算不会导致此问题的标准偏差?

原来这篇文章描述了同样的问题,说由于 64KB 大小的类的限制,Spark 无法处理宽模式或大量列。但是,如果是这样的话,那么为什么avgpercentile_approx工作呢?

标签: scalaapache-sparkdatabricks

解决方案


几个选项:

  • 尝试禁用整个阶段代码生成:

    spark.conf.set("spark.sql.codegen.wholeStage", false)
    
  • 如果上述内容无助于切换到 RDD(由zeo323从这个答案中采用):

    import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
    
    val columns: Seq[String] = ???
    
    df
      .select(columns map (col(_).cast("double")): _*)
      .rdd
      .map(row => Vectors.dense(columns.map(row.getAs[Double](_)).toArray))
      .aggregate(new MultivariateOnlineSummarizer)(
         (agg, v) => agg.add(v), 
         (agg1, agg2) => agg1.merge(agg2))
    
  • VectorAssembler使用和 use 将列组装成一个向量,Aggregator类似于此处使用的调整finish方法(您可能需要一些额外的调整才能转换ml.linalg.Vectorsmllib.linalg.Vectors)。

但是,如果是这样,那么为什么 avg 和 percentile_approx 起作用?

Spark 从字面上为这些阶段生成 Java 代码。因为逻辑不一样,所以输出大小会有所不同。


推荐阅读