pyspark - Pyspark 流构造历史数据
问题描述
我在用 :
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream("host", port)
lines.saveAsTextFiles("path").f
我有很多保存的文件,我想用它们来创建像 LogisticRegressionclassifier、DecisionTreeClassifier 这样的模型......
我所做的是:
allfiles = spark.read.option("header","false").csv("home/-*/part-*")
allfiles.coalesce(1).write.format("csv").option("header", "false").save("newPath")
然后我用:
my_schema = tp.StructType([ tp.StructField(name= 'id', dataType= tp.StringType(), nullable= True), tp.StructField(name= 'tweet', dataType= tp.StringType(), nullable= True), tp.StructField(name= 'label', dataType= tp.StringType(), nullable= True) ])
my_data = spark.read.csv("newfile.csv",
schema=my_schema,
header=True)
我的第一个问题是:它是处理流数据的正确方法吗(在这个加载的数据上创建模型,然后在实时流上使用它们?)我的第二个问题是:我加载 csv 文件时得到的数据非常混乱。它包含损坏的数据。有没有办法在它被加载到我的目录之前更正这个日期?还是我应该用其他方式清洁它们?
解决方案
推荐阅读
- corda - 在 Corda 中运行回溯事务
- sql - SQL UPDATE 语句抛出错误“无效使用组函数”
- python-3.x - 每次我运行这段代码时,它都会说 numpy.ndarray 没有属性“索引”
- etl - NiFi 控制器服务重用和模式注册表架构
- python - 我怎样才能找到数组中不变的点?
- reactjs - 如何将 React 服务作为单例、可注入和 Redux 连接
- python - 如何找到它们自己的行和列中最大的值?
- amazon-web-services - 修改 S3 存储桶后 CloudFront 不再工作
- python - 如何在 new_comment 页面上显示帖子名称
- javascript - 当状态更新为与 redux 相同的值时,组件未更新