apache-spark - 使用合并函数将 RDD 保存为 csv 文件
问题描述
我正在尝试使用 Intellij 中的 Apache Spark 流式传输 twitter 数据,但是当我使用该函数时coalesce
,它说它无法解析符号合并。这是我的主要代码:
val spark = SparkSession.builder().appName("twitterStream").master("local[*]").getOrCreate()
import spark.implicits._
val sc: SparkContext = spark.sparkContext
val streamContext = new StreamingContext(sc, Seconds(5))
val filters = Array("Singapore")
val filtered = TwitterUtils.createStream(streamContext, None, filters)
val englishTweets = filtered.filter(_.getLang() == "en")
//englishTweets.print()
englishTweets.foreachRDD{rdd =>
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val tweets = rdd.map( field =>
(
field.getId,
field.getUser.getScreenName,
field.getCreatedAt.toInstant.toString,
field.getText.toLowerCase.split(" ").filter(_.matches("^[a-zA-Z0-9 ]+$")).fold("")((a, b) => a + " " + b).trim,
sentiment(field.getText)
)
)
val tweetsdf = tweets.toDF("userID", "user", "createdAt", "text", "sentimentType")
tweetsdf.printSchema()
tweetsdf.show(false)
}.coalesce(1).write.csv("hdfs://localhost:9000/usr/sparkApp/test/testing.csv")
解决方案
我已经尝试过使用自己的数据集,并且我已经阅读了一个数据集,并且在编写时我已经应用了合并函数并且它正在给出结果,请参考这个它可能会对你有所帮助。
import org.apache.spark.sql.SparkSession
import com.spark.Rdd.DriverProgram
import org.apache.log4j.{ Logger, Level }
import org.apache.spark.sql.SaveMode
import java.sql.Date
object JsonDataDF {
System.setProperty("hadoop.home.dir", "C:\\hadoop");
System.setProperty("hadoop.home.dir", "C:\\hadoop"); // This is the system property which is useful to find the winutils.exe
Logger.getLogger("org").setLevel(Level.WARN) // This will remove Logs
case class AOK(appDate:Date, arr:String, base:String, Comments:String)
val dp = new DriverProgram
val spark = dp.getSparkSession()
def main(args : Array[String]): Unit = {
import spark.implicits._
val jsonDf = spark.read.option("multiline", "true").json("C:\\Users\\34979\\Desktop\\Work\\Datasets\\JSONdata.txt").as[AOK]
jsonDf.coalesce(1) // Refer Here
.write
.mode(SaveMode.Overwrite)
.option("header", "true")
.format("csv")
.save("C:\\Users\\34979\\Desktop\\Work\\Datasets\\JsonToCsv")
}
}
推荐阅读
- python - python中的字符串替换-如何只获取一个值
- pandas - 从矩阵搜索计算 Pandas 创建数据框
- reactjs - TypeError:使用 Chart-Race-React 时无法读取未定义的属性“高度”
- pandas - 具有特定列的熊猫模式验证
- android - 切换标签时滞后
- architecture - 微服务的用户特定设置
- php - SQL ALTER TABLE AUTO_INCREMENT 中的变量
- maven - 如果 Jenkins 已经知道,为什么要在 POM 中添加分发管理
- java - 从命令行运行程序时找不到我的 java .class 文件(类不属于任何包)
- javascript - Vue Js 我应该为此使用 Vuex 吗?