apache-spark - 从 Spark Streaming 获取异常“未注册输出操作,因此没有可执行的操作”
问题描述
package com.scala.sparkStreaming
import org.apache.spark._
import org.apache.spark.streaming._
object Demo1 {
def main(assdf:Array[String]){
val sc=new SparkContext("local","Stream")
val stream=new StreamingContext(sc,Seconds(2))
val rdd1=stream.textFileStream("D:/My Documents/Desktop/inbound/sse/ssd/").cache()
val mp1= rdd1.flatMap(_.split(","))
print(mp1.count())
stream.start()
stream.awaitTermination()
}
}
我已经运行它,然后它显示异常
org.apache.spark.streaming.dstream.MappedDStream@6342993220/05/22 18:14:16 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:277)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:169)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:517)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:576)
at com.scala.sparkStreaming.Demo1$.main(Demo1.scala:18)
at com.scala.sparkStreaming.Demo1.main(Demo1.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:277)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:169)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:517)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:576)
at com.scala.sparkStreaming.Demo1$.main(Demo1.scala:18)
at com.scala.sparkStreaming.Demo1.main(Demo1.scala)
解决方案
错误消息“未注册输出操作,因此没有可执行的操作”提示缺少某些内容。
您的 Direct Streamsrdd1
并mp1
没有任何Action。AflatMap
只是一个由 Spark 懒惰评估的转换。这就是该stream.start()
方法抛出此异常的原因。
根据文档,您可以打印出如下所示的 RDD。当您处理 DStream 时,您可以遍历 RDD。下面的代码在 Spark 版本 2.4.5 上运行良好。
的文档textFileStream
说它“监视与 Hadoop 兼容的文件系统中的新文件并将它们作为文本文件读取”,因此请确保在作业运行时添加/修改要读取的文件。
此外,虽然我对 Windows 上的 Spark 并不完全熟悉,但您可能需要将目录字符串更改为
file://D:\\My Documents\\Desktop\\inbound\\sse\\ssd
以下是 Spark Streaming 的完整代码示例:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Main extends App {
val sc=new SparkContext("local[1]","Stream")
val stream=new StreamingContext(sc,Seconds(2))
val rdd1 =stream.textFileStream("file:///path/to/src/main/resources")
val mp1= rdd1.flatMap(_.split(" "))
mp1.foreachRDD(rdd => rdd.collect().foreach(println(_)))
stream.start()
stream.awaitTermination()
}
在 Spark 版本 2.4.5Spark Streaming
中已弃用,我建议您熟悉Spark Structured Streaming
. 代码看起来像这样:
// Structured Streaming
val lines: DataFrame = spark.readStream
.format("text")
.option("path", "file://path/to/src/main/resources")
.option("maxFilesPerTrigger", "1")
.load()
val query = lines.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
推荐阅读
- c - 如何将一个简单的树写入文件并将其读回?
- ionic-framework - 在 Ionic 4 中使用 Camera Native 获取后图像未显示
- authentication - 对 API 服务器使用 ID 令牌或访问令牌
- c# - 如何在组合框中仅显示字符串的一部分
- excel - 基于 MS-word VBA 宏从 Excel 工作表中读取特定列
- ruby-on-rails - 如何在rails中获取用户的最后登录IP地址和用户当前的登录IP地址?
- node.js - 当我键入此命令“npm install”时不安装 node_modules
- php - 有没有办法从表 #2 中获取数据,如果它具有表 #1 的外键并且我查询表 #1
- typescript - Typescript 类增强器和静态方法
- oauth-2.0 - 为什么在 OAuth2 中使用授权令牌获取访问令牌时必须指定重定向 URI?