首页 > 解决方案 > 如果我在本地执行了函数,则在 foreachRdd 中使用 kafka 代码的 Spart 流不会执行

问题描述

我在本地设置了 spark 2.2 并使用 scala

火花会话配置如下

val sparkSession = SparkSession
  .builder()
  .appName("My application")
  .config("es.nodes", "localhost:9200")
  .config("es.index.auto.create", true)
  .config("spark.streaming.backpressure.initialRate", "1")
  .config("spark.streaming.kafka.maxRatePerPartition", "7")
  .master("local[2]")
  .enableHiveSupport()
  .getOrCreate()

我在本地机器上运行 spark

当我做

  kafkaStream.foreachRDD(rdd => {
   calledFunction(rdd)
 })


def calledFunction(rdd: RDD[ConsumerRecord[String, String]]): Unit ={

 rdd.foreach(r=>{
 print("hello")})
}

对于我本地机器上的上述代码,“hello”没有打印,但所有作业都在排队。

如果我将代码更改为

kafkaStream.foreachRDD(rdd => { rdd.foreach(r=>{ print("hello")}) })

然后它在控制台上打印“你好”。

你能帮我看看有什么问题吗?

标签: scalaapache-sparkspark-streaming

解决方案


当使用 spark 1.6 运行时,它会在控制台中打印 hello。参考这里是示例代码

val message = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc,
  kafkaConf,
  Map("test" ->1),
  StorageLevel.MEMORY_ONLY
)
val lines = message.map(_._2)
lines.foreachRDD(rdd => {calledFunction(rdd)})


def calledFunction(rdd: RDD[String]): Unit ={
  rdd.foreach(r=>{
    print("hello")})
}

希望这可以帮助。由于依赖不匹配,我目前无法使用 spark 2.0 重新生成相同的问题。


推荐阅读