首页 > 解决方案 > 使用合并函数将 RDD 保存为 csv 文件

问题描述

我正在尝试使用 Intellij 中的 Apache Spark 流式传输 twitter 数据,但是当我使用该函数时coalesce,它说它无法解析符号合并。这是我的主要代码:

    val spark = SparkSession.builder().appName("twitterStream").master("local[*]").getOrCreate()
    import spark.implicits._
    val sc: SparkContext = spark.sparkContext
    val streamContext = new StreamingContext(sc, Seconds(5))

    val filters = Array("Singapore")
    val filtered = TwitterUtils.createStream(streamContext, None, filters)
    val englishTweets = filtered.filter(_.getLang() == "en")

    //englishTweets.print()

    englishTweets.foreachRDD{rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      val tweets = rdd.map( field =>
        (
          field.getId,
          field.getUser.getScreenName,
          field.getCreatedAt.toInstant.toString,
          field.getText.toLowerCase.split(" ").filter(_.matches("^[a-zA-Z0-9 ]+$")).fold("")((a, b) => a + " " + b).trim,
          sentiment(field.getText)
        )
      )
      val tweetsdf = tweets.toDF("userID", "user", "createdAt", "text", "sentimentType")
      tweetsdf.printSchema()
      tweetsdf.show(false)
    }.coalesce(1).write.csv("hdfs://localhost:9000/usr/sparkApp/test/testing.csv")

标签: apache-sparktwitterspark-streaming

解决方案


我已经尝试过使用自己的数据集,并且我已经阅读了一个数据集,并且在编写时我已经应用了合并函数并且它正在给出结果,请参考这个它可能会对你有所帮助。

import org.apache.spark.sql.SparkSession
import com.spark.Rdd.DriverProgram
import org.apache.log4j.{ Logger, Level }
import org.apache.spark.sql.SaveMode
import java.sql.Date

object JsonDataDF {
  
  System.setProperty("hadoop.home.dir", "C:\\hadoop");
  System.setProperty("hadoop.home.dir", "C:\\hadoop"); // This is the system property which is useful to find the winutils.exe
  Logger.getLogger("org").setLevel(Level.WARN) // This will remove Logs

  case class AOK(appDate:Date, arr:String, base:String, Comments:String)
  
  val dp = new DriverProgram
  val spark = dp.getSparkSession()
  
  def main(args : Array[String]): Unit = {
    
    import spark.implicits._
    val jsonDf = spark.read.option("multiline", "true").json("C:\\Users\\34979\\Desktop\\Work\\Datasets\\JSONdata.txt").as[AOK]
    
    jsonDf.coalesce(1)  // Refer Here
          .write
          .mode(SaveMode.Overwrite)
          .option("header", "true")
          .format("csv")
          .save("C:\\Users\\34979\\Desktop\\Work\\Datasets\\JsonToCsv")
  }
  
}

推荐阅读