首页 > 解决方案 > 无法解析带有流源的查询必须使用 writeStream.start() 结构化 Spark Streaming - Pyspark 执行

问题描述

我正在尝试在 spark 中读取来自 kafka 的消息并打印出来。我正在尝试从数据框转换为列表以处理系统请求。看起来我的 toPandas() 函数有问题。我真的很感激这个帮助。

df_index = df.select(df.Name).toPandas()['Name']

df = df.drop('Name')

model = model()
pre_data = model.transform(df)

# convert dataframe to list

df_age = pre_data.select(pre_data.Age).toPandas()['Age']

# create struct
data = sqlContext.createDataFrame(zip(df_index, df_age), schema=['Name', 'Age'])

# create list row flow
data_list = data.select(data.Name, data.Age).toPandas()

checkAnomaly(data_list)

net_stream = data \
    .writeStream \
    .trigger(processingTime="30 seconds") \
    .format("console") \
    .start()
    
net_stream.awaitTermination()

标签: pysparkspark-structured-streaming

解决方案


推荐阅读