apache-spark - 必须使用 writeStream.start() 执行带有流源的查询;pyspark
问题描述
尝试从 kafka 读取消息时遇到问题,出现以下异常“必须使用 writeStream.start() 执行带有流式源的查询;”
这是我的代码:
from dataclasses import dataclass
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
@dataclass
class DeviceData:
device: str
temp: float
humd: float
pres: float
spark:SparkSession = SparkSession.builder \
.master("local[1]") \
.appName("StreamHandler") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
inputDF = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "weather") \
.load()
rawDF = inputDF.selectExpr("CAST(value AS STRING)")
df_split = inputDF.select(f.split(inputDF.value, ",")) \
.rdd.Map(lambda x: DeviceData(x[0], x[1], x[2], x[3])) \
.toDF(schema=['device', 'temp', 'humd', 'pres'])
summaryDF = df_split.groupBy('device') \
.agg(f.avg('temp'), f.avg('humd'), f.avg('pres'))
query = summaryDF.writeStream.format('console').outputMode('update').start()
query.awaitTermination()
解决方案
推荐阅读
- android - 尝试在 Flutter 中使用多个模拟器进行编译时出现 Gradle 错误
- python - 模型性能的皮尔逊相关系数
- json - 在 Swift 5 中使用泛型解析 JSON - 本地文件与 URL
- django - 我只想滑动,但我不知道如何
- php - 无法让我的下拉信息以这种格式 MM/DD/YYYY 从数据库中正确显示并按顺序显示
- c# - 无法使用 VS Code 在 WebAPI、DTO、AutoMapper 和 .NET Core 中检索 EF 外键值
- facebook-graph-api - facebook 的活跃用户是什么意思?
- ruby-on-rails - Rails:如何从一个 .yml 文件中获取对第二个 .yml 文件的引用
- reactjs - 我可以在 react-pivottable 中为表值应用自定义格式吗?
- ruby - Phusion 乘客无法生成应用程序 - ruby on rails 4.2.8 ruby 2.5.1