scala - Unable to find encoder for type stored in a Dataset when streaming MongoDB data via Kafka
Problem description
I want to tail the MongoDB oplog and stream it through Kafka. For this I found the Debezium Kafka CDC connector, which tails the Mongo oplog using its built-in serialization.
The Schema Registry uses the following converters for serialization:
'key.converter=io.confluent.connect.avro.AvroConverter' and
'value.converter=io.confluent.connect.avro.AvroConverter'
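For context, a minimal sketch of what the Debezium MongoDB connector configuration might look like with these converters. The connector name, the mongodb.* values, and the registry URL are assumptions, inferred from the topic name productCollection.inventory.Product used later:

name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# assumed replica set and host; adjust to your deployment
mongodb.hosts=rs0/127.0.0.1:27017
# logical server name; becomes the topic prefix (productCollection.inventory.Product)
mongodb.name=productCollection
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://127.0.0.1:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://127.0.0.1:8081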
Here are the library dependencies I use in the project:
libraryDependencies += "io.confluent" % "kafka-avro-serializer" % "3.1.2"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.10.2.0
Below is the streaming code that deserializes the Avro data:
import org.apache.spark.sql.{Dataset, SparkSession}
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

import scala.collection.JavaConverters._

object KafkaStream {

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder
      .master("local")
      .appName("kafka")
      .getOrCreate()

    sparkSession.sparkContext.setLogLevel("ERROR")

    import sparkSession.implicits._

    case class DeserializedFromKafkaRecord(key: String, value: String)

    val schemaRegistryURL = "http://127.0.0.1:8081"

    val topicName = "productCollection.inventory.Product"
    val subjectValueName = topicName + "-value"

    // create RestService object
    val restService = new RestService(schemaRegistryURL)

    // .getLatestVersion returns an io.confluent.kafka.schemaregistry.client.rest.entities.Schema object
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

    // use Avro parsing classes to get the Avro schema
    val parser = new Schema.Parser
    val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

    // the key schema is typically just a string, but you can do the same for the key as for the value
    val keySchemaString = "\"string\""
    val keySchema = parser.parse(keySchemaString)

    // create a map with the Schema Registry URL;
    // this is the only required configuration for Confluent's KafkaAvroDeserializer
    val props = Map("schema.registry.url" -> schemaRegistryURL)

    // declare the SerDe vars before using them in the structured streaming map,
    // to avoid a non-serializable class exception
    var keyDeserializer: KafkaAvroDeserializer = null
    var valueDeserializer: KafkaAvroDeserializer = null

    // create a structured streaming DataFrame that reads from the topic
    val rawTopicMessageDF = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 20) // remove for prod
      .load()

    rawTopicMessageDF.printSchema()

    // instantiate the SerDe classes if not already done, then deserialize
    val deserializedTopicMessageDS = rawTopicMessageDF.map {
      row =>
        if (keyDeserializer == null) {
          keyDeserializer = new KafkaAvroDeserializer
          keyDeserializer.configure(props.asJava, true) // isKey = true
        }
        if (valueDeserializer == null) {
          valueDeserializer = new KafkaAvroDeserializer
          valueDeserializer.configure(props.asJava, false) // isKey = false
        }

        // pass the Avro schema; the topic name is actually unused in the source code,
        // it is just required by the method signature
        val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString
        val deserializedValueJsonString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString

        DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueJsonString)
    }

    val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()

    deserializedDSOutputStream.awaitTermination()
  }
}
The Kafka consumer itself works fine and I can see the data tailed from the oplog, but when I run the code above I get the following errors:
Error:(70, 59) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val deserializedTopicMessageDS = rawTopicMessageDF.map{
and
Error:(70, 59) not enough arguments for method map: (implicit evidence$7: org.apache.spark.sql.Encoder[DeserializedFromKafkaRecord])org.apache.spark.sql.Dataset[DeserializedFromKafkaRecord].
Unspecified value parameter evidence$7.
val deserializedTopicMessageDS = rawTopicMessageDF.map{
Please suggest what I am missing here.
Thanks in advance.
Solution
Simply declare your case class DeserializedFromKafkaRecord outside of the main method.
I suspect that when the case class is defined inside main, Spark's implicit-encoder magic does not work properly: the case class is local to the method and does not exist before main executes, so the compiler cannot supply the implicit Encoder[DeserializedFromKafkaRecord] that map requires (which is exactly what the "not enough arguments for method map" error says).
The problem can be reproduced with a simpler example (without Kafka):
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SimpleTest {

  // declare CaseClass outside of the main method
  case class CaseClass(value: Int)

  def main(args: Array[String]): Unit = {

    // when the case class is declared here instead
    // of outside main, the program does not compile
    // case class CaseClass(value: Int)

    val sparkSession = SparkSession
      .builder
      .master("local")
      .appName("simpletest")
      .getOrCreate()

    import sparkSession.implicits._

    val df: DataFrame = sparkSession.sparkContext.parallelize(1 to 10).toDF()

    val ds: Dataset[CaseClass] = df.map { row =>
      CaseClass(row.getInt(0))
    }

    ds.show()
  }
}
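Applied to the original program, the fix is just moving the case class declaration up one level. A minimal sketch of the relevant part, with the body of main otherwise unchanged:

object KafkaStream {

  // declared at object level, so spark.implicits._ can derive an Encoder for it
  case class DeserializedFromKafkaRecord(key: String, value: String)

  def main(args: Array[String]): Unit = {
    // ... rest of the streaming code unchanged ...
  }
}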