apache-spark - 在通过 spark 结构化流写入文件时读取文件
问题描述
我正在为我的应用程序使用 spark 结构化流。我有一个用例,我需要在写入文件时读取文件。我尝试使用 spark 结构化流,如下所示:
sch=StructType([StructField("ID",IntegerType(),True),StructField("COUNTRY",StringType(),True)])
df_str = spark.readStream.format("csv").schema(sch). option("header",True).option("delimiter",','). load("<Load Path>")
query = df_str.writeStream.format("parquet").outputMode("append").trigger(processingTime='10 seconds').option("path","<HDFS location>").option("checkpointLocation","<chckpoint_loc>").start()
但它最初只读取文件,之后该文件没有被增量读取。我正在考虑在临时目录中写入文件并在一段时间后创建新文件并从正在读取的 spark 结构化流作业复制到目录的解决方法,但这会导致延迟。
有没有其他方法来处理这个(我不能使用卡夫卡)?
抱歉,如果这个问题不适用于 Stackoverflow,因为我没有找到任何其他地方可以问这个问题。
解决方案
不幸的是 Spark 不支持它。文件流源的单位是“文件”。Spark 假定它读取的文件是“不可变的”,这意味着一旦将文件放在源路径中就不应更改它们。这使得偏移管理变得非常简单(不需要跟踪文件偏移),源路径中的文件数量将不断增加。合理的限制,但仍然是一个限制。
推荐阅读
- c# - 没有 ParameterInjection 或 Constructor Injection 的 GetRequiredService
- rx-java - 使用 retryWhen 时没有成功
- android - 通过意图(或任何远程方法)禁用和启用谷歌播放保护扫描
- r - 在 R 中优化 + if
- npm - 在 ember 项目中将 bower 移动到 npm 时出错
- angular - 所需的 ng-packgr 入口点不存在
- powershell - 逐行阅读 Invoke-WebRequest
- c++ - 该库的用户是否使用静态库的 cpp 中的编译指示?
- batch-file - 将文件或文件夹放在同一个蝙蝠上时备份它
- python - 在google colab中安装CV3包的问题