scala - 如果我在本地执行了函数,则在 foreachRdd 中使用 kafka 代码的 Spart 流不会执行
问题描述
我在本地设置了 spark 2.2 并使用 scala
火花会话配置如下
val sparkSession = SparkSession
.builder()
.appName("My application")
.config("es.nodes", "localhost:9200")
.config("es.index.auto.create", true)
.config("spark.streaming.backpressure.initialRate", "1")
.config("spark.streaming.kafka.maxRatePerPartition", "7")
.master("local[2]")
.enableHiveSupport()
.getOrCreate()
我在本地机器上运行 spark
当我做
kafkaStream.foreachRDD(rdd => {
calledFunction(rdd)
})
def calledFunction(rdd: RDD[ConsumerRecord[String, String]]): Unit ={
rdd.foreach(r=>{
print("hello")})
}
对于我本地机器上的上述代码,“hello”没有打印,但所有作业都在排队。
如果我将代码更改为
kafkaStream.foreachRDD(rdd => {
rdd.foreach(r=>{
print("hello")})
})
然后它在控制台上打印“你好”。
你能帮我看看有什么问题吗?
解决方案
当使用 spark 1.6 运行时,它会在控制台中打印 hello。参考这里是示例代码
val message = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
ssc,
kafkaConf,
Map("test" ->1),
StorageLevel.MEMORY_ONLY
)
val lines = message.map(_._2)
lines.foreachRDD(rdd => {calledFunction(rdd)})
def calledFunction(rdd: RDD[String]): Unit ={
rdd.foreach(r=>{
print("hello")})
}
希望这可以帮助。由于依赖不匹配,我目前无法使用 spark 2.0 重新生成相同的问题。
推荐阅读
- excel - 如何用不同行数的数组制作表格?
- android - 您如何找到某些导入所需的依赖项?
- java - 是否可以在 ServerListPingEvent `event.setMotd()` 中获得换行符的效果?
- ios - 如何以iso格式更改日期字符串的时区?
- javascript - 如何使用 js 将变量内容保存/加载到文本文件
- java - ListView:列表视图的第一项影响其最后一项
- json - 如何在 Json.Text 中全局添加自定义转换器?
- python - 使用 Pandas 在 .csv 文件中查找特定值的 x,y 坐标
- python - 我怎样才能让它让python读取一个文件,任务是让某人从这个文件中命名10首歌曲,而不能重复一首歌曲
- qt - Qt垂直滚动条跳转到电子表格顶部