scala - Spark Structured Streaming writeStream 输出一个全局 csv
问题描述
我目前正在使用 Spark Structured Streaming 制作原始日志数据聚合器。
Inputstream 由文本文件目录组成:
// == Input == //
val logsDF = spark.readStream
.format("text")
.option("maxFilesPerTrigger", 1)
.load("input/*")
然后解析日志...
// == Parsing == //
val logsDF2 = ...
...并聚合
// == Aggregation == //
val windowedCounts = logsDF2
.withWatermark("window_start", "15 minutes")
.groupBy(
col("window"),
col("node")
).count()
当我使用“控制台”接收器时一切正常:结果在控制台中逐个更新:
// == Output == //
val query = windowedCounts.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
现在我想将我的结果保存在一个唯一的文件中(json、parquet、csv ..)
// == Output == //
val query = windowedCounts.writeStream
.format("csv")
.option("checkpointLocation", "checkpoint/")
.start("output/")
.awaitTermination()
但它输出了我 400 个空 csv ......我怎样才能像在控制台中那样得到我的结果?
非常感谢 !
解决方案
很久以前,但我自己遇到了这个问题,并认为我会解决它。确实,我认为您的代码很好,直到您尝试将数据放入 csv 文件中。尝试将 writeStream csv 代码更改为:
// == Output == //
val query = windowedCounts.writeStream
.format("csv")
.trigger(processingTime="10 seconds")
.option("checkpointLocation", "checkpoint/")
.option("path", "output_path/")
.outputMode("append")
.start()
.awaitTermination()
该行:
.trigger(processingTime="10 seconds")
应该解决您的 400 个文件,因为它每 10 秒只写入一个新文件。这两行:
.option("path", "output_path/")
.outputMode("append")
当您附加最新值并将文件输出到特定输出目录时,应该可以解决空文件问题。
推荐阅读
- angular - rxjs:如何从 catchError 中返回另一个 observable 的结果
- sql - where 子句中的多项选择
- c# - 在 C# 中执行具有复杂输出的 cmd 命令
- reactjs - 如何在 URL 中传递值以及如何在 React JS 中从 URl 获取值
- html - 从本地文件系统打开文件时如何创建下载链接
- java - ZonedDateTime 作为 Epoch 时间而不是 Spring App 中的标准字符串返回
- css - 更改颜色 Office UI Fabric 切换
- angular - 如何在 Angular 7 中找到此错误来自“_co.xyz 变量未定义”的行号或文件
- c# - 将 JSON 数组反序列化为字典
> - c# - 使用 Utf8Json 序列化/反序列化时指定 Epoch TimeDate 格式