首页 > 解决方案 > 在火花结构化流中从 kafa 读取时,有什么方法可以解析 json(不解析模式)

问题描述

我正在尝试从 kafka 发送一个 json 字符串,并使用 spark 结构化流式编程将其保存在 hdfs json 文件中。

例如我的输入如下:

{
"customerId":1
"Name":"xyz"
"dept":"IT"
}

我的输出应该存储在某个时间间隔内来自单个文件的流中的所有数据。

{
"customerId":1
"Name":"xyz"
"dept":"IT"
}{
"customerId":1
"Name":"xyz"
"dept":"IT"
}

但我得到低于输出:

"value":{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
"value":{
"customerId":1
"Name":"xyz"
"dept":"IT"
}

我想删除 value 选项。我知道它来自 kafka 。 如果我尝试使用架构,它的工作正常。但是我的架构应该是动态的,对于每个客户来说都是明智的。 任何帮助将不胜感激。

以下是示例代码库:

val inputDf = spark.read.format("kafka").option("kafka.bootstrap.servers","hd0-dn01.com:9091").option("subscribe","topic1").load()

val dataDf = inputDf.selectExpr("CAST(value AS STRING)")

dataDf.writeStream.trigger(Trigger.ProcessingTime(10,TimeUnit.SECONDS)).format("console").option("truncate",false).outputMode("append").start().awaitTermination()

标签: apache-sparkapache-kafkaspark-structured-streaming

解决方案


推荐阅读