首页 > 解决方案 > 使用 ForeachWriter 打印行

问题描述

我正在使用 apache spark (Scala) 读取来自 kafka 主题的传入数据流。我想打印消息中的每一行。我正在尝试使用ForeachWriter,我的代码如下所示:

DF.writeStream.foreach(new ForeachWriter[Row] {
  
override def process(value: Row): Unit = {
  println(s"Processing ${value}")
  println(value.toString())
}
override def open(partitionId: Long, epochId: Long): Boolean = {true}

override def close(errorOrNull: Throwable): Unit = {}
}
).start()

但我没有在控制台上得到任何输出。请帮忙。

标签: scalaapache-sparkforeach

解决方案


有两种方法可以达到您所需的结果。

  1. 使用 ForeachWriter,无论您做什么都是正确的,但最终您错过了调用 awaitTermination() 方法。
  2. 使用 foreachBatch

代码:

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val kafkaDF = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "mytopic")
        .option("startingOffsets", "latest") 
        .load().select('value.cast("string"))

 // Any one approach can be used at a time

 // 1. using ForeachWriter
     
   kafkaDF.writeStream.foreach(new ForeachWriter[Row] {
    override def process(value: Row): Unit = println(s"Processing ${value}")
    override def open(partitionId: Long, epochId: Long): Boolean = true
    override def close(errorOrNull: Throwable): Unit = {}
}
).start().awaitTermination()

// 2. using foreachBatch
kafkaDF.writeStream.foreachBatch((ds, l) => {
    ds.foreach(println(_))
}).start().awaitTermination()

推荐阅读