首页 > 解决方案 > Spark Structured Streaming:将流与应在每个微批处理中读取的数据连接起来

问题描述

我有一个来自 HDFS 的流,我需要将它与我也在 HDFS 中的元数据(两个 Parquets)加入。

我的元数据有时会更新,我需要加入新的和最新的,这意味着理想情况下从 HDFS 读取每个流微批次的元数据。

我试图对此进行测试,但不幸的是,即使我尝试使用spark.sql.parquet.cacheMetadata=false.

有没有办法读取每个微批次?Foreach Writer 不是我要找的?

下面是代码示例:

spark.sql("SET spark.sql.streaming.schemaInference=true")

spark.sql("SET spark.sql.parquet.cacheMetadata=false")

val stream = spark.readStream.parquet("/tmp/streaming/")

val metadata = spark.read.parquet("/tmp/metadata/")

val joinedStream = stream.join(metadata, Seq("id"))

joinedStream.writeStream.option("checkpointLocation", "/tmp/streaming-test/checkpoint").format("console").start()



/tmp/metadata/ got updated with spark append mode.

据我了解,通过 JDBC jdbc 源和 spark 结构化流访问元数据,Spark 将查询每个微批处理。

标签: apache-sparkapache-spark-sqlspark-streaming

解决方案


据我发现,有两种选择:

  1. 创建临时视图并使用间隔刷新它:

    metadata.createOrReplaceTempView("元数据")

并在单独的线程中触发刷新:

spark.catalog.refreshTable("metadata")

注意:在这种情况下,spark 将只读取相同的路径,如果您需要从 HDFS 上的不同文件夹读取元数据,例如时间戳等,则它不起作用。

  1. 按照Tathagata Das 建议的间隔重新启动流

这种方式不适合我,因为我的元数据可能每小时刷新几次。


推荐阅读