pyspark - 无法在 Spark 结构化流中写入聚合输出
问题描述
我刚刚开始使用 spark 结构化流,所以只是尝试一下。在汇总我的数据时;我无法将其写为 csv 文件。我尝试了以下不同的组合,但还没有实现写操作。
我的样本数据是
colum,values
A,12
A,233
B,232
A,67
B,5
A,89
A,100
作为流数据帧读取
userSchema = StructType([
StructField("colum", StringType()),
StructField("values", IntegerType())
])
line2 = spark \
.readStream \
.format('csv')\
.schema(userSchema)\
.csv("/data/location")
我正在做以下聚合计算
save=line2.groupBy("colum").count()
预期的输出是
+-----+-----+
|colum|count|
+-----+-----+
|B |2 |
|A |5 |
|colum|1 |
+-----+-----+
场景一:
save.writeStream.format("csv").queryName("a").outputMode("append").option("path", "/xyz/saveloc").option("checkpointLocation", "/xyz/chkptloc").start()
错误:当没有水印的流数据帧/数据集上有流聚合时,不支持附加输出模式;;
备注:由于数据中没有时间戳,因此无法添加水印。
场景二:
save.writeStream.format("csv").queryName("a").outputMode("complete").option("path", "/xyz/saveloc").option("checkpointLocation", "/xyz/chkptloc").start()
错误::org.apache.spark.sql.AnalysisException:数据源csv不支持完整输出模式;
场景 3:
save.writeStream.format("csv").queryName("a").outputMode("update").option("path", "/xyz/saveloc").option("checkpointLocation", "/xyz/chkptloc").start()
错误:org.apache.spark.sql.AnalysisException:数据源csv不支持更新输出模式;
场景四:
save.writeStream.format("parquet").queryName("a").outputMode("update").option("path", "/xyz/saveloc").option("checkpointLocation", "/xyz/chkptloc"").start()
错误:org.apache.spark.sql.AnalysisException:数据源拼花不支持更新输出模式;
场景 5:
save.writeStream.format("console").queryName("a").outputMode("complete").option("path", "/xyz/saveloc").option("checkpointLocation", "/xyz/chkptloc"").start()
评论:在该位置没有生成输出。
场景 6:
save.writeStream.format("memory").queryName("a").outputMode("complete").option("path", "/xyz/saveloc").option("checkpointLocation", "/xyz/chkptloc"").start()
评论:没有产生输出。
场景 7:
save.writeStream.format("memory").queryName("a").outputMode("update").option("path", "/xyz/saveloc").option("checkpointLocation", "/xyz/chkptloc"").start()
评论:没有产生输出。
请建议我适当的配置。
解决方案
推荐阅读
- python-3.x - 无法从另一个源文件夹导入 Cython 模块
- php - 尝试将其从图像更改为按钮,当我将其更改为看起来像按钮时,我失去了功能
- c++ - 将最后 N 个字节移动到数组开头的最快方法?
- performance - 并行 Julia 代码的性能问题
- batch-file - 批处理文件(cmd):删除目录名称中的最后一个“\”
- c# - 如何获取当前Windows登录用户全名而不是域名
- database - 烧瓶中的primary_key,可为空且唯一的意外参数?
- python - 让 pip 为位于本地的包找到正确的平台
- javascript - 如何在 Chrome 的网络选项卡中格式化 json?
- php - 使用存储库模式在 laravel 中使用“键”、“值”结构处理存储在数据库中的网站全局数据的最佳方法