apache-spark - 如何在单个 Spark 作业中调用多个 writeStream 操作?
问题描述
我正在尝试编写一个 Spark Structured Streaming 作业,该作业从 Kafka 主题读取并通过writeStream
操作写入单独的路径(在执行一些转换之后)。但是,当我运行以下代码时,只有第一个writeStream
被执行,第二个被忽略。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start() \
.awaitTermination()
// transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start() \
.awaitTermination()
我最初认为我的问题与这篇文章有关,但是,在将我的代码更改为以下内容后:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start()
// transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start()
write_one.awaitTermination()
write_two.awaitTermination()
我收到以下错误:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我不确定为什么 和 之间的附加代码start()
会awaitTermination()
导致上面的错误(但我认为这可能是一个单独的问题,在上面同一篇文章的答案中引用了这个问题)。writeStream
在同一个作业中调用多个操作的正确方法是什么?最好将两个写入都包含在由调用的函数中,foreachBatch
还是有更好的方法来实现这一点?
解决方案
Spark 文档说,如果您需要对多个位置执行写入操作,则需要使用foreachBatch
方法。
您的代码应类似于:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
注意:persist
需要以防止重新计算。
推荐阅读
- amazon-web-services - 调用 SSH 时 Shell 脚本停止
- elasticsearch - Elasticsearch 对过滤条件组进行“或”搜索
- c++ - 部分模板特化:std::allocator_traits?
- homebrew - 尝试在 MacOS 上更新 Homebrew 但收到错误消息“LibreSSL 错误”
- json - 阅读 REST API JSON 回复
- leaflet - 如何以编程方式获取 PixiOverlay 标记并通过绘制的边界获取它们的属性
- c# - 如何将“System.Windows.Controls.TextBlock”转换为“System.Windows.Controls.Control”WPF C#
- windows - UWP 画布像素操作
- r - 带变量 R 的 sum 函数
- html - 使用 django 表单时如何为标签标签启用 id?