首页 > 解决方案 > 从 HDFS 源流式传输时如何运行多个批次?

问题描述

我有一个这样的数据集val df = spark.readStream.schema(s).parquet ("/path/to/file").where("Foo > 0").groupBy("bar").agg(expr("sum(Foo)"))。数据集有超过 100 万条记录,Parquet 文件包含 1 个分区。

我开始流df.writeStream.outputMode("update").format("console").start

然后 Spark 一次处理整个文件。但是我希望 Spark 在更新结果时会如何“拆分”文件并一次处理每个拆分,就像我输入新单词时的单词计数示例更新结果一样。

我尝试添加trigger(Trigger.ProcessingTime("x seconds")),但没有奏效。

标签: apache-sparkspark-structured-streaming

解决方案


然后 Spark 一次处理整个文件。但是我希望 Spark 在更新结果时会如何“拆分”文件并一次处理每个拆分,就像我输入新单词时的单词计数示例更新结果一样。

这就是 Spark 结构化流处理文件的方式。它立即处理它们,不再考虑它们。它确实将文件“拆分”为多个部分(嗯,这应该在存储的手中,例如 HDFS,而不是 Spark 本身),但它是在幕后进行的。

请注意,一旦文件被处理,该文件将永远不会被再次处理。

我尝试添加trigger(Trigger.ProcessingTime("x seconds")),但没有奏效。

嗯,确实如此,但不是你想要的。

DataStreamWriter.trigger设置流查询的触发器。默认值为 ProcessingTime(0),它将尽可能快地运行查询。

请参阅DataStreamWriter的 scaladoc 。


推荐阅读