scala - 如何在与 Spark 相同的查询结果中使用聚合和连接?
问题描述
我需要加入以使用 postgres 数据丰富我的数据框。在 Spark Streaming 中我可以正常执行,因为数据是批量处理的。但是,在结构化流中,每当我尝试将聚合与连接一起使用时,都会出现错误。
例如:如果我在输出模式完成的情况下使用聚合,则作业正常工作,但是,如果我添加连接,则会返回错误:
Join between two streaming DataFrames/Datasets is not supported in Complete output mode, only in Append output mode;
如果我反其道而行之,也会发生同样的情况。当我使用输出模式附加连接时,作业正常运行,但是,如果我添加聚合,作业将返回错误:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
最后,我想知道是否有任何方法可以将连接和聚合一起使用而不会使用火花结构化流式传输出现错误。
如果是这样,一个不会产生那种错误的实现是什么样的?
def main(args: Array[String]): Unit = {
val ss: SparkSession = Spark.getSparkSession
val postgresSQL = new PostgresConnection
val dataCollector = new DataCollector(postgresSQL)
val collector = new Collector(ss,dataCollector)
import ss.implicits._
val stream: DataFrame = Kafka.setStructuredStream(ss)
val parsed: DataFrame = Stream.parseInputMessages(stream)
val getRelation: DataFrame = collector.getLastRelation(parsed)
getRelation
.writeStream.format("console")
.trigger(Trigger.ProcessingTime(5000))
.outputMode("complete")
.queryName("Join")
.start()
ss.streams.awaitAnyTermination()
}
在我的 getLastRelation 方法中,我调用了 convertData 方法和 compareData 方法。
def getLastRelation(messageToProcess: DataFrame): DataFrame = {
// Faz tratamentos no DF para preparar a busca
val dss: Dataset[Message] = this.convertData(messageToProcess)
val dsRelacaolista: Dataset[WithStructure] = this.getPersonStructure(ds)
val compareData = this.compareData(dsRelacaolista,messageToProcess)
compareData
}
在我的 convertData 方法中,我使用了一个 agg。
def convertData(data: DataFrame): Dataset[Message] = {
data.selectExpr("country","code","order")
.groupBy($"country",$"order")
.agg(collect_list("code")
.as("code"))
.as[Message]
}
在我的 compareData 方法中,我使用了 join:
def compareData(data: Dataset[WithStructure], message: DataFrame): DataFrame = {
val tableJoin = message.selectExpr("order","order_id","hashCompare","created_at")
data.toDF()
.withColumn("hashCompare",hash($"country",$"code"))
.join(tableJoin,"hashCompare")
}
注意:我使用 scala 作为一种语言(我不知道这些信息是否对这个问题很重要)
解决方案
如果您正在对流进行聚合查询,则需要指定水印和窗口。
例如:
data
.withWatermark("created_at", "10 minutes")
.selectExpr("country","code","order")
.groupBy(window($"created_at", "10 minutes", "5 minutes"), $"country",$"order")
.agg(collect_list("code")
.as("code"))
.as[Message]
与您的流一起到达的数据可能出于任何原因延迟(由于网络速度变慢等)。水印允许指定聚合应等待滞后事件多长时间。所有延迟高于水印中指定时间的事件都将被忽略。
追加模式不允许修改以前输出的结果。因此,它需要水印来确保聚合数据不会进一步更新。
您可以为水印选择更长的窗口,这将为您处理延迟数据提供更高的容忍度。缺点是上游将被水印持续时间延迟,因为查询必须等待水印中指定的时间通过才能完成聚合。
此外,对于流-流连接(当连接的双方都是流数据集时),您还需要指定 window.
来自文档:
在两个数据流之间生成连接结果的挑战在于,在任何时间点,连接两侧的数据集视图都是不完整的,这使得在输入之间找到匹配变得更加困难。
请查看有关进行流式连接和进行流式聚合的文档。
推荐阅读
- html - 如何使用 CSS 选择器从类中提取子字符串?
- rest - 解析模板时出错 - url 映射失败
- c - 检查两个整数数组是否具有相同的元素,而不管它们的顺序如何
- c# - 使用 EF Core 2.2 在 SQL 中为地理数据类型添加索引
- ruby-on-rails - Rails 用户部分作为变量
- php - php - Mysql Query 工作除了一个过滤器
- c# - 关于加速硬盘备份代码的建议
- java - 在 Java 中 String hashCode() 的旧 impl 中跳过字符背后的想法是什么
- python - 如何运行托管在 a2hosting 上的 Flask 应用程序?
- matlab - 如何优化此字符串替换代码