首页 > 解决方案 > Apache Spark:对流数据集进行连接操作后更新输出模式

问题描述

我正在尝试编写一个代码,它首先进行连接,然后进行聚合(groupby 和 count)。

我希望我的聚合阶段的输出是可更新的。下面是我使用的代码:

    val spark = SparkSession.builder().master("local").getOrCreate()

    import spark.implicits._


    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "testerIn")
      .load().selectExpr("CAST(value AS STRING)").as[String]


    val interimDF = df.join(df,"value")

    val newDF = interimDF.groupBy("value").count().toJSON

    newDF.writeStream.format("kafka").outputMode("update") .option("kafka.bootstrap.servers", "localhost:9092") . option("checkpointLocation","/path/to/directory")
      .option("topic", "tester").start()

    spark.streams.awaitAnyTermination()

此代码引发错误,因为 spark 中的流-流连接不支持更新模式:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported in Update output mode, only in Append output mode;;

现在,我完全理解了为什么 spark 会抛出这个错误,因为当我们加入时;更新模式几乎没有任何意义(因为只要输入中有任何新行,我们就会在输出中获得一个新行,因此追加)。

如果我在加入(interimDF)之后以追加模式将数据帧输出到 Kafka,然后从中读取并执行我的聚合阶段(newDF)并以更新模式将其写回其他流中,我的问题将得到解决。这正是我想做的,但我想避免中间写到 Kafka 阶段。有什么办法可能吗?我也愿意接受 hack-ish 解决方案或指向某人可能针对类似内容提出的拉取请求的链接。

标签: apache-sparkjoingroup-byapache-spark-sqlspark-streaming

解决方案


推荐阅读