首页 > 解决方案 > 如何在火花结构化流式查询(Kafka)之后调用方法?

问题描述

我需要根据从主题收到的值执行一些功能。我目前正在使用 ForeachWriter 将所有主题转换为列表。现在,我想将此列表作为参数传递给方法。

这是我到目前为止所拥有的

def doA(mylist: List[String]) = { //something for A }
def doB(mylist: List[String]) = { //something for B }

Ans 这就是我如何称呼我的流式查询

//{"s":"a","v":"2"}
//{"s":"b","v":"3"}
val readTopics = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "myTopic").load()

val schema = new StructType()
      .add("s",StringType)
      .add("v",StringType)
      
val parseStringDF = readTopics.selectExpr("CAST(value AS STRING)")

val parseDF = parseStringDF.select(from_json(col("value"), schema).as("data"))
   .select("data.*")

parseDF.writeStream
  .format("console")
  .outputMode("append")
  .start()

//fails here
val listOfTopics = parseDF.select("s").map(row => (row.getString(0))).collect.toList

//unable to call the below methods
for (t <- listOfTopics ){
    if(t == "a")
        doA(listOfTopics)
    else if (t == "b")
        doB(listOfTopics)
    else
        println("do nothing")
}

spark.streams.awaitAnyTermination() 

问题:

  1. 如何在流式作业中调用独立(非流式)方法?
  2. 我不能在这里使用 ForeachWriter,因为我想将 SparkSession 传递给方法,并且由于 SparkSession 不可序列化,所以我不能使用 ForeachWriter。并行调用方法 doA 和 doB 的替代方法是什么?

标签: scalaapache-sparkapache-kafkaspark-structured-streamingspark-kafka-integration

解决方案


如果您希望能够将数据收集到本地 Spark 驱动程序/执行程序,则需要使用parseDF.write.foreachBatch,即使用ForEachWriter

目前尚不清楚您在两种方法中需要 SparkSession 的用途,但由于它们正在处理非 Spark 数据类型,因此您可能不应该使用 SparkSession 实例,无论如何

或者,您应该.select()过滤您的主题列,然后将这些函数应用于两个“topic-a”和“topic-b”数据帧,从而并行化工作负载。否则,你最好只使用常规KafkaConsumerfromkafka-clientskafka-streams而不是 Spark


推荐阅读