scala - InternalCompilerException: compiled class was loaded through a different loader
Problem description
I am using the Scala interpreter to evaluate Scala statements that come from configuration.
Sample code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain

object BSFTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("TEST")
      .setMaster("local") // spark://127.0.0.1:7077
    val sparkSession = SparkSession.builder()
      .appName("TEST")
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    import sparkSession.sql
    sql("DROP DATABASE IF EXISTS test CASCADE")
    sql("CREATE DATABASE test")
    sql("CREATE TABLE test.box_width (id INT, width INT)")
    sql("INSERT INTO test.box_width VALUES (1,1), (2,2)")
    sql("CREATE TABLE test.box_length (id INT, length INT)")
    sql("INSERT INTO test.box_length VALUES (1,10), (2,20)")
    val widthDF: DataFrame = sql("select * from test.box_width")
    val lengthDF = sql("select * from test.box_length")

    val settings = new Settings
    settings.usejavacp.value = true
    settings.deprecation.value = true
    settings.embeddedDefaults(this.getClass().getClassLoader())
    val eval = new IMain(settings)
    eval.bind("lengthDF", "org.apache.spark.sql.DataFrame", lengthDF)
    eval.bind("widthDF", "org.apache.spark.sql.DataFrame", widthDF)

    val clazz1 = "lengthDF.join(widthDF, \"id\")" // STATEMENT FROM CONFIGURATION
    val evaluated = eval.interpret(clazz1)
    val res = eval.valueOfTerm("res0").get.asInstanceOf[DataFrame]
    println("PRINT SCHEMA: " + res.schema) // This statement is running fine
    res.show() // EXCEPTION HERE
  }
}
Running the code produces the following output and error:
lengthDF: org.apache.spark.sql.DataFrame = [id: int, length: int]
widthDF: org.apache.spark.sql.DataFrame = [id: int, width: int]
res0: org.apache.spark.sql.DataFrame = [id: int, length: int ... 1 more field]
PRINT SCHEMA: StructType(StructField(id,IntegerType,true), StructField(length,IntegerType,true), StructField(width,IntegerType,true))
18/10/24 15:08:14 ERROR CodeGenerator: failed to compile: org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Class 'org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass' was loaded through a different loader
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */ return new SpecificSafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/*....
Caused by: org.codehaus.janino.InternalCompilerException: Class 'org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass' was loaded through a different loader
at org.codehaus.janino.SimpleCompiler$2.getDelegate(SimpleCompiler.java:410)
at org.codehaus.janino.SimpleCompiler$2.accept(SimpleCompiler.java:353)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6130)
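I don't understand why res.show() (which retrieves the data from the DataFrame and prints it) throws an exception, even though res.schema (which gets the schema of the DataFrame) works as expected.
Versions: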
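A plausible reason for the difference (my reading, not verified against the Spark 2.2.2 source): `schema` can be answered from the analyzed query plan without generating any code, while `show()` actually executes the query, and the stack trace shows the failure inside Spark's `CodeGenerator` while Janino compiles `GeneratedClass`. Since the message says that class "was loaded through a different loader", a first diagnostic step is to compare loaders before and after the interpreter is created. The sketch below uses only the JDK; the object and method names (`LoaderDiagnostics`, `chain`, `report`, `sameClassVia`) are hypothetical helpers for this illustration.

```scala
object LoaderDiagnostics {
  // Returns the class loader chain starting at `loader`, following getParent
  // until the bootstrap loader (represented as null) is reached.
  def chain(loader: ClassLoader): List[ClassLoader] =
    if (loader == null) Nil else loader :: chain(loader.getParent)

  // Describes the loader that defined `clazz` and the current thread's
  // context class loader chain, so two snapshots can be compared.
  def report(label: String, clazz: Class[_]): String = {
    val ctx = Thread.currentThread().getContextClassLoader
    Seq(
      s"[$label] ${clazz.getName} defined by: ${clazz.getClassLoader}",
      s"[$label] context loader chain: " + chain(ctx).mkString(" -> ")
    ).mkString("\n")
  }

  // True if looking up reference's name through `loader` yields the very same
  // Class object; false means the loader sees a different copy (or none).
  def sameClassVia(loader: ClassLoader, reference: Class[_]): Boolean =
    try Class.forName(reference.getName, false, loader) eq reference
    catch { case _: ClassNotFoundException => false }
}
```

Hypothetical usage in `BSFTest`: print `LoaderDiagnostics.report("before IMain", classOf[org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass])` before `new IMain(settings)` and again just before `res.show()`, and check `sameClassVia(Thread.currentThread().getContextClassLoader, classOf[...GeneratedClass])` at both points. A change between the two snapshots would narrow down where the second loader comes from.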
scalaVersion := "2.11.11"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.2"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.2.2"
What can I do to fix this?
Solution
I got around the problem by following https://stackoverflow.com/a/6164608/811602
Now I create and load a class at runtime. Here is the working code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.IMain
import java.util.concurrent.atomic.AtomicInteger

object DynamicClassLoader {
  // Counter used to give every generated class a unique name
  val offset = new AtomicInteger()

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("TEST")
      .setMaster("local") // spark://127.0.0.1:7077
    val sparkSession = SparkSession.builder()
      .appName("TEST")
      .config(sparkConf)
      .enableHiveSupport()
      .getOrCreate()

    import sparkSession.sql
    sql("DROP DATABASE IF EXISTS test CASCADE")
    sql("CREATE DATABASE test")
    sql("CREATE TABLE test.box_width (id INT, width INT)")
    sql("INSERT INTO test.box_width VALUES (1,1), (2,2)")
    sql("CREATE TABLE test.box_length (id INT, length INT)")
    sql("INSERT INTO test.box_length VALUES (1,10), (2,20)")
    val widthDF = sql("select * from test.box_width")
    val lengthDF = sql("select * from test.box_length")

    val udfclassName: String = "AClass" + offset.getAndIncrement()
    // Method body taken from configuration
    val statements = """
      | val result = input1.join(input2, "id")
      | return result
      | """.stripMargin
    // Source of a class implementing SomeTrait, with the body spliced in
    val srcA = """
      | class """.stripMargin + udfclassName + """ extends SomeTrait {
      | import org.apache.spark.sql.DataFrame
      | def someMethod(input1:DataFrame, input2: DataFrame): DataFrame = {
      | """.stripMargin +
      statements +
      """}
      | }
      """.stripMargin

    val settings = new Settings
    settings.usejavacp.value = true
    settings.deprecation.value = true
    settings.embeddedDefaults(this.getClass().getClassLoader())
    val eval = new IMain(settings)
    // Compile the class, then load it through the interpreter's class loader
    eval.compileString(srcA)
    val classA = eval.classLoader.loadClass(udfclassName)
    eval.close()

    // Call the generated code through the trait, in the application's own code
    val objA = classA.newInstance().asInstanceOf[SomeTrait]
    val resultDF = objA.someMethod(lengthDF, widthDF)
    println(resultDF.schema)
    resultDF.show()
  }
}

trait SomeTrait {
  def someMethod(input1: DataFrame, input2: DataFrame): DataFrame
}
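The string concatenation that builds `srcA` above is easy to misread, because each literal is passed through `stripMargin` separately. As a readability sketch under the same assumptions (the generated class extends `SomeTrait` and its body sees `input1` and `input2`), the naming and source-building steps could be factored into a small helper; `UdfSource`, `nextClassName` and `classSource` are hypothetical names, not part of any library.

```scala
import java.util.concurrent.atomic.AtomicInteger

object UdfSource {
  // Counter so each generated class gets a fresh name, as the original does with `offset`.
  private val offset = new AtomicInteger()

  def nextClassName(prefix: String = "AClass"): String =
    prefix + offset.getAndIncrement()

  // Wraps a method body from configuration in a class implementing SomeTrait.
  // Note: stripMargin runs after interpolation, so a body line that starts
  // with optional whitespace followed by '|' would also be trimmed.
  def classSource(className: String, body: String): String =
    s"""class $className extends SomeTrait {
       |  import org.apache.spark.sql.DataFrame
       |  def someMethod(input1: DataFrame, input2: DataFrame): DataFrame = {
       |$body
       |  }
       |}
       |""".stripMargin
}
```

With this helper, the two steps in `main` would become `val udfclassName = UdfSource.nextClassName()` and `val srcA = UdfSource.classSource(udfclassName, "    input1.join(input2, \"id\")")`, after which `eval.compileString(srcA)` and `loadClass` proceed as before.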
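Although I am no longer blocked by this issue, having found an alternative way to achieve the same goal, the question remains open: the root cause of the exception has yet to be identified and fixed.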
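One hypothesis worth testing for the root cause (unverified): Spark 2.x's code generator appears to use the current thread's context class loader (via `Utils.getContextOrSparkClassLoader`) as the parent loader of the Janino `ClassBodyEvaluator`. If creating or using `IMain` leaves a different loader installed as the context loader, Janino could resolve `GeneratedClass` to a second copy and fail with exactly this message. A cheap experiment is to run `res.show()` in the original `BSFTest` with the application's own loader installed as the context loader. The helper below does that and restores the previous loader afterwards; `ContextLoader.withContextClassLoader` is a name made up for this sketch, not a Spark or Scala API.

```scala
object ContextLoader {
  // Runs `body` with `loader` installed as the current thread's context class
  // loader, then restores the previous loader, even if `body` throws.
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val previous = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body
    finally thread.setContextClassLoader(previous)
  }
}
```

Hypothetical usage: `ContextLoader.withContextClassLoader(getClass.getClassLoader) { res.show() }`. If the exception disappears, that points at the context class loader; if it does not, the cause lies elsewhere.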