首页 > 解决方案 > 如何在 Spark Structured Streaming 中指定批处理间隔?

问题描述

我正在通过 Spark Structured Streaming 并遇到问题。

在 StreamingContext、DStreams 中,我们可以定义一个批处理间隔,如下所示:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5) # 5 second batch interval

如何在结构化流中做到这一点?

我的流媒体是这样的:

sparkStreaming = SparkSession \
.builder \
.appName("StreamExample1") \
.getOrCreate()

stream_df = sparkStreaming.readStream.schema("col0 STRING, col1 INTEGER").option("maxFilesPerTrigger", 1).\
csv("C:/sparkStream")

sql1 = stream_df.groupBy("col0").sum("col1")
query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start() 

此代码按预期工作,但是,如何/在哪里定义批处理间隔?

我是结构化流媒体的新手,请指导我。

标签: apache-sparkpysparkspark-structured-streaming

解决方案


tl;dr使用trigger(...)(在 上DataStreamWriter,即在 之后writeStream


这是一个很好的来源https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

有多种选择,如果您不设置批处理间隔,Spark 将在处理完最后一批后立即查找数据。触发器就是这里。

从手册:

流式查询的触发设置定义了流式数据处理的时间,查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。

一些例子:

默认触发器(尽快运行微批处理)

df.writeStream \
  .format("console") \
  .start()

具有两秒微批处理间隔的 ProcessingTime 触发器

df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

一次性触发

df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

具有一秒检查点间隔的连续触发

df.writeStream
  .format("console")
  .trigger(continuous='1 second')
  .start()

推荐阅读