apache-spark - Spark FlatMapGroupsWithStateFunction throws cannot resolve 'named_struct()' due to data type mismatch in 'SerializeFromObject'
Problem description
I am using FlatMapGroupsWithStateFunction in my Spark Streaming application.
FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate> idstateUpdateFunction =
new FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate>() {.....}
The SessionUpdate class is as follows:
public static class SessionUpdate implements Serializable {
    private static final long serialVersionUID = -3858977319192658483L;
    private String instanceId;
    private ArrayList<GenericRowWithSchema> milestones = new ArrayList<GenericRowWithSchema>();
    private Timestamp processingTimeoutTimestamp;

    public SessionUpdate() {
        super();
    }

    public SessionUpdate(String instanceId, ArrayList<GenericRowWithSchema> milestones, Timestamp processingTimeoutTimestamp) {
        super();
        this.instanceId = instanceId;
        this.milestones = milestones;
        this.processingTimeoutTimestamp = processingTimeoutTimestamp;
    }

    public String getInstanceId() {
        return instanceId;
    }

    public void setInstanceId(String instanceId) {
        this.instanceId = instanceId;
    }

    public ArrayList<GenericRowWithSchema> getMilestones() {
        return milestones;
    }

    public void setMilestones(ArrayList<GenericRowWithSchema> milestones) {
        this.milestones = milestones;
    }

    public Timestamp getProcessingTimeoutTimestamp() {
        return processingTimeoutTimestamp;
    }

    public void setProcessingTimeoutTimestamp(Timestamp processingTimeoutTimestamp) {
        this.processingTimeoutTimestamp = processingTimeoutTimestamp;
    }
}
After adding the following field to the class, I get the exception shown below:
private ArrayList<GenericRowWithSchema> milestones = new ArrayList<GenericRowWithSchema>();
Exception:
ERROR cannot resolve 'named_struct()' due to data type mismatch: input to function named_struct requires at least one argument;;
'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getInstanceId, true, false) AS instanceId#62, mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull2, ObjectType(class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), if (isnull(lambdavariable(MapObjects_loopValue2, MapObjects_loopIsNull2, ObjectType(class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), true))) null else named_struct(), assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getMilestones, None) AS milestones#63, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getProcessingTimeoutTimestamp, true, false) AS processingTimeoutTimestamp#64]
+- FlatMapGroupsWithState , cast(value#54 as string).toString, createexternalrow(EventTime#23.toString, InstanceID#24.toString, Model#25.toString, Milestone#26.toString, Region#27.toString, SalesOrganization#28.toString, ProductName#29.toString, ReasonForQuoteReject#30.toString, ReasonforRejectionBy#31.toString, OpportunityAmount#32.toJavaBigDecimal, Discount#33.toJavaBigDecimal, TotalQuoteAmount#34.toJavaBigDecimal, NetQuoteAmount#35.toJavaBigDecimal, ApprovedDiscount#36.toJavaBigDecimal, TotalOrderAmount#37.toJavaBigDecimal, StructField(EventTime,StringType,true), StructField(InstanceID,StringType,true), StructField(Model,StringType,true), StructField(Milestone,StringType,true), StructField(Region,StringType,true), StructField(SalesOrganization,StringType,true), StructField(ProductName,StringType,true), StructField(ReasonForQuoteReject,StringType,true), StructField(ReasonforRejectionBy,StringType,true), ... 6 more fields), [value#54], [EventTime#23, InstanceID#24, Model#25, Milestone#26, Region#27, SalesOrganization#28, ProductName#29, ReasonForQuoteReject#30, ReasonforRejectionBy#31, OpportunityAmount#32, Discount#33, TotalQuoteAmount#34, NetQuoteAmount#35, ApprovedDiscount#36, TotalOrderAmount#37], obj#61: oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, class[instanceId[0]: string, milestones[0]: array<struct<>>, processingTimeoutTimestamp[0]: timestamp], Append, false, ProcessingTimeTimeout
Any clues as to why I am getting this exception?
Thanks.
Solution
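One possible explanation, offered as an assumption rather than a confirmed diagnosis: if the output encoder for SessionUpdate is built with Encoders.bean, Spark infers each field's schema from its JavaBean getters. GenericRowWithSchema exposes positional accessors such as get(int) rather than zero-argument getters, so the element type of milestones would be inferred as a struct with no fields, which would match the empty named_struct() in the plan above. The sketch below uses only java.beans.Introspector to illustrate the idea. RowLike and MilestoneBean are hypothetical stand-ins (not Spark classes), and the property filter only approximates what Spark does internally.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical stand-in for a Row-like class: positional accessors only,
    // no zero-argument getXxx() methods.
    public static class RowLike {
        private final Object[] values;

        public RowLike(Object... values) {
            this.values = values;
        }

        public Object get(int i) {
            return values[i];
        }

        public int length() {
            return values.length;
        }
    }

    // Hypothetical JavaBean that could replace the Row-like element type.
    public static class MilestoneBean implements java.io.Serializable {
        private String name;
        private String eventTime;

        public MilestoneBean() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getEventTime() {
            return eventTime;
        }

        public void setEventTime(String eventTime) {
            this.eventTime = eventTime;
        }
    }

    // Lists the readable JavaBean properties of a class, skipping the
    // built-in "class" property, sorted by name for a stable result.
    public static List<String> beanProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            BeanInfo info = Introspector.getBeanInfo(type);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if ("class".equals(pd.getName())) {
                    continue;
                }
                if (pd.getReadMethod() != null) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException(e);
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        // A class with no readable bean properties would map to an empty struct.
        System.out.println("RowLike: " + beanProperties(RowLike.class));
        System.out.println("MilestoneBean: " + beanProperties(MilestoneBean.class));
    }
}
```

If this is indeed the cause, two options worth trying are replacing GenericRowWithSchema in the list with a bean along the lines of MilestoneBean, or encoding SessionUpdate with Encoders.kryo(SessionUpdate.class), which stores the whole object as a single binary column instead of inferring a schema.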