Queries with streaming sources must be executed with writeStream.start(); pyspark

Problem description

I'm running into a problem when trying to read messages from Kafka; the query fails with the following exception: "Queries with streaming sources must be executed with writeStream.start();"

Here is my code:

from dataclasses import dataclass
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

@dataclass
class DeviceData:
    device: str
    temp: float
    humd: float
    pres: float
    

spark:SparkSession = SparkSession.builder \
    .master("local[1]") \
    .appName("StreamHandler") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

inputDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "weather") \
    .load()

rawDF = inputDF.selectExpr("CAST(value AS STRING)")

df_split = inputDF.select(f.split(inputDF.value, ",")) \
    .rdd.map(lambda x: DeviceData(x[0], x[1], x[2], x[3])) \
    .toDF(schema=['device', 'temp', 'humd', 'pres'])

summaryDF = df_split.groupBy('device') \
    .agg(f.avg('temp'), f.avg('humd'), f.avg('pres'))

query = summaryDF.writeStream.format('console').outputMode('update').start()
query.awaitTermination()

Tags: apache-spark, pyspark, apache-kafka, spark-streaming

Solution
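The exception is raised on the line that calls .rdd on the streaming DataFrame. Converting a streaming DataFrame to an RDD (like calling collect(), show(), or toPandas() on it) forces Spark to execute the plan as a batch query, and Spark's analyzer rejects any batch execution of a plan that contains a streaming source with exactly this message. RDD operations are not supported in Structured Streaming, so the parsing step has to stay in the DataFrame API, and the query must only be executed through writeStream.start().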

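Below is one way to rewrite the parsing step using only DataFrame functions. It is a minimal sketch that assumes each Kafka message value is a comma-separated string in the field order of the DeviceData dataclass (device, temp, humd, pres):

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("StreamHandler") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

inputDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "weather") \
    .load()

# Kafka delivers the value as binary, so cast it to a string first.
rawDF = inputDF.selectExpr("CAST(value AS STRING)")

# Parse the comma-separated payload with DataFrame functions instead of
# converting to an RDD, which streaming DataFrames do not support.
parts = f.split(rawDF["value"], ",")
df_split = rawDF.select(
    parts.getItem(0).alias("device"),
    parts.getItem(1).cast("float").alias("temp"),
    parts.getItem(2).cast("float").alias("humd"),
    parts.getItem(3).cast("float").alias("pres"),
)

summaryDF = df_split.groupBy("device") \
    .agg(f.avg("temp"), f.avg("humd"), f.avg("pres"))

# The streaming plan is only executed once writeStream.start() is called.
query = summaryDF.writeStream \
    .format("console") \
    .outputMode("update") \
    .start()

query.awaitTermination()

With this version the DeviceData dataclass is no longer needed, because the column names and types are declared directly in the select. The aggregation and the writeStream part of the original code were already correct; update output mode works for this kind of grouped aggregation.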
