首页 > 解决方案 > 使用相同数据集进行迭代和写入时出现问题

问题描述

执行以下代码时出现错误

必须使用 writeStream.start() 执行带有流源的查询

代码:

SparkSession ss = SparkSession.builder().config(this.sparkConf).getOrCreate();
ss.sparkContext().setLogLevel("ERROR");

Dataset<Row> rsvpDT = ss.readStream().format(KafkaConstants.STREAM_FORMAT)
        .option("kafka.bootstrap.servers", KafkaConstants.KAFKA_BROKERS)
        .option("subscribe", KafkaConstants.KAFKA_TOPIC).option("failOnDataLoss", false).load();


for(Iterator<Row> iter = rsvpDT.toLocalIterator(); iter.hasNext();) {
    String item = (iter.next()).toString();
    System.out.println("********************************"+item.toString()+ "*******************************");    
}

StreamingQuery query = rsvpDT.writeStream().outputMode(OutputMode.Update()).format("console")
        .option("path", KafkaConstants.CHECKPOINT_LOCATION)
        .option("checkpointLocation", KafkaConstants.CHECKPOINT_LOCATION).option("truncate", false).start();
query.awaitTermination();
ss.stop();

为什么会发生?我不能同时将同一个数据集用于两个目的吗?

标签: javaapache-sparkspark-streaming

解决方案


推荐阅读