
Problem Description

I am reading sensor data via an MQTT broker into a Kafka pipeline and want to process the data with Spark Structured Streaming.

I have tested the following and am able to read the data into Spark (so far so good).

// create conf and streaming context (DStream API; note these two values are
// not actually used by the structured streaming code below):
val conf = new SparkConf().setMaster("yarn").setAppName("sensor_example")
val ssc = new StreamingContext(sc, Seconds(1))

// read from Kafka as a streaming DataFrame:
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.2.109:9092")
  .option("subscribe", "test_message")
  .option("startingOffsets", "earliest")
  .load()

// cast the value from binary to string:
val sensorDf = df.selectExpr("CAST(value AS STRING)")
val sensorOutput = sensorDf.as[String]   // do I need this?

// write to the command line:
sensorOutput.writeStream.format("console").option("truncate", "false").start().awaitTermination()

The output of the above code is:

Batch: 1
-------------------------------------------
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                            |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{
   "temperature": "28", 
   "message_no": "4727", 
   "timestamp": "2019-06-02 12:59:02.526809", 
   "average": "28.6454410831", 
   "humidity": "38", 
   "fahrenheit": "82"
}|
|{
   "temperature": "28", 
   "message_no": "4728", 
   "timestamp": "2019-06-02 12:59:08.276594", 
   "average": "28.6453045685", 
   "humidity": "38", 
   "fahrenheit": "82"
}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I need to convert this into a DataFrame with the corresponding column structure. The object created so far is a DataFrame or a Dataset with only a single column:

scala> val sensorDf = df.selectExpr("CAST(value AS STRING)")
sensorDf: org.apache.spark.sql.DataFrame = [value: string]

scala> val sensorOutput = sensorDf.as[String]
sensorOutput: org.apache.spark.sql.Dataset[String] = [value: string]

scala> sensorOutput.printSchema
root
 |-- value: string (nullable = true)

However, I cannot find a way to convert this into a DataFrame whose columns and values are defined by the fields of the string. The schema should look like this:

// create the schema:
import org.apache.spark.sql.types._

val sensorSchema = StructType(Array(
  StructField("temperature", IntegerType),
  StructField("message_no", IntegerType),
  StructField("timestamp", StringType),
  StructField("average", DoubleType),
  StructField("humidity", IntegerType),
  StructField("fahrenheit", IntegerType)
))

This is what I have tried:

scala> val dataDf = sensorDf.selectExpr("CAST(value AS STRING) as json").select(from_json($"json", schema=sensorSchema).as("data")).select("data.*")
dataDf: org.apache.spark.sql.DataFrame = [temperature: int, message_no: int ... 4 more fields]

scala> dataDf.printSchema()
root
 |-- temperature: integer (nullable = true)
 |-- message_no: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- average: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- fahrenheit: integer (nullable = true)

But the output is:

scala> dataDf.writeStream.format("console").option("truncate","false").start().awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+----------+---------+-------+--------+----------+
|temperature|message_no|timestamp|average|humidity|fahrenheit|
+-----------+----------+---------+-------+--------+----------+
|null       |null      |null     |null   |null    |null      |
|null       |null      |null     |null   |null    |null      |
|null       |null      |null     |null   |null    |null      |
|null       |null      |null     |null   |null    |null      |
|null       |null      |null     |null   |null    |null      |
|null       |null      |null     |null   |null    |null      |
|null       |null      |null     |null   |null    |null      |


Thanks for any suggestion.

Tags: json, apache-spark, apache-kafka, spark-structured-streaming

Solution
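A plausible cause of the all-null rows, offered as a hedged guess rather than a confirmed fix: every value in the payload is a quoted JSON string (e.g. `"temperature": "28"`), while the schema declares IntegerType and DoubleType. `from_json` runs in PERMISSIVE mode by default, and a field that fails to parse nulls out the whole parsed record. One workaround is to parse with an all-StringType schema first and cast afterwards. Below is a self-contained sketch in batch mode, using a hypothetical `SensorJson` helper; the same `select` expressions would apply unchanged to the streaming `sensorDf` from the question:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SensorJson {
  // Every field as StringType first, matching the quoted payload.
  val rawSchema: StructType = StructType(
    Seq("temperature", "message_no", "timestamp",
        "average", "humidity", "fahrenheit").map(StructField(_, StringType)))

  // Parse the JSON column, then cast each field to the intended type.
  def parse(df: DataFrame): DataFrame =
    df.select(from_json(col("value"), rawSchema).as("data"))
      .select("data.*")
      .select(
        col("temperature").cast("int"),
        col("message_no").cast("int"),
        col("timestamp"),
        col("average").cast("double"),
        col("humidity").cast("int"),
        col("fahrenheit").cast("int"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]").appName("sensor_demo").getOrCreate()
    import spark.implicits._

    // Sample record with the same shape as the sensor payload.
    val sample = Seq(
      """{"temperature": "28", "message_no": "4727", "timestamp": "2019-06-02 12:59:02.526809", "average": "28.6454410831", "humidity": "38", "fahrenheit": "82"}"""
    ).toDF("value")

    SensorJson.parse(sample).show(false)
    spark.stop()
  }
}
```

Alternatively, if you control the producer, emitting unquoted JSON numbers should let the original `sensorSchema` work as written.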

