Integrating the Kafka Schema Registry with Spark continuous streaming, exception: circular reference of class org.apache.avro.Schema

Problem description

I am trying to use Spark continuous streaming, with Kafka as the read source, on a project that uses the Schema Registry. My Kafka messages extend SpecificAvroRecord [which has a Schema field (org.apache.avro.Schema)].

As far as I understand, Spark does not support self-referencing bean classes. So what is the best way to integrate the Kafka Schema Registry [with specific records] into Spark continuous streaming?

My test code:

    SparkSession spark = SparkSession.builder().appName("testpro")
        .master("local[2]").getOrCreate();

    Dataset<Row> df = spark.readStream()
        .format("kafka").option("kafka.bootstrap.servers", "192.168.68.1:9092,192.168.204.1:9092")
        .option("subscribe", "testTopic")
        .option("startingOffsets", "latest").load();

    Dataset<Data> messages = df.selectExpr("CAST(value AS STRING)")
        .map((MapFunction<Row, Data>) value -> new Data(value.getString(0)),
            Encoders.bean(Data.class));

The message class:

public class Data extends SpecificAvroRecord {

  private static final long serialVersionUID = 1L;
  private String firstName;
  private String lastName;
  public String name;

  public Data() {}
  // ... setters, getters, and a String constructor
}

SpecificAvroRecord:

public abstract class SpecificAvroRecord extends org.apache.avro.specific.SpecificRecordBase {
  protected Schema schema;
}
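
This inherited schema field is the root of the error: Encoders.bean walks Data reflectively, descends into org.apache.avro.Schema, and finds accessors that return Schema again, so JavaTypeInference aborts with the circular-reference exception. A minimal illustration of that self-reference (just the shape of the cycle, not a fix; which accessor closes the loop first is an implementation detail):

    import org.apache.avro.Schema;

    public class CycleDemo {
      public static void main(String[] args) {
        // Accessors such as getElementType() hand back another Schema,
        // which is exactly the kind of cycle bean inference rejects.
        Schema array = Schema.createArray(Schema.create(Schema.Type.STRING));
        Schema element = array.getElementType(); // a Schema nested inside a Schema
        System.out.println(element);             // prints: "string"
      }
    }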

Exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:126)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:136)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$1.apply(JavaTypeInference.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:134)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:55)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.javaBean(ExpressionEncoder.scala:86)
    at org.apache.spark.sql.Encoders$.bean(Encoders.scala:142)
    at org.apache.spark.sql.Encoders.bean(Encoders.scala)
    at com.eventumsolutions.nms.spark.services.analyzer.app.contStr.main(contStr.java:50)

Tags: java, apache-spark, apache-kafka, avro, spark-structured-streaming

Solution
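
The failing call is Encoders.bean(Data.class): Catalyst tries to derive a SQL schema for the bean, reaches the inherited org.apache.avro.Schema field, and gives up. Note also that CAST(value AS STRING) would corrupt the payload anyway: Confluent's wire format is binary (a magic byte and schema id precede the Avro body), so the value column should stay as bytes. One way out is to keep Avro types away from the encoder entirely: deserialize the bytes yourself with the Schema-Registry-aware deserializer, then copy the fields into a plain bean that Encoders.bean can infer. A sketch under those assumptions (DataBean, the decode helper, and the registry URL are all hypothetical; only the firstName/lastName/name fields come from the question):

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    public class AvroToBean {

      // Plain bean: no org.apache.avro.Schema field, so Encoders.bean can infer it.
      public static class DataBean implements Serializable {
        private String firstName;
        private String lastName;
        private String name;
        public String getFirstName() { return firstName; }
        public void setFirstName(String v) { firstName = v; }
        public String getLastName() { return lastName; }
        public void setLastName(String v) { lastName = v; }
        public String getName() { return name; }
        public void setName(String v) { name = v; }
      }

      // Built lazily per JVM: KafkaAvroDeserializer is not serializable,
      // so it must be created on the executor rather than shipped from the driver.
      private static KafkaAvroDeserializer deserializer;

      private static synchronized KafkaAvroDeserializer deserializer() {
        if (deserializer == null) {
          deserializer = new KafkaAvroDeserializer();
          Map<String, Object> conf = new HashMap<>();
          conf.put("schema.registry.url", "http://localhost:8081"); // assumption
          conf.put("specific.avro.reader", true); // decode into Data, not GenericRecord
          deserializer.configure(conf, false);    // false = value (not key) deserializer
        }
        return deserializer;
      }

      public static Dataset<DataBean> decode(Dataset<Row> df) {
        return df
            .select("value").as(Encoders.BINARY())  // keep the raw Avro bytes intact
            .map((MapFunction<byte[], DataBean>) bytes -> {
              Data record = (Data) deserializer().deserialize("testTopic", bytes);
              DataBean bean = new DataBean();
              bean.setFirstName(record.getFirstName());
              bean.setLastName(record.getLastName());
              bean.setName(record.getName());
              return bean;
            }, Encoders.bean(DataBean.class));
      }
    }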


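If the record's fields never need to be addressed as SQL columns, a shorter escape hatch is a Kryo encoder, which stores the whole object as one opaque binary column and skips bean inference altogether. A sketch reusing the hypothetical deserializer() helper above (and assuming Kryo can round-trip the Avro object; a custom Kryo serializer may be needed for the embedded Schema):

    // Kryo never runs JavaTypeInference, so the self-referential Schema
    // field is not inspected; the cost is an opaque single-column Dataset.
    Dataset<Data> messages = df
        .select("value").as(Encoders.BINARY())
        .map((MapFunction<byte[], Data>) bytes ->
                (Data) deserializer().deserialize("testTopic", bytes),
            Encoders.kryo(Data.class));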