apache-spark - 如何在 pyspark 的结构化流作业中运行地图转换
问题描述
我正在尝试使用进行 REST API 调用的 map() 转换来设置结构化流作业。以下是详细信息:
(1)
df=spark.readStream.format('delta') \
.option("maxFilesPerTrigger", 1000) \
.load(f'{file_location}')
(2)
respData=df.select("resource", "payload").rdd.map(lambda row: put_resource(row[0], row[1])).collect()
respDf=spark.createDataFrame(respData, ["resource", "status_code", "reason"])
(3)
respDf.writeStream \
.trigger(once=True) \
.outputMode("append") \
.format("delta") \
.option("path", f'{file_location}/Response') \
.option("checkpointLocation", f'{file_location}/Response/Checkpoints') \
.start()
但是,我收到一个错误:必须在步骤 (2) 上使用 writeStream.start() 执行带有流式源的查询。
任何帮助将不胜感激。谢谢你。
解决方案
推荐阅读
- flutter - Dart:如何迭代嵌套 Map 中数组中的对象?
- python - Python use args/kwargs as json path parameters
- reactjs - Chrome Extensions: Script (Gapi) not loading properly in React app bundled by Webpack, how to fix?
- php - How to change a input value with PHP?
- scala - Expand Polygon using Scala
- c - 程序的错误输出,使用指向结构数组的指针
- python - 为什么 Pandas 抱怨 'n' 是 split 函数的无效关键字参数?
- geometry - what's the relationship between jaggedness and image resolution
- python - Flat file data cleanup with thousands or millions of records
- python - Pandas 数据框 groupby 使用独特的组合