首页 > 解决方案 > 如何修复 textfilestream 代码的空输出

问题描述

 object abc {

  def main(args: Array[String]) = {
    m()
  }

  def m() {
    val spark = SparkSession.builder.appName("ola").master("local[*]").getOrCreate
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))
    var cnt = sc.longAccumulator("cnt")
    cnt.value

    import spark.implicits._
    val x = ssc.textFileStream("file:///home/xyz/folderone/")

    x.foreachRDD{ rddx =>
      val x2 = rddx.map { xxx =>
        cnt.add(1)
        xxx
      }
      x2.toDF.write.format("text").mode("overwrite").save("file:///home/xyz/oparekta")
    }
    println(s"value of count ${cnt.value}")
    ssc.start()
    ssc.awaitTermination()

  }

上面的代码是从给定的文件夹路径处理文件,不知何故代码中有一些问题,得到空的输出文件,可能是什么原因?

标签: scalaapache-sparkspark-streaming

解决方案


尝试这样的事情来避免处理空数据:

...
QS.foreachRDD(q => {
    if(!q.isEmpty) {   
...

此外,需要考虑覆盖附加。不确定您的用例,可能是疏忽。


推荐阅读