首页 > 解决方案 > 在 Spark Stream 中保存 PairRdd 时出错

问题描述

我正在尝试将我的 Pair Rdd 保存在火花流中,但在最后一步保存时出错。

这是我的示例代码

def main(args: Array[String]) {

    val inputPath = args(0)
    val output = args(1)
    val noOfHashPartitioner = args(2).toInt

    println("IN Streaming ")
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val hadoopConf = sc.hadoopConfiguration;
    //hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

    val ssc = new org.apache.spark.streaming.StreamingContext(sc, Seconds(60))
    val input = ssc.textFileStream(inputPath)

    val pairedRDD = input.map(row => {
      val split = row.split("\\|")
      val fileName = split(0)
      val fileContent = split(1)
      (fileName, fileContent)
    })
    import org.apache.hadoop.io.NullWritable
    import org.apache.spark.HashPartitioner
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
    }
    //print(pairedRDD)

    pairedRDD.partitionBy(new HashPartitioner(noOfHashPartitioner)).saveAsHadoopFile(output, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec])

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }

我在保存时进入了最后一步。我是火花流的新手,所以必须在这里遗漏一些东西。得到错误,如

值 partitionBy 不是 org.apache.spark.streaming.dstream.DStream[(String, String)] 的成员

请帮忙

标签: scalaapache-sparkspark-streaming

解决方案


pairedRDDDStream[(String, String)]not类型RDD[(String,String)]。该方法partitionBy在 s 上不可用DStream

也许看看foreachRDD哪些应该在DStreams 上可用。

编辑:更多上下文解释textFileStream将在指定路径上设置目录监视,并且每当有新文件时都会流式传输内容。这就是流方面的来源。那是你要的吗?还是您只想“按原样”阅读目录的内容一次?然后readTextFiles将返回一个非流容器。


推荐阅读