json - kafka json数组消费者scala
问题描述
如果我们生成如下 JSON 数据,
[{"name":"Andy", "age":30}]
[{"name":"Romen", "age":20}]
如何使用 scala 在 Kafka 消费者上使用它。我试着把它读成 .select(cast(value as String))。但不起作用。请帮忙
我读取数据如下:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:host")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load().selectExpr("CAST(value AS STRING)")
解决方案
我只是创建一个示例代码
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("Example")
.master("local")
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
import spark.implicits._
val result = df.selectExpr("CAST(value AS STRING)")
.as[String]
result.writeStream
.format("console")
.start()
.awaitTermination()
ssc.start()
ssc.awaitTermination()
}
如果您想了解更多信息,请从 Kafka 读取消息并将其写入控制台,请在此处查看:https ://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html
推荐阅读
- entity-framework - Entity Framework Core 和复杂的值对象
- python - Deep Learning with Python book 引用了 The final output is a 2D tensor of shape (timesteps, output_features) 但 final_output_sequence 是 1D
- javascript - 如何启用在对话框外部单击
- css - 游标是否应该只应用于:悬停?
- python - 无法过滤特定的字符串模式,也无法更改 pandas 中的索引
- sql - 计算带有条件的最新项目的 SQL 年龄
- google-apps-script - 获取解析错误 setFormula Google Script
- typescript - 如何以解构方式从具有任何类型的对象提供类型
- r - 闪亮的应用程序 Markdown Latex kable 功能的问题
- javascript - 数组映射函数无法在简单示例中推送对象