apache-spark - 在火花结构化流中从 kafa 读取时,有什么方法可以解析 json(不解析模式)
问题描述
我正在尝试从 kafka 发送一个 json 字符串,并使用 spark 结构化流式编程将其保存在 hdfs json 文件中。
例如我的输入如下:
{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
我的输出应该存储在某个时间间隔内来自单个文件的流中的所有数据。
{
"customerId":1
"Name":"xyz"
"dept":"IT"
}{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
但我得到低于输出:
"value":{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
"value":{
"customerId":1
"Name":"xyz"
"dept":"IT"
}
我想删除 value 选项。我知道它来自 kafka 。 如果我尝试使用架构,它的工作正常。但是我的架构应该是动态的,对于每个客户来说都是明智的。 任何帮助将不胜感激。
以下是示例代码库:
val inputDf = spark.read.format("kafka").option("kafka.bootstrap.servers","hd0-dn01.com:9091").option("subscribe","topic1").load()
val dataDf = inputDf.selectExpr("CAST(value AS STRING)")
dataDf.writeStream.trigger(Trigger.ProcessingTime(10,TimeUnit.SECONDS)).format("console").option("truncate",false).outputMode("append").start().awaitTermination()
解决方案
推荐阅读
- python - 访问 runtest 挂钩内的 pytest PluginManager 对象
- java - 在java中将对象添加到动态列表中
- javascript - 在 Angular ngAfterViewInit 中返回未定义的索引处的数组
- android - 如何检测 TalkBack 是否处于活动状态?使用 Xamarin Forms -Dependency 服务
- python - 从音频信号中提取特征
- python - 如何重构检查条件的临时变量?
- ms-access - Application.forms.Count 有时不准确
- ios - iOS MMDrawerController Objective-c 登录视图控制器
- r - ggplot2:有没有办法将平均值与箱线图对齐并对齐图例文本?
- typescript - @types/jest index.d.ts 文件返回错误