首页 > 解决方案 > 使用 spark 数据框读取 kafka 主题

问题描述

我想在 kafka 主题之上创建数据框,然后我想将该数据框注册为临时表以对数据执行减法操作。我写了下面的代码。但是在查询注册表时出现错误“org.apache.spark.sql.AnalysisException:必须使用 writeStream.start();; 执行带有流源的查询”

org.apache.spark.sql.types.DataType
org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types._

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "SERVER ******").option("subscribe", "TOPIC_NAME").option("startingOffsets", "earliest").load()

df.printSchema()

val personStringDF = df.selectExpr("CAST(value AS STRING)")

val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))


val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")

personDF.registerTempTable("final_df1")

spark.sql("select * from final_df1").show 

错误:----------“org.apache.spark.sql.AnalysisException:必须使用 writeStream.start();; 执行带有流式源的查询;”

我也使用了 start() 方法,但我遇到了错误。

20/08/11 00:59:30 错误流。MicroBatchExecution:查询 final_df1 [id = 1a3e2ea4-2ec1-42f8-a5eb-8a12ce0fb3f5,runId = 7059f3d2-21ec-43c4-b55a-8c735272bf0f] 以错误 java.lang.AbstractMethodError 终止

注意:我编写此脚本的主要目标是我想对此数据编写减号查询,并希望将其与集群中的一个注册表进行比较。所以,总结一下,如果我从 oracle 数据库中发送 1000 条 kafka 主题记录,我将在 oracle 表之上创建数据框,将其注册为临时表,我正在使用 kafka 主题。比我想在源(oracle)和目标(kafka 主题)之间运行减号查询。在源和目标之间执行 100% 的数据验证。(可以将kafka主题注册为临时表吗?)

标签: scalaapache-sparkapache-kafkaapache-spark-sql

解决方案


使用memorysink 而不是 registerTempTable。检查下面的代码。

org.apache.spark.sql.types.DataType
org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types._

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "SERVER ******")
.option("subscribe", "TOPIC_NAME")
.option("startingOffsets", "earliest")
.load()

df.printSchema()

val personStringDF = df.selectExpr("CAST(value AS STRING)")

val user_schema =StructType(Array(StructField("OEM",StringType,true),StructField("IMEI",StringType,true),StructField("CUSTOMER_ID",StringType,true),StructField("REQUEST_SOURCE",StringType,true),StructField("REQUESTER",StringType,true),StructField("REQUEST_TIMESTAMP",StringType,true),StructField("REASON_CODE",StringType,true)))


val personDF = personStringDF.select(from_json(col("value"),user_schema).as("data")).select("data.*")


personDF
.writeStream
.outputMode("append")
.format("memory")
.queryName("final_df1").start()

spark.sql("select * from final_df1").show(10,false)


推荐阅读