java - 使用相同数据集进行迭代和写入时出现问题
问题描述
执行以下代码时出现错误
必须使用 writeStream.start() 执行带有流源的查询
代码:
SparkSession ss = SparkSession.builder().config(this.sparkConf).getOrCreate();
ss.sparkContext().setLogLevel("ERROR");
Dataset<Row> rsvpDT = ss.readStream().format(KafkaConstants.STREAM_FORMAT)
.option("kafka.bootstrap.servers", KafkaConstants.KAFKA_BROKERS)
.option("subscribe", KafkaConstants.KAFKA_TOPIC).option("failOnDataLoss", false).load();
for(Iterator<Row> iter = rsvpDT.toLocalIterator(); iter.hasNext();) {
String item = (iter.next()).toString();
System.out.println("********************************"+item.toString()+ "*******************************");
}
StreamingQuery query = rsvpDT.writeStream().outputMode(OutputMode.Update()).format("console")
.option("path", KafkaConstants.CHECKPOINT_LOCATION)
.option("checkpointLocation", KafkaConstants.CHECKPOINT_LOCATION).option("truncate", false).start();
query.awaitTermination();
ss.stop();
为什么会发生?我不能同时将同一个数据集用于两个目的吗?
解决方案
推荐阅读
- python - POST 请求没有被定向到 Flask 中的 POST 路由
- c++ - 在线编译器和 VScode Insiders 上的分段错误让我 [Done] exited with code=3221225725
- python - AWS Lambda 格式的 JSON Schema 如何检查特定日期时间模式的字符串?
- c - 插入 B 树
- node.js - 升级到节点 14 导致:TypeError [ERR_INVALID_ARG_TYPE]:“数据”参数必须是字符串类型或缓冲区实例
- javascript - 使用 Javascript 将数字格式转换为 WORDS 格式包括任何数字
- flutter - 从 gridview 中选择多种颜色时需要更改 CircleBorder 颜色
- javascript - 是否有用于选择选项的 javascript 代码
- c# - 在 Xamarin Android 应用程序上的网络还原时出现“网络无法访问”错误
- join - 如何将一个表与另一个表连接,然后计算非空列并将它们按另外两个字段分组?