首页 > 解决方案 > Spark Counter:主要方法/扩展应用程序为同一应用程序提供不同的输出,为什么会这样?

问题描述

我有一个文件丢失了一些数据,所以我试图通过使用计数器变量来识别丢失的记录数。

File: data-error.csv                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
123,200,Abc
124,300,xyz
125,4,
126,abd
127,400,abc1

此文件包含Total No Records= 5 和 Missing Records = 2

通过扩展 App trait 进行编程:输出5 2

object CountersTest extends App{
  val conf=new SparkConf().setAppName("CounterTest").setMaster("local")
  val sc=new SparkContext(conf)

  val fileRDD=sc.textFile("data-error.csv")
  var missingRecords=0

  val rdd1=fileRDD.map(rec=>{
    val parseResult=RecordParser.parse(rec)
    if(parseResult.isLeft) missingRecords+=1
    rec
  })
  println(rdd1.count())
  println(missingRecords)
}

程序使用 main 方法:输出5 0

object CounterWithMain {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("CounterTest").setMaster("local")
    val sc=new SparkContext(conf)

    val fileRDD=sc.textFile("data-error.csv")
    var missingRecords=0

    val rdd1=fileRDD.map(rec=>{
      val parseResult=RecordParser.parse(rec)
      if(parseResult.isLeft) missingRecords+=1
      rec
    })
    println(rdd1.count())                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
    println(missingRecords)
  }
}

为什么同一个应用程序给出不同的输出?请帮我解决这个问题。提前致谢。

标签: scalaapache-sparkspark-streaming

解决方案


推荐阅读