java - 例外:不支持完整输出模式
问题描述
我为我的教程创建了 sparkStreaming Simulation。当我执行 outputMode(“完成”)操作时,出现错误。
错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
我的数据集示例:
2006-04-01 00:00:00.000 +0200,Partly Cloudy,rain,9.472222222222221,7.3888888888888875,0.89,14.1197,251.0,15.826300000000002,0.0,1015.13,Partly cloudy throughout the day.
第一个进程代码(Partition(summary)):
System.setProperty("hadoop.home.dir","C:\\hadoop-common-2.2.0-bin-master");
SparkSession sparkSession = SparkSession.builder()
.appName("SparkStreamingMessageListener")
.master("local")
.getOrCreate();
StructType schema = new StructType()
.add("Formatted Date", "String")
.add("Summary","String")
.add("Precip Type", "String")
.add("Temperature", "Double")
.add("Apparent Temperature", "Double")
.add("Humidity","Double")
.add("Wind Speed (km/h)","Double")
.add("Wind Bearing (degrees)","Double")
.add("Visibility (km)","Double")
.add("Loud Cover","Double")
.add("Pressure(milibars)","Double")
.add("Dailiy Summary","String");
Dataset<Row> formatted_date = sparkSessionDataFrame.read().schema(schema).option("header", true).csv("C:\\Users\\Kaan\\Desktop\\Kaan Proje\\SparkStreamingListener\\archivecsv\\weatherHistory.csv");
Dataset<Row> avg = formatted_date.groupBy("Summary", "Precip Type").avg("Temperature").sort(functions.desc("avg(Temperature)"));
formatted_date.write().partitionBy("Summary").csv("C:\\Users\\Kaan\\Desktop\\Kaan Proje\\SparkStreamingListener\\archivecsv\\weatherHistoryFile\\");
第二个监听进程代码:
SparkSession sparkSession = SparkSession.builder()
.appName("SparkStreamingMessageListener1")
.master("local")
.getOrCreate();
StructType schema1 = new StructType()
.add("Formatted Date", "String")
.add("Precip Type", "String")
.add("Temperature", "Double")
.add("Apparent Temperature", "Double")
.add("Humidity","Double")
.add("Wind Speed (km/h)","Double")
.add("Wind Bearing (degrees)","Double")
.add("Visibility (km)","Double")
.add("Loud Cover","Double")
.add("Pressure(milibars)","Double")
.add("Dailiy Summary","String");
Dataset<Row> rawData = sparkSession.readStream().schema(schema1).option("sep", ",").csv("C:\\Users\\Kaan\\Desktop\\Kaan Proje\\sparkStreamingWheather\\*");
Dataset<Row> heatData = rawData.select("Temperature", "Precip Type").where("Temperature>10");
StreamingQuery start = heatData.writeStream().outputMode("complete").format("console").start();
start.awaitTermination();
我通过将分区文件复制到指定的侦听器文件路径来创建流模拟。如果你能帮忙,我会很高兴。谢谢。
解决方案
该错误非常具体地说明了实际问题是什么:查询类型不支持输出模式完成。
如关于OutputeModes的结构化流式传输指南中所述:
“不支持完整模式,因为在结果表中保留所有未聚合的数据是不可行的。”
选择附加模式时,此问题将得到解决:
StreamingQuery start = heatData.writeStream().outputMode(cappend").format("console").start()
推荐阅读
- sql-server - Spring Data JPA 异常:数据库中已经有一个名为“disease_view”的对象
- tensorflow - 如何创建一个 Keras 层来进行 4D 卷积(Conv4D)?
- ios - Xcode 11.3.1 没有将二进制第三方框架嵌入到包 (IPA)
- ios - Firestore分页查询未返回所有结果
- asp.net - 如何在 TextMode="Date" 中首先插入“日”而不是月 - ASP.NET C#
- php - 如何在不知道键名的情况下回显关联数组的第一个键值
- java - 我的数组成员为空,即使它们已初始化
- javascript - JS 秒计数计时器通过单击按钮停止
- python - BigQuery GCP Python 集成
- typescript - 无法使用 Fetch TYPESCRIPT POST 多部分/表单数据