Getting the exception "No output operations registered, so nothing to execute" from Spark Streaming

Problem description

package com.scala.sparkStreaming

import org.apache.spark._
import org.apache.spark.streaming._

object Demo1 {
  def main(assdf:Array[String]){

     val sc=new SparkContext("local","Stream")

     val stream=new StreamingContext(sc,Seconds(2))

     val rdd1=stream.textFileStream("D:/My Documents/Desktop/inbound/sse/ssd/").cache()

     val mp1= rdd1.flatMap(_.split(","))
     print(mp1.count())

     stream.start()
     stream.awaitTermination()
  }
}

When I run it, it throws the following exception:

org.apache.spark.streaming.dstream.MappedDStream@63429932
20/05/22 18:14:16 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:277)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:169)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:517)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:576)
    at com.scala.sparkStreaming.Demo1$.main(Demo1.scala:18)
    at com.scala.sparkStreaming.Demo1.main(Demo1.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:277)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:169)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:517)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:576)
    at com.scala.sparkStreaming.Demo1$.main(Demo1.scala:18)
    at com.scala.sparkStreaming.Demo1.main(Demo1.scala)

Tags: apache-spark, spark-streaming, rdd, spark-structured-streaming

Solution

The error message "No output operations registered, so nothing to execute" is a hint that something is missing.

Your DStreams rdd1 and mp1 do not have any action registered on them. A flatMap is just a transformation that Spark evaluates lazily, and print(mp1.count()) only prints the string representation of the DStream object (the MappedDStream@... at the top of your output) rather than triggering any computation. That is why the stream.start() method throws this exception.
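
For instance, a minimal fix, assuming you just want the element count per batch, is to call the DStream's own print() output operation instead of the driver-side print:

// Minimal sketch: mp1.count() returns a DStream[Long] with one element per batch.
// DStream.print() is an output operation, so the graph now has something to execute.
val counts = mp1.count()
counts.print()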

According to the documentation, you can print out the RDDs as shown below: when you work with a DStream, you iterate over its RDDs with foreachRDD. The code below runs fine on Spark version 2.4.5.

The documentation of textFileStream says that it "monitors a Hadoop-compatible filesystem for new files and reads them as text files", so make sure that you add/modify the files to be read while the job is running.
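
For example, while the job is running you could drop a new file into the watched directory from a separate process. A hypothetical sketch (the path and contents are placeholders):

// Hypothetical helper: textFileStream only picks up files that appear in the
// directory after the stream has started, so create a fresh file at runtime.
import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

Files.write(
  Paths.get("/path/to/src/main/resources/words.txt"), // assumed watched directory
  "hello world hello spark".getBytes(StandardCharsets.UTF_8)
)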

Also, although I am not entirely familiar with Spark on Windows, you may need to change the directory string to

file://D:\\My Documents\\Desktop\\inbound\\sse\\ssd

Here is the complete code example for Spark Streaming:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Main extends App {
  val sc = new SparkContext("local[1]", "Stream")

  // micro-batch interval of 2 seconds
  val stream = new StreamingContext(sc, Seconds(2))

  val rdd1 = stream.textFileStream("file:///path/to/src/main/resources")

  val mp1 = rdd1.flatMap(_.split(" "))

  // foreachRDD is an output operation: it registers a job for every batch
  mp1.foreachRDD(rdd => rdd.collect().foreach(println(_)))

  stream.start()
  stream.awaitTermination()
}
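
Note that collect() inside foreachRDD pulls every batch to the driver, which is fine for a local test but not for large data. Any other output operation, such as mp1.print() or mp1.saveAsTextFiles(...), would equally satisfy the "output operations registered" requirement.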

As of Spark version 2.4.5, Spark Streaming is deprecated, so I recommend getting familiar with Spark Structured Streaming. The code would look like this:

// Structured Streaming; assumes an existing SparkSession, created here for completeness
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("StructuredStream")
  .getOrCreate()

val lines: DataFrame = spark.readStream
  .format("text")
  .option("path", "file:///path/to/src/main/resources") // three slashes for a local absolute path
  .option("maxFilesPerTrigger", "1")
  .load()

// start() attaches the console sink and launches the streaming query
val query = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
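
Here writeStream.start() plays the same role as an output operation in the DStream API: without a started query, Structured Streaming likewise refuses to execute anything over a streaming source.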
