apache-spark - 是否可以使用 Spark Streaming SQL 实时解析来自 Kafka 主题的 JSON 字符串?
问题描述
我有一个 Pyspark 笔记本,它连接到 kafka 代理并创建一个名为 temp 的 spark writeStream。Kafka 主题中的数据值是 json 格式,但我不确定如何创建一个可以实时解析这些数据的 spark sql 表。我知道的唯一方法是创建表的副本,将其转换为 RDD 或 DF,然后将值解析为另一个 RDD 和 DF。在写入流时是否可以在实时处理中完成此操作?
代码:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","localhost:9092") \
.option("subscribe","hoteth") \
.option("startingOffsets", "earliest") \
.load()
ds = df.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)", "timestamp")
ds.writeStream.queryName("temp").format("memory").start()
spark.sql("select * from temp limit 5").show()
输出:
+----+--------------------+--------------------+
| key| value| timestamp|
+----+--------------------+--------------------+
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
|null|{"e":"trade","E":...|2018-09-18 15:41:...|
+----+--------------------+--------------------+
解决方案
我可以解决这个问题的一种方法是像在 Hive HQL 中那样横向查看 json_tuple。我仍在寻找一种解决方案,它可以直接从流中解析数据,这样就不会花费额外的处理时间来使用查询进行解析。
spark.sql("""
select value, v1.transaction,ticker,price
from temp
lateral view json_tuple(value,"e","s","p") v1 as transaction, ticker,price
limit 5
""").show()
推荐阅读
- ms-access - 数据管道可以在 Access 或 SAS 过程中吗?定义
- android - Android如何将触摸事件坐标发送到另一个不同尺寸和密度的屏幕并保留它?
- shell - Shellscript 打开 csvfile 并打印第 4 列,如果该字段为 null,则打印 null
- asp.net - 如何将数据存储在内存中并由其他按钮单击事件使用来显示数据?
- amazon-web-services - AWS 目的地不工作,我不知道为什么
- jython - 带有 jython 脚本的 wsadmin 给出错误 AttributeError: 'javapackage' object has no attribute 'SEC_SEGMENT'
- python - Python 3 - 'While loop' 验证用户输入是否在 2 个数字之间
- jquery - 使用键盘进行选择时的事件
- python - 如何为 jupyter notebook 手动安装 chromium?
- angular - PrimeNG 主题字体覆盖 Angular 应用程序字体