首页 > 解决方案 > 如何将 Dstream 或 RDD 的内容附加到现有输出文件 - Spark Streaming

问题描述

我正在研究简单的 SparkStreaming wordcount 示例,以计算从侦听 TCP 套接字的数据服务器接收到的文本数据中的字数。我想将每个不为空的 Dstream 的内容保存到现有的文本文件中。目前,我正在使用 Spark Shell。这是我的代码

我已经尝试过这段代码,它可以工作,但它会覆盖当前文件:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
val ssc = new StreamingContext(sc, Seconds(2))

val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

lines.foreachRDD{ rdd => if (!rdd.isEmpty) 
//.% to check if the Dstream is empty or not  
{
rdd.saveAsTextFile("/stream_test/testLine.txt")
}
}

ssc.start()

感谢您的宝贵帮助

标签: scalaapache-sparkspark-streaming

解决方案


推荐阅读