Streaming from an HDFS folder

Question

I am trying to implement a Scala + Spark solution that streams word-count information from new values arriving in an HDFS folder, as follows:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._

object HdfsWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
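For readers new to DStreams, here is a rough, Spark-free analogue of what the three transformations compute on each 2-second batch. It uses plain Scala collections only. The object name WordCountSketch, the method countWords and the sample lines are hypothetical. The groupBy-then-sum step only stands in for reduceByKey, which in Spark performs a distributed shuffle.

```scala
// Local sketch of the per-batch word count; no Spark involved.
// Assumption: one batch's lines are available as an in-memory Seq[String].
object WordCountSketch {
  // Mirrors lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))     // one element per space-separated token
      .map(word => (word, 1))    // pair each token with a count of 1
      .groupBy(_._1)             // group pairs by word (stand-in for the shuffle)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the 1s per word

  def main(args: Array[String]): Unit = {
    val batch = Seq("to be or not", "to be") // hypothetical batch contents
    // Sort by word so the printed order is deterministic
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Running main prints (be,2), (not,1), (or,1) and (to,2), one per line. That corresponds to what wordCounts.print() would show for a batch containing those two lines, apart from Spark's batch-time header and ordering.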

I tried running HdfsWordCount.main(Array('hdfs:///user/cloudera/sparkStreaming/')) in spark-shell, but it only shows a | prompt and waits for more input. Am I doing something wrong?

Tags: scala, apache-spark, hadoop, hdfs, cloudera-cdh

Solution
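A likely cause of the | continuation prompt, offered as an assumption rather than a confirmed answer: in Scala, single quotes delimit character literals (and, in Scala 2, symbol literals), not strings. As a result, 'hdfs:///user/cloudera/sparkStreaming/' is not a String, and the REPL appears to treat the line as unfinished. The directory argument has to be a double-quoted String. The object QuotingSketch and its members are hypothetical and only illustrate the difference:

```scala
// Hypothetical illustration of Scala literal quoting; not part of the original program.
object QuotingSketch {
  // Single quotes: exactly one character, of type Char
  val firstChar: Char = 'h'

  // Double quotes: a String, which is what the directory argument must be
  val hdfsDir: String = "hdfs:///user/cloudera/sparkStreaming/"

  // The shape HdfsWordCount.main expects: an Array[String]
  val argv: Array[String] = Array(hdfsDir)

  def main(args: Array[String]): Unit = {
    println(firstChar)
    println(argv.mkString(","))
  }
}
```

In spark-shell the call would then be HdfsWordCount.main(Array("hdfs:///user/cloudera/sparkStreaming/")). Also note that ssc.awaitTermination() blocks the shell until the streaming context is stopped. By default, textFileStream only processes files that appear in the directory after the stream has started, so files already present are not counted. New files should be moved or copied into the folder while the job is running.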

