首页 > 解决方案 > pyspark 结构化流(2.4.2) - foreach 接收器

问题描述

我在 dataproc GCP 上运行 spark 2.4.2 并将状态聚合应用于流式 IOT 数据,水印为 1 天,如下所示:

csvDF = sqlcontext \
    .readStream \
    .option("sep", ",") \
    .option("checkpointLocation", "gs://bucket_name/checkpoint") \
    .schema(schema) \
    .csv(bucket_path)

df_aggregated = csvDF \
                .withWatermark("date_time","1 day") \
                .groupBy(
                    csvDF.unique_device_id) \
                .agg(
                    sum(col('overall_measure1')),
                    sum(col('overall_measure2')),
                    sum(col('overall_measure3'))
                   )

def process_row(row):
          if row['overall_measure1'] >= 10000 : 
               #Write a file with a custom message to the gcs bucket
          print(row)
          pass

query = (

        df_aggregated.writeStream \
        .foreach(process_row)
        .outputMode("complete")
        .start()
    )

我的目标是获取每一行的总和值,并检查是否有任何值(overall_measure1、overall_measure2、overall_measure3)超过某个值,比如说 10000。如果是这样,我希望能够向我的GCS 存储桶。我已经尝试过以下方法,任何地方都没有足够的文档,所以如果有人能让我知道如何去做,我将不胜感激。

面临的问题- 我无法使用 foreach 接收器捕获值,我的问题不是关于如何将自定义文件写入 gcs 存储桶。

标签: pysparkpyspark-sqlspark-structured-streaming

解决方案


推荐阅读