pyspark - 无法解析带有流源的查询必须使用 writeStream.start() 结构化 Spark Streaming - Pyspark 执行
问题描述
我正在尝试在 spark 中读取来自 kafka 的消息并打印出来。我正在尝试从数据框转换为列表以处理系统请求。看起来我的 toPandas() 函数有问题。我真的很感激这个帮助。
df_index = df.select(df.Name).toPandas()['Name']
df = df.drop('Name')
model = model()
pre_data = model.transform(df)
# convert dataframe to list
df_age = pre_data.select(pre_data.Age).toPandas()['Age']
# create struct
data = sqlContext.createDataFrame(zip(df_index, df_age), schema=['Name', 'Age'])
# create list row flow
data_list = data.select(data.Name, data.Age).toPandas()
checkAnomaly(data_list)
net_stream = data \
.writeStream \
.trigger(processingTime="30 seconds") \
.format("console") \
.start()
net_stream.awaitTermination()
解决方案
推荐阅读
- java - 2个不同的Json对象被打印为响应,如何将它们分开并读取第二个对象中某个元素的值
- elasticsearch - 启动logstash的grok过滤器出错
- java - JPA 查询列规范
- cmake - 如何在 CMake 中指定依赖的库源文件路径
- sorting - 按日期自动排序
- python - Django 从模板中的序列化程序获取 ForeignKey 字段值
- java - 在 Spring Boot 中配置灵活的层次结构
- mysql - 根据最小时间 MYSQL 获取不同的数据
- r - R分组和计数亲和词典
- javascript - 在我的 Web 表单上单击提交按钮时,输入字段的验证错误怎么会显示,即使这些字段有值?