scala - 从 HDFS 文件夹流式传输
问题描述
我正在尝试实现一个scala
+spark
解决方案,以从 HDFS 文件夹中的新值流式传输字数信息,如下所示:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._
object HdfsWordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: HdfsWordCount <directory>")
System.exit(1)
}
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
我试着spark-shell
跑步HdfsWordCount.main(Array('hdfs:///user/cloudera/sparkStreaming/'))
,它只是给一个|
,让我打字。难道我做错了什么?
解决方案
推荐阅读
- python-3.x - 导入 stl 模型 PyQt3D 和 Pyinstaller
- amazon-web-services - 使用 COPY 命令从 s3 插入具有特殊字符(如 \ \n " ' )的数据到从 springboot 项目红移时出现问题
- pandas - 熊猫将多列堆叠成多列
- c++ - 字符串推回问题的向量,你们能帮我吗?
- typescript - Typescript 泛型函数,带有关于索引类型的断言
- swift - Firebase 数据库不会停止更新
- android - 在 ViewModel 环境中哪里轮询传感器数据?
- xcode - SwiftUI Xcode 11 beta 7 @Binding for collections 正在打破预览
- javascript - 为什么我的计时器不会每三天动态重置一次?
- c# - 无效的 URI:无法确定 URI 的格式