apache-spark - 如何增量加载,适应新数据,使用 spark 保存管道模型?
问题描述
任何增量训练和构建模型的指针,并获得对单个元素的预测。
尝试运行 Web 应用程序会将数据写入共享路径中的 csv,而 ml 应用程序将读取数据并加载模型,尝试拟合数据并保存模型,转换测试数据。(这应该在循环中发生)
但是当第二次加载保存的模型时,面临以下异常,(我正在使用 minmax scaler 来规范化数据)
线程“main”java.lang.IllegalArgumentException 中的异常:输出列 features_intermediate 已存在。
任何指点将不胜感激,谢谢
object RunAppPooling {
def main(args: Array[String]): Unit = { // start the spark session
val conf = new SparkConf().setMaster("local[2]").set("deploy-mode", "client").set("spark.driver.bindAddress", "127.0.0.1")
.set("spark.broadcast.compress", "false")
.setAppName("local-spark")
val spark = SparkSession
.builder()
.config(conf)
.getOrCreate()
val filePath = "src/main/resources/train.csv"
val modelPath = "file:///home/vagrant/custom.model"
val schema = StructType(
Array(
StructField("IDLE_COUNT", IntegerType),
StructField("TIMEOUTS", IntegerType),
StructField("ACTIVE_COUNT", IntegerType),
StructField("FACTOR_LOAD", DoubleType)))
while(true){
// read the raw data
val df_raw = spark
.read
.option("header", "true")
.schema(schema)
.csv(filePath)
df_raw.show()
println(df_raw.count())
// fill all na values with 0
val df = df_raw.na.fill(0)
df.printSchema()
// create the feature vector
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT" ))
.setOutputCol("features_intermediate")
var lr1: PipelineModel = null
try {
lr1 = PipelineModel.load(modelPath)
} catch {
case ie: InvalidInputException => println(ie.getMessage)
}
import org.apache.spark.ml.feature.StandardScaler
val scaler = new StandardScaler().setWithMean(true).setWithStd(true).setInputCol("features_intermediate").setOutputCol("features")
var pipeline: Pipeline = null
if (lr1 == null) {
val lr =
new LinearRegression()
.setMaxIter(100)
.setRegParam(0.1)
.setElasticNetParam(0.8)
.setLabelCol("FACTOR_LOAD") // setting label column
// create the pipeline with the steps
pipeline = new Pipeline().setStages(Array( vectorAssembler, scaler, lr))
} else {
pipeline = new Pipeline().setStages(Array(vectorAssembler, scaler, lr1))
}
// create the model following the pipeline steps
val cvModel = pipeline.fit(df)
// save the model
cvModel.write.overwrite.save(modelPath)
var testschema = StructType(
Array(
StructField("PACKAGE_KEY", StringType),
StructField("IDLE_COUNT", IntegerType),
StructField("TIMEOUTS", IntegerType),
StructField("ACTIVE_COUNT", IntegerType)
))
val df_raw1 = spark
.read
.option("header", "true")
.schema(testschema)
.csv("src/main/resources/test_pooling.csv")
// fill all na values with 0
val df1 = df_raw1.na.fill(0)
val extracted = cvModel.transform(df1) //.toDF("prediction")
import org.apache.spark.sql.functions._
val test = extracted.select(mean(df("FACTOR_LOAD"))).collect()
println(test.apply(0))
}
}
}
解决方案
我想出了一种方法,至少可以摆脱异常,不确定它是否正确。加载模型后创建管道时,将阶段设置为模型,因为模型已经定义了各自的架构。不确定这是否会使新数据正常化。
pipeline = new Pipeline().setStages(Array( lr1))
推荐阅读
- javascript - ForEach 不工作,但 For 工作正常
- selenium - 为什么我的自动化测试用例浏览器会自动关闭?
- symfony - 如何将变量共享给树枝中的所有视图(包括行为)?
- java - 什么导致网络错误IOException:连接超时:连接?
- angular - 无法为 Ionic4 和 Angular fire 添加 firebase 支持
- powershell - 验证是否制作时无法正常工作?
- java - ClassNotFoundException:用于 SpringBoot 2 功能区启动器的 com.netflix.config.CachedDynamicIntProperty
- c - 我可以强制动态库链接到特定的动态库依赖项吗?
- javascript - 创建中间件全局与本地变量
- html - 将值从 ngFor 内部的元素传递给 Service