java - Kafka Schema 注册表和 Spark 连续流集成,异常:类 org.apache.avro.Schema 的循环引用
问题描述
我正在尝试使用 Kafka 作为读取源在使用 Schema 注册表的项目上应用 Spark 连续流。并且我的 Kafka 消息扩展SpecificAvroRecord
[它有一个 Schema 字段 (org.apache.avro.Schema)]。
据我了解,spark 不支持自引用。那么,将 Kafka Schema 注册表 [特定记录] 与 Spark 连续流集成的最佳方法是什么?
我的测试代码:
SparkSession spark = SparkSession.builder().appName("testpro")
.master("local[2]").getOrCreate();
Dataset<Row> df = spark.readStream()
.format("kafka").option("kafka.bootstrap.servers", "192.168.68.1:9092,192.168.204.1:9092")
.option("subscribe", "testTopic")
.option("startingOffsets", "latest").load();
Dataset<Data> messages = df.selectExpr("CAST(value AS STRING)").map(value -> new Data(value.toString()), Encoders.bean(Data.class));
消息类
public class Data extends SpecificAvroRecord {
private static final long serialVersionUID = 1L;
private String firstName;
private String lastName;
public String name;
public Data() {}
........setters and getters and string constructore
具体Avro记录:
public abstract class SpecificAvroRecord extends org.apache.avro.specific.SpecificRecordBase {
protected Schema schema;
}
例外 :
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:126)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:86)
at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
at org.apache.spark.sql.Encoders.bean(Encoders.scala)
at com.eventumsolutions.nms.spark.services.analyzer.app.contStr.main(contStr.java:50)
```
解决方案
推荐阅读
- java - 如何确定各种 Java 8 堆内存名称?
- jquery - 在jQuery中获取TD单元格的值
- php - 在具有特定类的 div 中获取 preg_match_all 锚文本
- python - 在 Pandas 中创建淘汰赛比赛
- python-3.x - 如何捕捉错误(Smartsheet API Python SDK)
- python-3.x - 在使用多个输入层训练的 keras 模型中,如何在预测时忽略某些输入层?
- java - Android studio gradle .pom 文件下载错误
- c# - C#删除错误文件
- reactjs - 如何更改 React Recharts 库 X 轴/Y 轴刻度的字体系列?
- typescript - 材料日期选择器总是导致时刻错误