首页 > 解决方案 > 是否可以使用 Spark Streaming SQL 实时解析来自 Kafka 主题的 JSON 字符串?

问题描述

我有一个 Pyspark 笔记本,它连接到 kafka 代理并创建一个名为 temp 的 spark writeStream。Kafka 主题中的数据值是 json 格式,但我不确定如何创建一个可以实时解析这些数据的 spark sql 表。我知道的唯一方法是创建表的副本,将其转换为 RDD 或 DF,然后将值解析为另一个 RDD 和 DF。在写入流时是否可以在实时处理中完成此操作?

代码:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","localhost:9092") \
    .option("subscribe","hoteth") \
    .option("startingOffsets", "earliest") \
    .load()

ds = df.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)", "timestamp")
ds.writeStream.queryName("temp").format("memory").start()
spark.sql("select * from temp limit 5").show()

输出:

+----+--------------------+--------------------+
| key|               value|           timestamp|
+----+--------------------+--------------------+
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
+----+--------------------+--------------------+

标签: apache-sparkpysparkapache-spark-sqlspark-streaming

解决方案


我可以解决这个问题的一种方法是像在 Hive HQL 中那样横向查看 json_tuple。我仍在寻找一种解决方案,它可以直接从流中解析数据,这样就不会花费额外的处理时间来使用查询进行解析。

spark.sql("""
    select value, v1.transaction,ticker,price
    from temp 
    lateral view json_tuple(value,"e","s","p") v1 as transaction, ticker,price
    limit 5
    """).show()

推荐阅读