首页 > 解决方案 > 如何在单个 Spark 作业中调用多个 writeStream 操作?

问题描述

我正在尝试编写一个 Spark Structured Streaming 作业,该作业从 Kafka 主题读取并通过writeStream操作写入单独的路径(在执行一些转换之后)。但是,当我运行以下代码时,只有第一个writeStream被执行,第二个被忽略。

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() \
  .awaitTermination()

// transform df to df2

write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start() \
  .awaitTermination()

我最初认为我的问题与这篇文章有关,但是,在将我的代码更改为以下内容后:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() 

// transform df to df2 
 
write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start()

write_one.awaitTermination()
write_two.awaitTermination()

我收到以下错误:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我不确定为什么 和 之间的附加代码start()awaitTermination()导致上面的错误(但我认为这可能是一个单独的问题,在上面同一篇文章的答案中引用了这个问题)。writeStream在同一个作业中调用多个操作的正确方法是什么?最好将两个写入都包含在由调用的函数中,foreachBatch还是有更好的方法来实现这一点?

标签: apache-sparkpysparkspark-structured-streaming

解决方案


Spark 文档说,如果您需要对多个位置执行写入操作,则需要使用foreachBatch方法。

您的代码应类似于:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

注意:persist需要以防止重新计算。

您可以查看更多信息:http ://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch


推荐阅读