apache-spark - 从 HDFS 源流式传输时如何运行多个批次?
问题描述
我有一个这样的数据集val df = spark.readStream.schema(s).parquet ("/path/to/file").where("Foo > 0").groupBy("bar").agg(expr("sum(Foo)"))
。数据集有超过 100 万条记录,Parquet 文件包含 1 个分区。
我开始流df.writeStream.outputMode("update").format("console").start
。
然后 Spark 一次处理整个文件。但是我希望 Spark 在更新结果时会如何“拆分”文件并一次处理每个拆分,就像我输入新单词时的单词计数示例更新结果一样。
我尝试添加trigger(Trigger.ProcessingTime("x seconds"))
,但没有奏效。
解决方案
然后 Spark 一次处理整个文件。但是我希望 Spark 在更新结果时会如何“拆分”文件并一次处理每个拆分,就像我输入新单词时的单词计数示例更新结果一样。
这就是 Spark 结构化流处理文件的方式。它立即处理它们,不再考虑它们。它确实将文件“拆分”为多个部分(嗯,这应该在存储的手中,例如 HDFS,而不是 Spark 本身),但它是在幕后进行的。
请注意,一旦文件被处理,该文件将永远不会被再次处理。
我尝试添加
trigger(Trigger.ProcessingTime("x seconds"))
,但没有奏效。
嗯,确实如此,但不是你想要的。
DataStreamWriter.trigger设置流查询的触发器。默认值为 ProcessingTime(0),它将尽可能快地运行查询。
请参阅DataStreamWriter的 scaladoc 。
推荐阅读
- javascript - React Native 使用 Firebase 从 JSON 树中获取特定的子值
- c# - C# 用 Windows 窗体中使用的所有按钮填充数组
- c++ - 如何访问结构的成员
- javascript - 如何在 JSX (React) 中执行 JS 而不将其输出为 HTML?
- google-bigquery - BigQuery:无效日期错误
- reactjs - 如何避免使用来自 material-ui 的输入的十进制值
- r - 将csv文件从多个目录复制到R中的新目录
- c# - 使用 ASP.net MVC 上的现有表
- mysql - 无法匹配 freeradius 中的 mysql 字符串响应
- extjs - 带触发器的文本区域 - 如何将触发器添加到选项卡