json - Spark Streaming Kafka: convert string to JSON
Problem description
I am reading sensor data through an MQTT broker into a Kafka pipeline and want to process the data with Spark Structured Streaming.
I have tested the following and am able to get the data into Spark (so far so good).
// create conf and streaming context:
val conf = new SparkConf().setMaster("yarn").setAppName("sensor_example")
val ssc = new StreamingContext(sc, Seconds(1))
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.2.109:9092")
  .option("subscribe", "test_message")
  .option("startingOffsets", "earliest")
  .load()
// cast the value from binary to string:
val sensorDf = df.selectExpr("CAST(value AS STRING)")
val sensorOutput = sensorDf.as[String] // do I need this?
// write to the command line:
sensorOutput.writeStream.format("console").option("truncate","false").start().awaitTermination()
The output of the code above is:
Batch: 1
-------------------------------------------
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{
"temperature": "28",
"message_no": "4727",
"timestamp": "2019-06-02 12:59:02.526809",
"average": "28.6454410831",
"humidity": "38",
"fahrenheit": "82"
}|
|{
"temperature": "28",
"message_no": "4728",
"timestamp": "2019-06-02 12:59:08.276594",
"average": "28.6453045685",
"humidity": "38",
"fahrenheit": "82"
}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
I need to convert this into a DataFrame with the corresponding column structure. The object created so far is a DataFrame, or a Dataset with only one column:
scala> val sensorDf = df.selectExpr("CAST(value AS STRING)")
sensorDf: org.apache.spark.sql.DataFrame = [value: string]
scala> val sensorOutput = sensorDf.as[String]
sensorOutput: org.apache.spark.sql.Dataset[String] = [value: string]
scala> sensorOutput.printSchema
root
|-- value: string (nullable = true)
However, I can't find a way to convert it into a DataFrame whose columns and values are defined by the fields of that string. The schema should look like this:
// create the schema (requires the Spark SQL types import):
import org.apache.spark.sql.types._
val sensorSchema = StructType(Array(
StructField("temperature", IntegerType),
StructField("message_no", IntegerType),
StructField("timestamp", StringType),
StructField("average", DoubleType),
StructField("humidity", IntegerType),
StructField("fahrenheit", IntegerType)
))
I have already tried:
scala> val dataDf = sensorDf.selectExpr("CAST(value AS STRING) as json").select(from_json($"json", schema=sensorSchema).as("data")).select("data.*")
dataDf: org.apache.spark.sql.DataFrame = [temperature: int, message_no: int ... 4 more fields]
scala> dataDf.printSchema()
root
|-- temperature: integer (nullable = true)
|-- message_no: integer (nullable = true)
|-- timestamp: string (nullable = true)
|-- average: double (nullable = true)
|-- humidity: integer (nullable = true)
|-- fahrenheit: integer (nullable = true)
But the output is:
scala> dataDf.writeStream.format("console").option("truncate","false").start().awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+----------+---------+-------+--------+----------+
|temperature|message_no|timestamp|average|humidity|fahrenheit|
+-----------+----------+---------+-------+--------+----------+
|null |null |null |null |null |null |
|null |null |null |null |null |null |
|null |null |null |null |null |null |
|null |null |null |null |null |null |
|null |null |null |null |null |null |
|null |null |null |null |null |null |
|null |null |null |null |null |null |
Thanks for any suggestion.
Solution
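The solution body is missing from this page, but the sample output suggests a likely cause: every value in the JSON payload is a quoted string (e.g. "temperature": "28"), and from_json with an IntegerType or DoubleType field typically refuses to coerce a quoted string, so in PERMISSIVE mode the whole row comes back as nulls. A hedged sketch of one common fix, under that assumption: parse every field as StringType first (matching the payload), then cast to the types you need. The rawSchema and dataDf names here are illustrative.

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Schema that matches the payload as-is: every value is a quoted string.
val rawSchema = StructType(Array(
  StructField("temperature", StringType),
  StructField("message_no", StringType),
  StructField("timestamp", StringType),
  StructField("average", StringType),
  StructField("humidity", StringType),
  StructField("fahrenheit", StringType)
))

// Parse the Kafka value as string-typed JSON, then cast column by column.
val dataDf = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), rawSchema).as("data"))
  .select(
    col("data.temperature").cast(IntegerType).as("temperature"),
    col("data.message_no").cast(IntegerType).as("message_no"),
    col("data.timestamp"),
    col("data.average").cast(DoubleType).as("average"),
    col("data.humidity").cast(IntegerType).as("humidity"),
    col("data.fahrenheit").cast(IntegerType).as("fahrenheit")
  )

dataDf.writeStream.format("console").option("truncate", "false").start().awaitTermination()
```

If the string-typed schema still yields nulls, select the raw `json` column to the console first and inspect it for malformed records; the `.as[String]` conversion in the original code is not needed for this, since selectExpr already produces a string column.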