scala - 调用超过 1,000 列的 stddev 时,SparkSQL 作业失败
问题描述
我在使用 Spark 2.2.1 和 Scala 2.11 的 DataBricks。我正在尝试运行如下所示的 SQL 查询。
select stddev(col1), stddev(col2), ..., stddev(col1300)
from mydb.mytable
然后我执行如下代码。
myRdd = sqlContext.sql(sql)
但是,我看到引发了以下异常。
作业因阶段失败而中止:阶段 16.0 中的任务 24 失败 4 次,最近一次失败:阶段 16.0 中丢失任务 24.3(TID 1946、10.184.163.105、执行程序 3):org.codehaus.janino.JaninoRuntimeException:编译失败: org.codehaus.janino.JaninoRuntimeException:类 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection 的常量池已超过 0xFFFF 的 JVM 限制 /* 001 */ public java.lang.Object generate(Object[] references) { /* 002 */ return new SpecificMutableProjection(references); /* 003 */ } /* 004 */ /* 005 */ 类 SpecificMutableProjection 扩展 org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { /* 006 */ /* 007 */ 私有对象[] 引用; /* 008 */ private InternalRow mutableRow; /* 009 */ 私有布尔 evalExprIsNull; /* 010 */ 私有布尔 evalExprValue; /* 011 */ 私有布尔值 evalExpr1IsNull; /* 012 */ 私有布尔 evalExpr1Value; /* 013 */ 私有布尔值 evalExpr2IsNull; /* 014 */ 私有布尔 evalExpr2Value; /* 015 */ 私有布尔 evalExpr3IsNull; /* 016 */ 私有布尔 evalExpr3Value; /* 017 */ 私有布尔值 evalExpr4IsNull; /* 018 */ 私有布尔 evalExpr4Value; /* 019 */ 私有布尔值 evalExpr5IsNull; /* 020 */ 私有布尔 evalExpr5Value; /* 021 */ 私有布尔 evalExpr6IsNull;
堆栈跟踪一直在继续,甚至 Databricks 笔记本也会因为冗长而崩溃。有人见过这个吗?
此外,我有以下 2 个 SQL 语句来获得我执行的平均值和中位数,没有任何问题。
select avg(col1), ..., avg(col1300) from mydb.mytable
select percentile_approx(col1, 0.5), ..., percentile_approx(col1300, 0.5) from mydb.mytable
问题似乎与stddev
但例外没有帮助。关于发生了什么的任何想法?是否有另一种方法可以轻松计算不会导致此问题的标准偏差?
原来这篇文章描述了同样的问题,说由于 64KB 大小的类的限制,Spark 无法处理宽模式或大量列。但是,如果是这样的话,那么为什么avg
和percentile_approx
工作呢?
解决方案
几个选项:
尝试禁用整个阶段代码生成:
spark.conf.set("spark.sql.codegen.wholeStage", false)
如果上述内容无助于切换到 RDD(由zeo323从这个答案中采用):
import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer val columns: Seq[String] = ??? df .select(columns map (col(_).cast("double")): _*) .rdd .map(row => Vectors.dense(columns.map(row.getAs[Double](_)).toArray)) .aggregate(new MultivariateOnlineSummarizer)( (agg, v) => agg.add(v), (agg1, agg2) => agg1.merge(agg2))
VectorAssembler
使用和 use 将列组装成一个向量,Aggregator
类似于此处使用的调整finish
方法(您可能需要一些额外的调整才能转换ml.linalg.Vectors
为mllib.linalg.Vectors
)。
但是,如果是这样,那么为什么 avg 和 percentile_approx 起作用?
Spark 从字面上为这些阶段生成 Java 代码。因为逻辑不一样,所以输出大小会有所不同。
推荐阅读
- javascript - 使用 prop-types 时检查对象中的键是否为字符串
- jquery - 向 Vuejs 组件中的元素添加类会修改该组件的所有实例
- python - 如何优化 o(n**2) 算法成为 o(nlogn) 或 o(n)?
- python - 为什么在 for 循环期间,在漂亮的汤解析中途出现“IndexError:list index out of range”?
- reactjs - 更改 s3 存储桶文件时出现 403 禁止错误
- go - 如何使用结构显示所有记录
- flutter - 文本字段光标无法正常工作
- java - DocuSign 自动回复收件人事件
- javascript - 如何在 React Native 中使用相同的 Firebase 数据库在两个应用程序之间进行通信?
- postgresql - Postgres 中的部分索引