apache-spark - 如何在 Spark Structured Streaming 中指定批处理间隔?
问题描述
我正在通过 Spark Structured Streaming 并遇到问题。
在 StreamingContext、DStreams 中,我们可以定义一个批处理间隔,如下所示:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5) # 5 second batch interval
如何在结构化流中做到这一点?
我的流媒体是这样的:
sparkStreaming = SparkSession \
.builder \
.appName("StreamExample1") \
.getOrCreate()
stream_df = sparkStreaming.readStream.schema("col0 STRING, col1 INTEGER").option("maxFilesPerTrigger", 1).\
csv("C:/sparkStream")
sql1 = stream_df.groupBy("col0").sum("col1")
query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start()
此代码按预期工作,但是,如何/在哪里定义批处理间隔?
我是结构化流媒体的新手,请指导我。
解决方案
tl;dr使用trigger(...)
(在 上DataStreamWriter
,即在 之后writeStream
)
这是一个很好的来源https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html。
有多种选择,如果您不设置批处理间隔,Spark 将在处理完最后一批后立即查找数据。触发器就是这里。
从手册:
流式查询的触发设置定义了流式数据处理的时间,查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。
一些例子:
默认触发器(尽快运行微批处理)
df.writeStream \
.format("console") \
.start()
具有两秒微批处理间隔的 ProcessingTime 触发器
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()
一次性触发
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()
具有一秒检查点间隔的连续触发
df.writeStream
.format("console")
.trigger(continuous='1 second')
.start()
推荐阅读
- javascript - 想要使用输入字段更改视频 src
- javascript - 在入口点自动加载块
- perl - 在 for/while 循环中通过 HTTP POST 查询重用 LWP Useragent 对象
- javascript - 解析
作为字符串 HTML 一部分的标签 - javascript - 如何从特定数组中添加/求和所需的数组索引?
- python - 无法在 python3 中正确使用 unpack()
- macos - /bin/zsh 在 launchd 运行时无法打开脚本
- ruby-on-rails - = javascript_include_tag "application" 给出错误 ExecJS::RuntimeError at / SyntaxError: [stdin]:1:1: unexpected //=
- javascript - 当我调用 array.length 时,数组长度不正确
- python - iterrows(), itertuples() 使用不同的行值迭代 pandas 数据帧