pyspark - pyspark 结构化流(2.4.2) - foreach 接收器
问题描述
我在 dataproc GCP 上运行 spark 2.4.2 并将状态聚合应用于流式 IOT 数据,水印为 1 天,如下所示:
csvDF = sqlcontext \
.readStream \
.option("sep", ",") \
.option("checkpointLocation", "gs://bucket_name/checkpoint") \
.schema(schema) \
.csv(bucket_path)
df_aggregated = csvDF \
.withWatermark("date_time","1 day") \
.groupBy(
csvDF.unique_device_id) \
.agg(
sum(col('overall_measure1')),
sum(col('overall_measure2')),
sum(col('overall_measure3'))
)
def process_row(row):
if row['overall_measure1'] >= 10000 :
#Write a file with a custom message to the gcs bucket
print(row)
pass
query = (
df_aggregated.writeStream \
.foreach(process_row)
.outputMode("complete")
.start()
)
我的目标是获取每一行的总和值,并检查是否有任何值(overall_measure1、overall_measure2、overall_measure3)超过某个值,比如说 10000。如果是这样,我希望能够向我的GCS 存储桶。我已经尝试过以下方法,任何地方都没有足够的文档,所以如果有人能让我知道如何去做,我将不胜感激。
面临的问题- 我无法使用 foreach 接收器捕获值,我的问题不是关于如何将自定义文件写入 gcs 存储桶。
解决方案
推荐阅读
- ubuntu - Ubuntu 主题更改
- reactjs - 如何避免导致未定义的嵌套文档的属性分配
- php - PHP:有没有办法过滤“原始”输入GET?
- aem - DefaultGetServlet 扩展 html 的无渲染器无法渲染资源 JcrNodeResource
- apache-httpasyncclient - HttpAsyncClient 5 | 处理 Gzip 内容作为响应的最佳方法
- windows - 视觉代码集成终端搞砸了
- nginx - 如何在lua(openresty)中使用kafka?
- reactjs - 对于基于 React 的表单,将文本输入元素作为 React 组件是否可以接受?
- amazon-web-services - 如何将 S3 存储桶文件发布到外部(非 AWS)云服务器?
- python - Pypsark:如何有条件地将函数应用于 Spark DataFrame 列并填充 Null 值