首页 > 解决方案 > 需要拆分数据集时如何加快 Pyspark 编程

问题描述

我现在遇到了一些问题。

#read data
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing1').getOrCreate()
sc=spark.sparkContext
df=spark.read.csv('file:///D:/t/*.csv',inferSchema=True,header=True,encoding='GBK')

df_empty=spark.createDataFrame(sc.emptyRDD(),t_schema)#pre-defined schema
#generate code list
codelist=df.select('stks').distinct().collect()
col_list=['stks','time']
df=df.orderBy(col_list,ascending=True)
from time import strftime, localtime
#For loops
for code in codelist:
    print('++++++++++++++++++++++++++')
    print(code)
    print(strftime("%Y-%m-%d %H:%M:%S", localtime()))
    df2=df.filter(df['stks']==code[0]).select('mkt','stks','time','price')
    df2_1=df2.toPandas()
    timelist=df2_1['time'].tolist()
    pricelist=df2_1['price'].tolist()
#Add flag----target computation
    flag_1=[]
    flag_2=[]
    for i in range(len(timelist)):
#        calculate 
        flag_1.append(calc_incre_2(timelist,pricelist,i,30,0.05))
        flag_2.append(calc_incre_2(timelist,pricelist,i,40,0.05))
    df2_1['flag_1']=flag_1
    df2_1['flag_2']=flag_2
    df2_2=sqlContext.createDataFrame(df2_1)
    df_empty=df_empty.union(df2_2)
#sub-function----------------------------
def calc_incre_2(timelist,pricelist,start_pos,secs_spec,incre_spec):
    i=start_pos
    flag=0
#    timelist=df2['时间'].tolist()
#    pricelist=df2['最新'].tolist()
    for t in range(len(timelist)-i):
        if (timelist[i+t]-timelist[i]).seconds>secs_spec:
            incre=(pricelist[i+t]-pricelist[i])/pricelist[i]
            if incre>=incre_spec:
                flag=1
            else:
                flag=0
            break
    return flag

我试图使用 pandas_udf,但它仍然无法工作。必须在这里或那里得到一些空的 RDD。

#read data
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing1').getOrCreate()
sc=spark.sparkContext
df=spark.read.csv('file:///D:/t/*.csv',inferSchema=True,header=True,encoding='GBK')
df1=df.select('mkt','stks','time','price')
#Using PandasUDF
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
#create the schema for the resulting dataframe
dfs_schema=StructType([StructField('mkt',StringType(),False),
                       StructField('stks',IntegerType(),False),
                       StructField('time',TimestampType(),False),
                       StructField('price',DoubleType(),False),
                       StructField('flag',IntegerType(),False)])
@pandas_udf(dfs_schema,functionType=PandasUDFType.GROUPED_MAP)
def calc_incre_3(spd):
    timelist=spd['time'].tolist()
    pricelist=spd['price'].tolist()
    flaglist=[]
    for i in range(len(timelist)):
        for t in range(len(timelist)-i):
            if (timelist[i+t]-timelist[i]).seconds>30:
                if (pricelist[i+t]-pricelist[i])/pricelist[i]>=0.05:
                    flaglist.append(1)
                else:
                    flaglist.append(0)
                break
    spd['flag']=flaglist
    return spd
pls_df=df1.groupBy('stks').apply(calc_incre_3)
pls_df.show()

我已经上传了原始数据文件。 https://www.dropbox.com/sh/gc5j36mik71a3yc/AAAUVfNINegdv9ozQPqJYiZHa?dl=0

标签: dataframepyspark

解决方案


如果您可以共享一些示例数据(如果存在数据共享问题,则屏蔽一个),很容易建议如何处理。

另外,分享以下信息

  • 您的市场 DF 拥有的记录总数。
  • 市场和股票的不同记录数(这有助于设计分区)。

您的代码有 2 个问题。太多的循环(嵌套)和函数调用(这不是 PySpark 中的最佳实践,因此要尽量避免,或者您可以编写 Java/Scala 函数并通过 PySpark 使用它,这会带来更好的性能,但会带来更多的复杂性涉及代码可维护性)

推荐的方法

  • 使用未指定的正确分区数
  • 使用窗口函数来避免嵌套循环。
  • 此外,了解 Spark UI 中的 shuffle 行为以解决性能问题。
  • 展平表格,而不是使用嵌套循环进行计算。

推荐阅读