java - Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of date
Problem Description
I am trying to insert some Java object data into MySQL using Spark SQL. Below is my simplified code:
import java.io.Serializable;
import java.sql.Date;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Model implements Serializable {

    private String state;
    private String title;
    private String exportCntry;
    private String filepath;
    private String unit;
    private String frequency;
    private String Seasonal_adjust;
    private Date Last_updated;

    private static StructType structType = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("state", DataTypes.StringType, false),
            DataTypes.createStructField("title", DataTypes.StringType, false),
            DataTypes.createStructField("exportCntry", DataTypes.StringType, false),
            DataTypes.createStructField("filepath", DataTypes.StringType, false),
            DataTypes.createStructField("unit", DataTypes.StringType, false),
            DataTypes.createStructField("frequency", DataTypes.StringType, false),
            DataTypes.createStructField("Seasonal_adjust", DataTypes.StringType, false),
            DataTypes.createStructField("Last_updated", DataTypes.DateType, false)
    });

    public static StructType getStructType() {
        return structType;
    }

    public static void setStructType(StructType structType) {
        Model.structType = structType;
    }

    public Object[] getAllValues() {
        return new Object[] { state, title, exportCntry, filepath, unit, frequency, unit,
                frequency, Seasonal_adjust, Last_updated
        };
    }
}
The following is the Spark SQL code that inserts the Model objects above into MySQL.
import java.sql.Date;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Queue;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class Processor {

    private static List<String> testData = Arrays.asList(
            "Alabama,ValueofExports,Israel,A/L/I/ALISRA052SCEN.csv,$,A,NSA,2018-06-26",
            "Alaska,ValueofExports,Israel,A/K/AKISRA052SCEN.csv,$,A,NSA,2018-06-26",
            "Arizona,ValueofExports,Israel,A/Z/I/AZISRA052SCEN.csv,$,A,NSA,2018-06-26",
            "Arkansas,ValueofExports,Israel,A/R/I/ARISRA052SCEN.csv,$,A,NSA,2018-06-26",
            "California,ValueofExports,Israel,C/A/I/CAISRA052SCEN.csv,$,A,NSA,2018-06-26",
            "Colorado,ValueofExports,Israel,C/O/I/COISRA052SCEN.csv,$,A,NSA,2018-06-26",
            "Connecticut,ValueofExports,Israel,C/T/I/CTISRA052SCEN.csv,$,A,NSA,2018-06-26",
            "Delaware,ValueofExports,Israel,D/E/I/DEISRA052SCEN.csv,$,A,NSA,2018-06-26"
    );

    public static void main(String[] args) throws Exception {
        Properties jdbcProps = new Properties();
        jdbcProps.put("user", "root");
        jdbcProps.put("password", "password");

        SparkSession spark = SparkSession.builder().master("local[*]").appName("Spark Kafka Test").getOrCreate();
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.milliseconds(1000));

        JavaRDD<String> rddTest = jsc.parallelize(testData);
        Queue<JavaRDD<String>> queue = new LinkedList<JavaRDD<String>>();
        queue.add(rddTest);

        JavaDStream<String> jds = jssc.queueStream(queue);

        JavaDStream<Model> jdm = jds.map(str -> {
            String[] parameters = new String[str.split(",").length];
            parameters = str.split(",");
            Date date = Date.valueOf(parameters[7]);
            Model data = new Model(parameters[0], parameters[1], parameters[2], parameters[3],
                    parameters[4], parameters[5], parameters[6], date);
            return data;
        });

        jdm.map(e -> {
            Row row = RowFactory.create(e.getAllValues());
            return row;
        }).foreachRDD(rdd -> {
            Dataset<Row> df = spark.createDataFrame(rdd, Model.getStructType());
            df.printSchema();
            df.write()
                .mode(SaveMode.Append)
                .jdbc("jdbc:mysql://localhost:3306/test?characterEncoding=utf8&serverTimezone=Asia/Seoul",
                        "test", jdbcProps);
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
However, the following error is thrown from the Spark SQL insert step:
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of date
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_7$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
The exception seems to be related to the java.sql.Date object API, and the Spark SQL Dataframe.write call does not work. Am I missing a step?
Solution
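The stack trace fails inside ExpressionEncoder.toRow, i.e. while the Row is being converted to the declared schema, before the JDBC write ever runs, so java.sql.Date itself is probably not the problem. A likely cause visible in the posted code: Model.getStructType() declares 8 fields, but Model.getAllValues() returns 10 values because unit and frequency are listed twice, so the position that the schema maps to Last_updated (DateType) is actually occupied by the String frequency, which matches the reported "java.lang.String is not a valid external type for schema of date". A minimal sketch of the corrected method, assuming the values should simply mirror the schema's field order:

public Object[] getAllValues() {
    // One value per StructField, in schema order; the last element is the java.sql.Date.
    return new Object[] { state, title, exportCntry, filepath, unit, frequency,
            Seasonal_adjust, Last_updated
    };
}

With the number of values matching the 8 StructFields, RowFactory.create(e.getAllValues()) should produce rows whose last element is the java.sql.Date instance, which is a valid external type for DateType, and df.write().jdbc(...) should then proceed. If the problem persists, printing one sample Row next to Model.getStructType() is a quick way to verify that the positions still line up.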