首页 > 解决方案 > 火花结构化 Delta 流的情况下的下推滤波器

问题描述

我有一个用例,我们需要将开源增量表流式传输到多个查询中,并在其中一个分区列上进行过滤。例如,。给定按年份列分区的增量表。

Streaming query 1
spark.readStream.format("delta").load("/tmp/delta-table/").
where("year= 2013")

Streaming query 2
spark.readStream.format("delta").load("/tmp/delta-table/").
where("year= 2014")

物理计划在流式传输后显示过滤器。

> == Physical Plan == Filter (isnotnull(year#431) AND (year#431 = 2013))
> +- StreamingRelation delta, []

我的问题是下推谓词是否适用于 Delta 中的流式查询?我们可以只从 Delta 流式传输特定分区吗?

标签: apache-sparkdelta-lake

解决方案


如果列已分区,则仅扫描所需的分区。

让我们创建分区和非分区增量表并执行结构化流。

分区增量表流式传输:

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
    
//sample dataframe
val df = Seq((1,2020),(2,2021),(3,2020),(4,2020),
(5,2020),(6,2020),(7,2019),(8,2019),(9,2018),(10,2020)).toDF("id","year")
    
//partionBy year column and save as delta table
df.write.format("delta").partitionBy("year").save("delta-stream")
    
//streaming delta table
spark.readStream.format("delta").load("delta-stream")
.where('year===2020)
.writeStream.format("console").start().awaitTermination()

上述流式查询的物理计划:注意 partitionFilters

在此处输入图像描述

非分区增量表流式传输:

df.write.format("delta").save("delta-stream")

spark.readStream.format("delta").load("delta-stream")
    .where('year===2020)
    .writeStream.format("console").start().awaitTermination()

上述流式查询的物理计划:注意 pushFilters

在此处输入图像描述


推荐阅读