scala - 使用 spark 数据框读取 kafka 主题
问题描述
我想在 kafka 主题之上创建数据框,然后我想将该数据框注册为临时表以对数据执行减法操作。我写了下面的代码。但是在查询注册表时出现错误“org.apache.spark.sql.AnalysisException:必须使用 writeStream.start();; 执行带有流源的查询”
org.apache.spark.sql.types.DataType
org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types._
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "SERVER ******").option("subscribe", "TOPIC_NAME").option("startingOffsets", "earliest").load()
df.printSchema()
val personStringDF = df.selectExpr("CAST(value AS STRING)")
val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))
val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")
personDF.registerTempTable("final_df1")
spark.sql("select * from final_df1").show
错误:----------“org.apache.spark.sql.AnalysisException:必须使用 writeStream.start();; 执行带有流式源的查询;”
我也使用了 start() 方法,但我遇到了错误。
20/08/11 00:59:30 错误流。MicroBatchExecution:查询 final_df1 [id = 1a3e2ea4-2ec1-42f8-a5eb-8a12ce0fb3f5,runId = 7059f3d2-21ec-43c4-b55a-8c735272bf0f] 以错误 java.lang.AbstractMethodError 终止
注意:我编写此脚本的主要目标是我想对此数据编写减号查询,并希望将其与集群中的一个注册表进行比较。所以,总结一下,如果我从 oracle 数据库中发送 1000 条 kafka 主题记录,我将在 oracle 表之上创建数据框,将其注册为临时表,我正在使用 kafka 主题。比我想在源(oracle)和目标(kafka 主题)之间运行减号查询。在源和目标之间执行 100% 的数据验证。(可以将kafka主题注册为临时表吗?)
解决方案
使用memory
sink 而不是 registerTempTable。检查下面的代码。
org.apache.spark.sql.types.DataType
org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "SERVER ******")
.option("subscribe", "TOPIC_NAME")
.option("startingOffsets", "earliest")
.load()
df.printSchema()
val personStringDF = df.selectExpr("CAST(value AS STRING)")
val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))
val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")
personDF
.writeStream
.outputMode("append")
.format("memory")
.queryName("final_df1").start()
spark.sql("select * from final_df1").show(10,false)
推荐阅读
- javascript - Chrome有时将元素作为节点有时作为标记
- clr - 如何在此 winim 库中使用 ProcessStartInfo 实例化应用程序
- java - Android Kotlin 语句到 Java 转换
- google-sheets - 谷歌表格折线图:连接缺失的点
- android - 我们如何从类中调用与 kotlin 中的随机数类似的函数:
- javascript - 如何使用 javascript 中的 onclick 函数将链接更改为活动状态
- android - 如何通过intent在android中打开Google Account Info
- google-chrome - 如何防止 Chrome 扩展在浏览器重启时重置选项?
- android - 为检测到的人脸添加过滤器
- python - 在 keras 中加载测试图像