首页 > 解决方案 > 在通过 spark 结构化流写入文件时读取文件

问题描述

我正在为我的应用程序使用 spark 结构化流。我有一个用例,我需要在写入文件时读取文件。我尝试使用 spark 结构化流,如下所示:

sch=StructType([StructField("ID",IntegerType(),True),StructField("COUNTRY",StringType(),True)])
df_str = spark.readStream.format("csv").schema(sch). option("header",True).option("delimiter",','). load("<Load Path>")
query = df_str.writeStream.format("parquet").outputMode("append").trigger(processingTime='10 seconds').option("path","<HDFS location>").option("checkpointLocation","<chckpoint_loc>").start()

但它最初只读取文件,之后该文件没有被增量读取。我正在考虑在临时目录中写入文件并在一段时间后创建新文件并从正在读取的 spark 结构化流作业复制到目录的解决方法,但这会导致延迟。

有没有其他方法来处理这个(我不能使用卡夫卡)?

抱歉,如果这个问题不适用于 Stackoverflow,因为我没有找到任何其他地方可以问这个问题。

标签: apache-sparkspark-structured-streaming

解决方案


不幸的是 Spark 不支持它。文件流源的单位是“文件”。Spark 假定它读取的文件是“不可变的”,这意味着一旦将文件放在源路径中就不应更改它们。这使得偏移管理变得非常简单(不需要跟踪文件偏移),源路径中的文件数量将不断增加。合理的限制,但仍然是一个限制。


推荐阅读