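scala - java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
Problem description
I am running a Kafka consumer program that reads Avro-formatted data from a topic. After polling the records, I iterate over them and call `record.value()` on each one. I want to convert the value to a String, but that fails.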
```
def getProp(): Properties = {
  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, serializer)
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid)
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffset)
  //props.put("specific.avro.reader", specificAvroReader)
  props.put("schema.registry.url", schemaRegestry)
  props.put("consumer-timeout-ms", "30000")
  props
}

def consume(props: Properties, spark: SparkSession) = {
  val conSumer = new KafkaConsumer[String, String](props)
  conSumer.subscribe(util.Collections.singletonList(topic))
  while (true) {
    val records: ConsumerRecords[String, String] = conSumer.poll(100)
    for (record <- records.asScala) {
      val m: String = record.value()
```
Error:
20/02/01 05:04:55 INFO internals.ConsumerCoordinator: Setting newly assigned partitions [xx1, xx2, xx3, xx4] for group xxxxxx
Exception in thread "main" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.String
at com.hbc.IntellicheckConsumer$$anonfun$consume$1.apply(TestScala.scala:52)
at com.hbc.IntellicheckConsumer$$anonfun$consume$1.apply(TestScala.scala:49)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at com.hbc.IntellicheckConsumer.consume(TestScala.scala:49)
at com.hbc.TestScala$.main(TestScala.scala:93)
at com.hbc.TestScala.main(TestScala.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
20/02/01 05:04:55 INFO spark.SparkContext: Invoking stop() from shutdown hook
20/02/01 05:04:55 INFO server.AbstractConnector: Stopped Spark@33aecef7{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
20/02/01 05:04:55 INFO ui.SparkUI: Stopped Spark web UI at http://172.1
Solution
You told your consumer that you want strings:
new KafkaConsumer[String, String](props)
ConsumerRecords[String,String]
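The type parameters on `KafkaConsumer` are erased at runtime, so declaring `String` does not convert anything: the configured value deserializer decides what `record.value()` actually returns, and the cast to `String` only happens where the value is used. That would explain why the stack trace points at the line inside the loop rather than at the constructor. A minimal sketch of the same effect in plain Scala, where `Box` and `FakeRecord` are hypothetical stand-ins and not Kafka or Avro classes:

```scala
// Hypothetical stand-in for a consumer record: the type parameter V is erased at runtime.
final class Box[V](payload: Any) {
  // Unchecked cast: it never fails here, because V is erased to Object.
  def value: V = payload.asInstanceOf[V]
}

// Hypothetical stand-in for org.apache.avro.generic.GenericData.Record.
final case class FakeRecord(fields: Map[String, Any])

object ErasureDemo {
  def main(args: Array[String]): Unit = {
    // Declaring Box[String] compiles and constructs fine, whatever the payload is.
    val box = new Box[String](FakeRecord(Map("id" -> 1)))
    try {
      // The compiler inserts the cast to String at this use site, and it fails here.
      val s: String = box.value
      println(s)
    } catch {
      case e: ClassCastException =>
        println(s"Failed at the use site: ${e.getMessage}")
    }
  }
}
```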
Instead, you probably want:
new KafkaConsumer[String, GenericRecord](props)
ConsumerRecords[String,GenericRecord]
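Put together, the loop might look like the sketch below. It assumes the value deserializer is Confluent's `KafkaAvroDeserializer` with `specific.avro.reader` left unset, so values arrive as `GenericRecord`, and it keeps the `poll(100)` call from the question. `GenericRecordConsumer`, `render` and the `topic` parameter are illustrative names only. `GenericData.Record.toString` gives a JSON-like rendering of the record, which is one reasonable reading of "convert the value to a String".

```scala
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._

import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

object GenericRecordConsumer {
  // Render one Avro record as text; Record.toString produces JSON-like output.
  def render(value: GenericRecord): String = value.toString

  def consume(props: Properties, topic: String): Unit = {
    // The value type now matches what the Avro deserializer returns.
    val consumer = new KafkaConsumer[String, GenericRecord](props)
    consumer.subscribe(Collections.singletonList(topic))
    try {
      while (true) {
        // poll(long) as in the question; newer clients prefer poll(java.time.Duration).
        val records: ConsumerRecords[String, GenericRecord] = consumer.poll(100)
        for (record <- records.asScala) {
          val m: String = render(record.value())
          println(m)
        }
      }
    } finally {
      consumer.close()
    }
  }
}
```

Individual fields can be read with `record.value().get("fieldName")`, which returns an `Object` whose runtime type depends on the schema.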
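> Is there any way to read these Avro records into a Spark DataFrame for further processing?

Well, you would have to rewrite all of your code to actually use Spark Structured Streaming. You don't need `spark-submit` merely to run Scala code; the consumer above is a plain Kafka client, not a Spark job.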
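As a rough idea of what such a rewrite could look like, here is a sketch under several assumptions that are not in the question: Spark 3.x with the `spark-sql-kafka-0-10` and `spark-avro` packages on the classpath, messages written with the Confluent serializer (one magic byte plus a 4-byte schema id before the Avro payload), and a single writer schema supplied by hand. The broker address, topic name and `valueSchemaJson` are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.expr

object AvroStreamSketch {
  // Hypothetical schema; in practice it would come from the schema registry.
  val valueSchemaJson: String =
    """{"type":"record","name":"Example","fields":[{"name":"id","type":"string"}]}"""

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-stream-sketch").getOrCreate()

    // The Kafka source exposes key and value as binary columns.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "example-topic")                // placeholder
      .option("startingOffsets", "earliest")
      .load()

    // Assumption: skip the 5-byte Confluent header (SQL substring is 1-based).
    val payload = expr("substring(value, 6)")

    // Decode the remaining bytes with the hand-supplied schema and flatten the struct.
    val decoded = raw
      .select(from_avro(payload, valueSchemaJson).as("record"))
      .select("record.*")

    decoded.writeStream.format("console").start().awaitTermination()
  }
}
```

Decoding against one fixed schema ignores the schema id in each message, so this only holds while every record on the topic was written with a compatible schema.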
> The value I receive is in the form of nested JSON

If you are just putting a JSON string into a field, I'm not sure why you are using Avro.