首页 > 解决方案 > trigger.Once() 需要元数据

问题描述

嗨,有经验的人的简单问题。我有一个在路径下读取文件的火花作业。即使源不是真正的流而只是一个包含一堆文件的文件夹,我也想使用结构化流。

我的问题可以为此使用 trigger.Once() 吗?如果是的话,我该如何制作触发器。一旦识别出新文件。

我在笔记本电脑上进行了尝试,第一次运行读取了所有内容,但是当我再次开始工作时,同时写入的文件根本无法识别和处理。

我的方法如下所示:

def executeSql(spark:SparkSession):Unit ={

    val file = "home/hansherrlich/input_event/"

    val df  = spark.readStream.format("json").schema(getStruct).load("home/hansherrlich/some_event/")

    val out =    df.writeStream.trigger(Trigger.Once()).format("json").option("path","home/hansherrlich/some_event_processed/").start()

    out.processAllAvailable()
    out.stop()
    //out.awaitTermination()
    println("done writing")

}

标签: apache-sparkspark-structured-streaming

解决方案


如果从文件中读取,这似乎只有在文件由 Data Bricks 写入 Delta 时才有效。


推荐阅读