首页 > 解决方案 > Pyspark 流构造历史数据

问题描述

我在用 :

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)
lines = ssc.socketTextStream("host", port)
lines.saveAsTextFiles("path").f

我有很多保存的文件,我想用它们来创建像 LogisticRegressionclassifier、DecisionTreeClassifier 这样的模型......

我所做的是:

allfiles =  spark.read.option("header","false").csv("home/-*/part-*")
allfiles.coalesce(1).write.format("csv").option("header", "false").save("newPath")

然后我用:

my_schema = tp.StructType([ tp.StructField(name= 'id', dataType= tp.StringType(), nullable= True), tp.StructField(name= 'tweet', dataType= tp.StringType(), nullable= True), tp.StructField(name= 'label', dataType= tp.StringType(), nullable= True) ])

my_data = spark.read.csv("newfile.csv",
                         schema=my_schema,
                         header=True)

我的第一个问题是:它是处理流数据的正确方法吗(在这个加载的数据上创建模型,然后在实时流上使用它们?)我的第二个问题是:我加载 csv 文件时得到的数据非常混乱。它包含损坏的数据。有没有办法在它被加载到我的目录之前更正这个日期?还是我应该用其他方式清洁它们?

标签: pysparkspark-streaming

解决方案


推荐阅读