apache-spark - Apache Spark:对流数据集进行连接操作后更新输出模式
问题描述
我正在尝试编写一个代码,它首先进行连接,然后进行聚合(groupby 和 count)。
我希望我的聚合阶段的输出是可更新的。下面是我使用的代码:
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testerIn")
.load().selectExpr("CAST(value AS STRING)").as[String]
val interimDF = df.join(df,"value")
val newDF = interimDF.groupBy("value").count().toJSON
newDF.writeStream.format("kafka").outputMode("update") .option("kafka.bootstrap.servers", "localhost:9092") . option("checkpointLocation","/path/to/directory")
.option("topic", "tester").start()
spark.streams.awaitAnyTermination()
此代码引发错误,因为 spark 中的流-流连接不支持更新模式:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported in Update output mode, only in Append output mode;;
现在,我完全理解了为什么 spark 会抛出这个错误,因为当我们加入时;更新模式几乎没有任何意义(因为只要输入中有任何新行,我们就会在输出中获得一个新行,因此追加)。
如果我在加入(interimDF)之后以追加模式将数据帧输出到 Kafka,然后从中读取并执行我的聚合阶段(newDF)并以更新模式将其写回其他流中,我的问题将得到解决。这正是我想做的,但我想避免中间写到 Kafka 阶段。有什么办法可能吗?我也愿意接受 hack-ish 解决方案或指向某人可能针对类似内容提出的拉取请求的链接。
解决方案
推荐阅读
- python - np.empty 与 np.zeros 的速度
- vba - 筛选数据透视表数据字段
- angular - 角路由器激活路由
- python - SSL: CERTIFICATE_VERIFY_FAILED Django 在 nginx 网络服务器上通过 uwsgi 运行时出错
- json - JSON 数据过滤器
- html - Papa Parse - 解析 CSV 文件后使用 JSON 填充表
- bash - Bash For循环在下一次迭代之前变回原始目录
- sql - “选择前 10 名”使用 SQLDBX
- javascript - CSS 样式效果未应用于第一张幻灯片,但适用于所有其他幻灯片
- c# - C# 使用文本框作为 streamwriter 的文件名