apache-spark - 如何以控制台格式编写具有不同数据帧的相同流?
问题描述
由于我是火花结构流式传输的新手,并且在一个简单的场景中面临问题:
我正在尝试用两个不同的数据帧编写一个流。
from pyspark.sql import functions as f
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "topic1") \
.option("failOnDataLoss", False)\
.option("startingOffsets", "earliest") \
.load()
data1 = df.filter(f.col('status') == 'true')
data2 = df.filter(f.col('status') == 'false')
data2 = data2.select(df.id,f.struct(df.col1, df.col2, df.col3).alias('value'))
data2 = data2.groupBy("id").agg(f.collect_set('value').alias('history'))
data1 = data1.writeStream.format("console").option("truncate", "False").trigger(processingTime='15 seconds').start()
data2 = data2.writeStream.format("console").option("truncate", "False").trigger(processingTime='15 seconds').start()
spark.streams.awaitAnyTermination()
我也遇到了同样的错误:
Traceback (most recent call last):
File "/home/adarshbajpai/Downloads/spark-2.4.4-bin-hadoop2.7/python/lib/pyspark.zip/pyspark /sql/utils.py", line 63, in deco
File "/home/adarshbajpai/Downloads/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o186.start.
: org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [customerid#93L], [customerid#93L, collect_set(hist_value#278, 0, 0) AS histleadstatus#284]
+- Project [customerid#93L, named_struct(islaststatus, islaststatus#46, statusid, statusid#43, status, statusname#187, createdOn, statusCreatedDate#59, updatedOn, statusUpdatedDate#60) AS hist_value#278]
+- Filter (islaststatus#46 = 0)
我认为我不应该使用水印,因为我的流媒体没有延迟和任何延迟。
请建议!提前致谢。
解决方案
推荐阅读
- python - 从 Outlook REST API v1.0 中的基本身份验证迁移到 v2.0
- android - 全局对话框 - 如何?
- javascript - Vba to IE Click JavaScript button no Id
- wso2 - WSO2 - EI 6.2.0 - 使用 EI 配置 Oracle AQ JMS 数据库以向/从 Oracle AQ JMS 发布/使用消息
- linux - 使用 r 扩展时在 Linux 上无头运行 Netlogo Behaviorspace 时不会停止
- c# - 远程服务器返回错误:(415) Unsupported Media Type using API in asp.net
- php - 在php中从exec发送电子邮件
- java - Java - 不接受硬币的自动售货机
- java - 为什么反射 API 会为现有的枚举构造函数抛出 NoSuchMethodException?
- javascript - 如何获取传递给猫鼬模式构造函数的数据