apache-spark - 如何将火花流应用程序的输出写入单个文件
问题描述
我正在使用火花流从 Kafka 读取数据并传递给 py 文件进行预测。它返回预测以及原始数据。它将原始数据及其预测保存到文件中,但是它为每个 RDD 创建了一个文件。我需要一个包含所有收集到的数据的文件,直到我停止将程序保存到一个文件中。
我试过 writeStream 它甚至不会创建一个文件。我尝试使用 append 将其保存到镶木地板,但它会为每个 RDD 创建多个文件,即 1 个文件。我尝试使用附加模式写入多个文件作为输出。下面的代码创建一个文件夹 output.csv 并将所有文件输入其中。
def main(args: Array[String]): Unit = {
val ss = SparkSession.builder()
.appName("consumer")
.master("local[*]")
.getOrCreate()
val scc = new StreamingContext(ss.sparkContext, Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer"->
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer">
"org.apache.kafka.common.serialization.StringDeserializer",
"group.id"-> "group5" // clients can take
)
mappedData.foreachRDD(
x =>
x.map(y =>
ss.sparkContext.makeRDD(List(y)).pipe(pyPath).toDF().repartition(1)
.write.format("csv").mode("append").option("truncate","false")
.save("output.csv")
)
)
scc.start()
scc.awaitTermination()
我只需要获取 1 个文件,其中包含在流式传输时一一收集的所有语句。
任何帮助将不胜感激,谢谢您的期待。
解决方案
hdfs 中的任何文件一旦写入就不能修改。如果您希望实时写入文件(每 2 秒将来自流式作业的数据块附加到同一文件中),则根本不允许这样做,因为 hdfs 文件是不可变的。如果可能,我建议您尝试编写从多个文件读取的读取逻辑。
但是,如果您必须从单个文件中读取,我建议您使用两种方法之一,在您将输出写入单个 csv/parquet 文件夹后,使用“Append”SaveMode(它将为您编写的每个块创建部分文件2 秒)。
- 您可以在此文件夹顶部创建一个配置单元表,从该表中读取数据。
您可以在 spark 中编写一个简单的逻辑来读取包含多个文件的文件夹,然后使用 reparation(1) 或 coalesce(1) 将其作为单个文件写入另一个 hdfs 位置,然后从该位置读取数据。见下文:
spark.read.csv("oldLocation").coalesce(1).write.csv("newLocation")
推荐阅读
- objective-c - 对具有不可变参数且没有初始化程序的方法进行单元测试的最佳方法是什么?
- polymer - 为什么 Webcomponent 对象的值在同一个 webcomponent 的不同实例之间共享?
- macos - 如何在 Mac 上调试访问相机的应用程序?
- python - requestsession 在 Django 中无法将 html 表单变量传递给另一个函数
- ios - 在 Mac Catalyst 项目中未调用 IBAction
- reactjs - @nivo/line:如何在鼠标离开时保持最后一个点处于活动状态?
- android - Android SAF DocumentsProvider 未显示在文件选择器中
- javascript - Javascript onnclick 调用 ejs 中的函数
- javascript - 在Javascript中合并两个对象数组
- c - 头文件中数组的生命周期