java - Converting a list of custom Java objects to a Dataset causes a StackOverflowError in Spark
Problem description
I am trying to convert a list of custom Java objects into a Dataset of a custom type.
JavaInputDStream<ConsumerRecord<String, String>> telemetryStream =
        KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams)
        );

JavaDStream<String> telemetryValueStream =
        telemetryStream.map(consumerRecord -> consumerRecord.value());

telemetryValueStream.foreachRDD(eachPartition -> {
    if (!eachPartition.isEmpty()) {
        List<Tuple> tupleList = new ArrayList<>();
        eachPartition.foreach(record -> {
            ObjectMapper om = new ObjectMapper();
            JsonNode dataNode = om.readTree(record);
            Tuple tupled = new Tuple();
            tupled.setKey(dataNode.at("/Telemetry/encoding_path").asText());
            tupled.setValue(dataNode);
            tupleList.add(tupled);
        });
        Encoder<Tuple> encoder = Encoders.bean(Tuple.class);
        Dataset<Tuple> tupledDS = JavaSparkSessionSingleton
                .getInstance(eachPartition.context().getConf())
                .createDataset(tupleList, encoder);
        tupledDS.printSchema();
    }
});
This throws a StackOverflowError:
Exception in thread "streaming-job-executor-0" java.lang.StackOverflowError
at sun.reflect.generics.reflectiveObjects.TypeVariableImpl.equals(TypeVariableImpl.java:189)
at org.spark_project.guava.collect.SingletonImmutableBiMap.get(SingletonImmutableBiMap.java:53)
at org.spark_project.guava.reflect.TypeResolver.resolveTypeVariable(TypeResolver.java:207)
at org.spark_project.guava.reflect.TypeResolver.resolveTypeVariable(TypeResolver.java:197)
at org.spark_project.guava.reflect.TypeResolver.resolveType(TypeResolver.java:157)
at org.spark_project.guava.reflect.TypeResolver.resolveParameterizedType(TypeResolver.java:229)
at org.spark_project.guava.reflect.TypeResolver.resolveType(TypeResolver.java:159)
at org.spark_project.guava.reflect.TypeToken.resolveType(TypeToken.java:268)
at org.spark_project.guava.reflect.TypeToken.resolveSupertype(TypeToken.java:279)
at org.spark_project.guava.reflect.TypeToken.getSupertype(TypeToken.java:401)
at org.apache.spark.sql.catalyst.JavaTypeInference$.elementType(JavaTypeInference.scala:157)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:112)
Here is the bean class for reference:
public class Tuple implements Serializable {
    private String key;
    private JsonNode value;

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public JsonNode getValue() {
        return value;
    }

    public void setValue(JsonNode value) {
        this.value = value;
    }
}
Solution
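The repeated `inferDataType` frames in the trace point at the cause: `Encoders.bean` builds a schema by reflectively walking the bean's getters, and Jackson's `JsonNode` is not a flat bean — its properties lead back to `JsonNode`-typed values, so the inference recurses until the stack overflows. Two common workarounds are to keep the raw JSON as a plain `String` in the bean (re-parsing it only where needed), or to use `Encoders.kryo(Tuple.class)`, which serializes the whole object into a single binary column and bypasses bean inference entirely. Below is a minimal sketch of the first approach; this String-valued `Tuple` is a proposed rework, not the original class:

```java
import java.io.Serializable;

// Reworked bean: the JSON payload is stored as a String, which
// Encoders.bean(Tuple.class) maps to a simple StringType column
// instead of recursing into JsonNode's self-referential properties.
public class Tuple implements Serializable {
    private String key;
    private String value; // raw JSON text instead of JsonNode

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}
```

In the streaming code, the assignment becomes `tupled.setValue(record)` (or `om.writeValueAsString(dataNode)` if the node has been transformed); consumers of the Dataset re-parse the column with Jackson where a tree is needed. Note also that on a real cluster `rdd.foreach` runs on the executors, so collecting into a driver-side `ArrayList` as in the question will leave the list empty — `rdd.collect()` on the driver, or mapping the RDD and converting it directly, is the usual alternative.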