scala - 使用 ForeachWriter 打印行
问题描述
我正在使用 apache spark (Scala) 读取来自 kafka 主题的传入数据流。我想打印消息中的每一行。我正在尝试使用ForeachWriter
,我的代码如下所示:
DF.writeStream.foreach(new ForeachWriter[Row] {
override def process(value: Row): Unit = {
println(s"Processing ${value}")
println(value.toString())
}
override def open(partitionId: Long, epochId: Long): Boolean = {true}
override def close(errorOrNull: Throwable): Unit = {}
}
).start()
但我没有在控制台上得到任何输出。请帮忙。
解决方案
有两种方法可以达到您所需的结果。
- 使用 ForeachWriter,无论您做什么都是正确的,但最终您错过了调用 awaitTermination() 方法。
- 使用 foreachBatch
代码:
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val kafkaDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "mytopic")
.option("startingOffsets", "latest")
.load().select('value.cast("string"))
// Any one approach can be used at a time
// 1. using ForeachWriter
kafkaDF.writeStream.foreach(new ForeachWriter[Row] {
override def process(value: Row): Unit = println(s"Processing ${value}")
override def open(partitionId: Long, epochId: Long): Boolean = true
override def close(errorOrNull: Throwable): Unit = {}
}
).start().awaitTermination()
// 2. using foreachBatch
kafkaDF.writeStream.foreachBatch((ds, l) => {
ds.foreach(println(_))
}).start().awaitTermination()
推荐阅读
- symfony - 与现有实体的多对一关系
- c# - 在 Blazor 项目中使用 api 的网格中的特定行出现问题
- python - opencv中是否有可以检测下图中曲线的功能?
- android - 如何设置 Firebase OTP 数字的限制,例如 6 到 4?
- c# - 为什么我在打开然后关闭窗口上的菜单时收到 InvalidCastException?
- python - 在 __init__ raise TypeError("%s() got an unexpected keyword argument '%s'" % (cls.__name__, kwarg))
- augmented-reality - ArSession_update 需要很多时间
- node.js - 意外的令牌o
- intellij-idea - 如何以分离模式从终端运行 Intellij IDEA
- c# - 如何在 IIS 上发布的应用程序中打开列表框中列出的文件(excel、txt、word)?