首页 > 解决方案 > 如何在与 Spark 相同的查询结果中使用聚合和连接?

问题描述

我需要加入以使用 postgres 数据丰富我的数据框。在 Spark Streaming 中我可以正常执行,因为数据是批量处理的。但是,在结构化流中,每当我尝试将聚合与连接一起使用时,都会出现错误。

例如:如果我在输出模式完成的情况下使用聚合,则作业正常工作,但是,如果我添加连接,则会返回错误:

Join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode;

如果我反其道而行之,也会发生同样的情况。当我使用输出模式附加连接时,作业正常运行,但是,如果我添加聚合,作业将返回错误:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

最后,我想知道是否有任何方法可以将连接和聚合一起使用而不会使用火花结构化流式传输出现错误。

如果是这样,一个不会产生那种错误的实现是什么样的?

def main(args: Array[String]): Unit = {
  val ss: SparkSession = Spark.getSparkSession
  val postgresSQL = new PostgresConnection
  val dataCollector = new DataCollector(postgresSQL)
  val collector = new Collector(ss,dataCollector)
  
  import ss.implicits._
  
  val stream: DataFrame = Kafka.setStructuredStream(ss)

  val parsed: DataFrame = Stream.parseInputMessages(stream)

  val getRelation: DataFrame = collector.getLastRelation(parsed)

    getRelation
    .writeStream.format("console")
    .trigger(Trigger.ProcessingTime(5000))
      .outputMode("complete")
      .queryName("Join")
      .start()

  ss.streams.awaitAnyTermination()
}

在我的 getLastRelation 方法中,我调用了 convertData 方法和 compareData 方法。

    def getLastRelation(messageToProcess: DataFrame): DataFrame = {
    // Faz tratamentos no DF para preparar a busca
    val dss: Dataset[Message] = this.convertData(messageToProcess)
    val dsRelacaolista: Dataset[WithStructure] = this.getPersonStructure(ds)
    val compareData = this.compareData(dsRelacaolista,messageToProcess)
    compareData
}

在我的 convertData 方法中,我使用了一个 agg。

    def convertData(data: DataFrame): Dataset[Message] = {
    data.selectExpr("country","code","order")
      .groupBy($"country",$"order")
      .agg(collect_list("code")
        .as("code"))
      .as[Message]
  }

在我的 compareData 方法中,我使用了 join:

def compareData(data: Dataset[WithStructure], message: DataFrame): DataFrame = {
    val tableJoin = message.selectExpr("order","order_id","hashCompare","created_at")

    data.toDF()
      .withColumn("hashCompare",hash($"country",$"code"))
      .join(tableJoin,"hashCompare")
  }

注意:我使用 scala 作为一种语言(我不知道这些信息是否对这个问题很重要)

标签: scalaapache-sparkspark-structured-streaming

解决方案


如果您正在对流进行聚合查询,则需要指定水印和窗口。

例如:

 data
     .withWatermark("created_at", "10 minutes")
     .selectExpr("country","code","order")
     .groupBy(window($"created_at", "10 minutes", "5 minutes"), $"country",$"order")
     .agg(collect_list("code")
     .as("code"))
     .as[Message]

与您的流一起到达的数据可能出于任何原因延迟(由于网络速度变慢等)。水印允许指定聚合应等待滞后事件多长时间。所有延迟高于水印中指定时间的事件都将被忽略。

追加模式不允许修改以前输出的结果。因此,它需要水印来确保聚合数据不会进一步更新。

您可以为水印选择更长的窗口,这将为您处理延迟数据提供更高的容忍度。缺点是上游将被水印持续时间延迟,因为查询必须等待水印中指定的时间通过才能完成聚合。

此外,对于流-流连接(当连接的双方都是流数据集时),您还需要指定 window.

来自文档:

在两个数据流之间生成连接结果的挑战在于,在任何时间点,连接两侧的数据集视图都是不完整的,这使得在输入之间找到匹配变得更加困难。

请查看有关进行流式连接进行流式聚合的文档。


推荐阅读