首页 > 解决方案 > Spark Structured Streaming writeStream 输出一个全局 csv

问题描述

我目前正在使用 Spark Structured Streaming 制作原始日志数据聚合器。

Inputstream 由文本文件目录组成:

// == Input == //

val logsDF = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("input/*")

然后解析日志...

// == Parsing == //

val logsDF2 = ...

...并聚合

// == Aggregation == //

val windowedCounts = logsDF2
  .withWatermark("window_start", "15 minutes")
  .groupBy(
    col("window"),
    col("node")
  ).count()

当我使用“控制台”接收器时一切正常:结果在控制台中逐个更新:

// == Output == //

val query = windowedCounts.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()

现在我想将我的结果保存在一个唯一的文件中(json、parquet、csv ..)

// == Output == //

val query = windowedCounts.writeStream
  .format("csv")
  .option("checkpointLocation", "checkpoint/")
  .start("output/")
  .awaitTermination()

但它输出了我 400 个空 csv ......我怎样才能像在控制台中那样得到我的结果?

非常感谢 !

标签: scalaspark-streaming

解决方案


很久以前,但我自己遇到了这个问题,并认为我会解决它。确实,我认为您的代码很好,直到您尝试将数据放入 csv 文件中。尝试将 writeStream csv 代码更改为:

// == Output == //
val query = windowedCounts.writeStream
  .format("csv")
  .trigger(processingTime="10 seconds")
  .option("checkpointLocation", "checkpoint/")
  .option("path", "output_path/")
  .outputMode("append")
  .start()
  .awaitTermination()

该行:

.trigger(processingTime="10 seconds")

应该解决您的 400 个文件,因为它每 10 秒只写入一个新文件。这两行:

.option("path", "output_path/")
.outputMode("append")

当您附加最新值并将文件输出到特定输出目录时,应该可以解决空文件问题。


推荐阅读