scala - 在 Spark Stream 中保存 PairRdd 时出错
问题描述
我正在尝试将我的 Pair Rdd 保存在火花流中,但在最后一步保存时出错。
这是我的示例代码
def main(args: Array[String]) {
val inputPath = args(0)
val output = args(1)
val noOfHashPartitioner = args(2).toInt
println("IN Streaming ")
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)
val hadoopConf = sc.hadoopConfiguration;
//hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
val ssc = new org.apache.spark.streaming.StreamingContext(sc, Seconds(60))
val input = ssc.textFileStream(inputPath)
val pairedRDD = input.map(row => {
val split = row.split("\\|")
val fileName = split(0)
val fileContent = split(1)
(fileName, fileContent)
})
import org.apache.hadoop.io.NullWritable
import org.apache.spark.HashPartitioner
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}
//print(pairedRDD)
pairedRDD.partitionBy(new HashPartitioner(noOfHashPartitioner)).saveAsHadoopFile(output, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec])
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
我在保存时进入了最后一步。我是火花流的新手,所以必须在这里遗漏一些东西。得到错误,如
值 partitionBy 不是 org.apache.spark.streaming.dstream.DStream[(String, String)] 的成员
请帮忙
解决方案
pairedRDD
是DStream[(String, String)]
not类型RDD[(String,String)]
。该方法partitionBy
在 s 上不可用DStream
。
也许看看foreachRDD
哪些应该在DStream
s 上可用。
编辑:更多上下文解释textFileStream
将在指定路径上设置目录监视,并且每当有新文件时都会流式传输内容。这就是流方面的来源。那是你要的吗?还是您只想“按原样”阅读目录的内容一次?然后readTextFiles
将返回一个非流容器。
推荐阅读
- c# - 列表
属性映射不起作用 - css - 如何以角度更改下拉框的高度?
- python - 如何获得 r'\\\|' 来自 yaml 文件
- node.js - 在 feathersjs 中将自定义事件发布到 socket.io 通道的问题
- javascript - firebase 中的 Javascript 函数
- css - 是否有任何具有“光标:指针”与“btn”不同的引导程序 4 类?
- android - 将应用程序转换为库后,.aar 文件不可用
- vue.js - Vuejs通过路由器将数据作为道具传递给组件
- python - Django:如何将平面查询集处理为嵌套字典?
- java - 如何使用 Java 执行 Allure 命令