pyspark - 我可以将流“分支”成许多流并在 pyspark 中并行写入它们吗?
问题描述
我在 pyspark 中接收 Kafka 流。目前我正在按一组字段对其进行分组并将更新写入数据库:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", topic)
...
df = df \
.groupBy("myfield1") \
.agg(
expr("count(*) as cnt"),
min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
) \
.select("cnt", "minData.*") \
.select(
col("...").alias("..."),
...
col("userId").alias("user_id")
query = df \
.writeStream \
.outputMode("update") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df, epoch)) \
.start()
query.awaitTermination()
我可以在中间使用相同的链并创建另一个分组,例如
df2 = df \
.groupBy("myfield2") \
.agg(
expr("count(*) as cnt"),
min(struct(col("mycol.myfield").alias("mmm"), col("*"))).alias("minData")
) \
.select("cnt", "minData.*") \
.select(
col("...").alias("..."),
...
col("userId").alias("user_id")
并将它的输出并行写入不同的地方?
在哪里打电话writeStream
和awaitTermination
?
解决方案
是的,您可以将 Kafka 输入流分支到任意数量的流查询中。
您需要考虑以下几点:
query.awaitTermination
是一种阻塞方法,这意味着您在此方法之后编写的任何代码都不会在此query
终止之前执行。- 每个“分支”流式查询都将并行运行,并且在每个 writeStream 调用中定义检查点位置很重要。
总体而言,您的代码需要具有以下结构:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", topic) \
.[...]
# note that I changed the variable name to "df1"
df1 = df \
.groupBy("myfield1") \
.[...]
df2 = df \
.groupBy("myfield2") \
.[...]
query1 = df1 \
.writeStream \
.outputMode("update") \
.option("checkpointLocation", "/tmp/checkpointLoc1") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df1, epoch)) \
.start()
query2 = df2 \
.writeStream \
.outputMode("update") \
.option("checkpointLocation", "/tmp/checkpointLoc2") \
.foreachBatch(lambda df, epoch: write_data_frame(table_name, df2, epoch)) \
.start()
spark.streams.awaitAnyTermination
只是补充一点:在您显示的代码中,您正在覆盖df
,因此推导df2
可能无法获得预期的结果。
推荐阅读
- c - 在 C 中使用 va_arg 搜索键
- c# - 在 OVH 域上发布部署 ASP.NET Core MVC 站点
- node.js - 是否有 Json 格式的 ERD 表?
- c++ - 未定义引用“std::thread::_State@GLIBCXX_3.4.22 的 typeinfo”的原因?
- powerapps - 在没有高级计划的情况下使用 Web 服务是否有其他选择?
- python - ValueError:视图 users.views.logout_user 没有返回 HttpResponse 对象。它返回 None 而不是
- git - 只读存在的远程 git 分支
- python - PIL PNG与Gif背景
- java - JDK 7 RECV TLSv1 ALERT:致命,handshake_failure
- python - 如何使用 Windows 10 中的 python (3.8) 脚本在 Cygwin 中运行程序?