首页 > 解决方案 > 我可以将流“分支”成许多流并在 pyspark 中并行写入它们吗?

问题描述

我在 pyspark 中接收 Kafka 流。目前我正在按一组字段对其进行分组并将更新写入数据库:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
        .option("subscribe", topic)

...

df = df \
        .groupBy("myfield1") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")

query = df \
        .writeStream \
        .outputMode("update") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
        .start()

query.awaitTermination()

我可以在中间使用相同的链并创建另一个分组,例如

df2 = df \
        .groupBy("myfield2") \
        .agg(
            expr("count(*) as cnt"),
            min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
        ) \
        .select("cnt", "minData.*") \
        .select(
            col("...").alias("..."),
            ...
            col("userId").alias("user_id")

并将它的输出并行写入不同的地方?

在哪里打电话writeStreamawaitTermination

标签: pysparkapache-kafkaspark-structured-streaming

解决方案


是的,您可以将 Kafka 输入流分支到任意数量的流查询中。

您需要考虑以下几点:

  1. query.awaitTermination是一种阻塞方法,这意味着您在此方法之后编写的任何代码都不会在此query终止之前执行。
  2. 每个“分支”流式查询都将并行运行,并且在每个 writeStream 调用中定义检查点位置很重要。

总体而言,您的代码需要具有以下结构:

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
        .option("subscribe", topic) \
        .[...]

# note that I changed the variable name to "df1"
df1 = df \
    .groupBy("myfield1") \
    .[...]

df2 = df \
    .groupBy("myfield2") \
    .[...]


query1 = df1 \
        .writeStream \
        .outputMode("update") \
        .option("checkpointLocation", "/tmp/checkpointLoc1") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df1, epoch)) \
        .start()

query2 = df2 \
        .writeStream \
        .outputMode("update") \
        .option("checkpointLocation", "/tmp/checkpointLoc2") \
        .foreachBatch(lambda df, epoch: write_data_frame(table_name, df2, epoch)) \
        .start()

spark.streams.awaitAnyTermination

只是补充一点:在您显示的代码中,您正在覆盖df,因此推导df2可能无法获得预期的结果。


推荐阅读