python - 如何在结构化流中获取 DataFrame?
问题描述
我想从 MQTT 接收 JSON 字符串并将它们解析为 DataFrames df
。我该怎么做?
这是我发送到 MQTT 队列以便在 Spark 中处理的 Json 消息示例:
{
"id": 1,
"timestamp": 1532609003,
"distances": [2,5,7,8]
}
这是我的代码:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Test") \
.master("local[4]") \
.getOrCreate()
# Custom Structured Streaming receiver
reader = spark\
.readStream\
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")\
.option("topic","uwb/distances")\
.option('brokerUrl', 'tcp://127.0.0.1:1883')\
.load()\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")
df = spark.read.json(reader.select("value").rdd)
# Start running the query that prints the running counts to the console
query = df \
.writeStream \
.format('console') \
.start()
query.awaitTermination()
但是这段代码失败了:
py4j.protocol.Py4JJavaError: An error occurred while calling o45.javaToPython.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
mqtt
我尝试添加start
如下:
df = spark.read.json(reader.select("value").rdd) \
.writeStream \
.format('console') \
.start()
但是得到了同样的错误。我的目标是获得一个df
可以进一步通过 ETL 流程的 DataFrame。
更新:
标记为答案的线程没有帮助我解决问题。首先,当我使用 PySpark 时,它为 Scala 提供了解决方案。其次,我测试了答案中提出的解决方案,它返回给我一个空列json
:
reader = spark\
.readStream\
.schema(spark.read.json("mqtt_schema.json").schema) \
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")\
.option("topic","uwb/distances")\
.option('brokerUrl', 'tcp://127.0.0.1:1883')\
.load()\
.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS STRING)")
json_schema = spark.read.json("mqtt_schema.json").schema
df = reader.withColumn('json', from_json(col('value'), json_schema))
query = df \
.writeStream \
.format('console') \
.start()
解决方案
我想这是因为您的 df 没有流式传输。尝试一下怎么样
reader.select("value").writestream
推荐阅读
- javascript - 找出哪个键盘在javascript中生成了一个键事件
- graphql - 将 Apollo 客户端或同构 Fetch 与 GraphiQL 结合使用
- python-3.x - if 语句 get 被跳过,而只有 else 语句 get 被打印。我如何将字符串或 int 存储在单个变量中?
- python - 使用 request.session 填充 django 表单
- html - VBA Excel 运行时错误 438 / getElementbyClassName
- react-native - React native 中的平面纸阴影,如何实现?
- kotlin - 为什么使用while循环抛出未解决的引用错误,而for循环在kotlin中运行良好
- unity3d - 检查 Google Play 收据是否足以统一我的播放器偏好数据?
- javascript - 循环导入函数返回的数组?
- docker - POD 定义 - 部署到 DC/OS