apache-spark - 是否可以使用 Spark 结构化流中的 foreachBatch 将两个不相交的数据集写入数据同步?
问题描述
我正在尝试将数据从单一来源写入多个 DataSink(Mongo 和 Postgres DB)。传入数据
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");
personalDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "PI").save();
}).start();
Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "SAL").save();
}).start();
问题是,我可以看到 Spark 正在打开两个 Streams 并两次读取相同的事件。是否可以读取一次并应用不同的转换并写入不同的集合?
解决方案
您应该缓存 DataFrame。见这里:
写入多个位置 - 如果要将流式查询的输出写入多个位置,则可以简单地多次写入输出 DataFrame/Dataset。但是,每次写入尝试都可能导致重新计算输出数据(包括可能重新读取输入数据)。为避免重新计算,您应该缓存输出 DataFrame/Dataset,将其写入多个位置,然后取消缓存。
他们的例子:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
您可以将所有代码放在一个中foreachBatch
,并将数据帧写入您的 2 个接收器。您可以通过缓存数据帧并selectExpr
在此缓存的数据帧上执行并保存它来做到这一点。
作为旁注- 请注意,在任何情况下,如果您想要“全有或全无”(即您不希望您写给 mongo 而不是 postgres 的情况),您必须只使用一个foreachBatch
,否则(如果你有 2 foreachBatch
,就像你的问题一样)你有 2 个独立的批次 - 对于相同的数据,一个可能会在另一个成功时失败。
推荐阅读
- java - java.lang.IllegalArgumentException:重复的服务器名称类型 0
- php - 将自定义日期范围字段添加到 WooCommerce Admin 单个产品
- jmeter - Jmeter 5.4 - 线程 Thread[AWT-EventQueue-0,6,main] 中的 java.lang.NullPointerException
- python - 在 Django 的一个地方从多个模型中获取 N 个最近的对象
- backup - 在 Nas Synology 上监控 HyperBackup、XOA 备份
- php - Sql如何使用位置参数限制查询?
- sql - 在 Oracle 中通过 SQL 查询更改 JSON 层次结构
- postgresql - Delete .. from .. join,postgresql 和 oracle 的语法相同
- java - Android Studio 移除 ActionBar
- java - 如何使同一域内的所有Android项目都以相同的结构创建?