apache-spark - Spark Structured Streaming:将流与应在每个微批处理中读取的数据连接起来
问题描述
我有一个来自 HDFS 的流,我需要将它与我也在 HDFS 中的元数据(两个 Parquets)加入。
我的元数据有时会更新,我需要加入新的和最新的,这意味着理想情况下从 HDFS 读取每个流微批次的元数据。
我试图对此进行测试,但不幸的是,即使我尝试使用spark.sql.parquet.cacheMetadata=false
.
有没有办法读取每个微批次?Foreach Writer 不是我要找的?
下面是代码示例:
spark.sql("SET spark.sql.streaming.schemaInference=true")
spark.sql("SET spark.sql.parquet.cacheMetadata=false")
val stream = spark.readStream.parquet("/tmp/streaming/")
val metadata = spark.read.parquet("/tmp/metadata/")
val joinedStream = stream.join(metadata, Seq("id"))
joinedStream.writeStream.option("checkpointLocation", "/tmp/streaming-test/checkpoint").format("console").start()
/tmp/metadata/ got updated with spark append mode.
据我了解,通过 JDBC jdbc 源和 spark 结构化流访问元数据,Spark 将查询每个微批处理。
解决方案
据我发现,有两种选择:
创建临时视图并使用间隔刷新它:
metadata.createOrReplaceTempView("元数据")
并在单独的线程中触发刷新:
spark.catalog.refreshTable("metadata")
注意:在这种情况下,spark 将只读取相同的路径,如果您需要从 HDFS 上的不同文件夹读取元数据,例如时间戳等,则它不起作用。
- 按照Tathagata Das 建议的间隔重新启动流
这种方式不适合我,因为我的元数据可能每小时刷新几次。
推荐阅读
- php - 使用虚拟主机的 Laravel API 返回 404
- amazon-web-services - 如何修改与我雇用的公司集群分离的 kubernetes 配置文件以添加第二个集群?
- angular - 如何在 URL 参数中存储 UI 详细信息?
- php - 需要反序列化我的实体,但它们位于专用的 API 项目中
- jdbc - 通过 jdbc 驱动查询时值不正确。为什么?
- c++ - 如何使用 cv::imshow 显示 2 通道图像?
- python - 来自 Python 的 SOAP 请求
- python - 使用 AWS Lambda 进行 Python 依赖管理
- datetime - 在带有日期的时间间隔之外生成文件
- java - 如何在 Talend 中捕获组件异常?