dataframe - 需要拆分数据集时如何加快 Pyspark 编程
问题描述
我现在遇到了一些问题。
- 我有一大堆股票报价数据 csv 文件。我需要计算每只股票的秒级数据。
我希望每只股票的数据可以在 10 秒内处理完毕,合并成一个大文件,最后输出到 csv。因为使用 pandas 会受到笔记本电脑内存的限制,如果我想在 pandas 中做,我需要做很多 read_csv/to_csv 工作。我认为这会很耗时,所以我选择了这种方式: - (1) 我使用 pyspark 读取所有的 csv 文件,生成一个大文件 df.
- (2) 我从 df 获得了股票清单。然后进行迭代,每次我选择股票数据的 pyspark 数据帧,将其传输到 pandas 数据帧,在 pandas 中计算。最后,将此文件输出到本地文件中。
现在的问题是程序运行速度非常慢,对于一些股票,它需要 2 分钟来完成处理。对于某些股票,它使用大约 18 分钟来完成处理。
我认为问题与 pyspark 数据框数据分区有关。pyspark 以非常远程的方式存储数据,因此它会重新组织数据以满足我的需求。我怎样才能加快速度?
#read data
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing1').getOrCreate()
sc=spark.sparkContext
df=spark.read.csv('file:///D:/t/*.csv',inferSchema=True,header=True,encoding='GBK')
df_empty=spark.createDataFrame(sc.emptyRDD(),t_schema)#pre-defined schema
#generate code list
codelist=df.select('stks').distinct().collect()
col_list=['stks','time']
df=df.orderBy(col_list,ascending=True)
from time import strftime, localtime
#For loops
for code in codelist:
print('++++++++++++++++++++++++++')
print(code)
print(strftime("%Y-%m-%d %H:%M:%S", localtime()))
df2=df.filter(df['stks']==code[0]).select('mkt','stks','time','price')
df2_1=df2.toPandas()
timelist=df2_1['time'].tolist()
pricelist=df2_1['price'].tolist()
#Add flag----target computation
flag_1=[]
flag_2=[]
for i in range(len(timelist)):
# calculate
flag_1.append(calc_incre_2(timelist,pricelist,i,30,0.05))
flag_2.append(calc_incre_2(timelist,pricelist,i,40,0.05))
df2_1['flag_1']=flag_1
df2_1['flag_2']=flag_2
df2_2=sqlContext.createDataFrame(df2_1)
df_empty=df_empty.union(df2_2)
#sub-function----------------------------
def calc_incre_2(timelist,pricelist,start_pos,secs_spec,incre_spec):
i=start_pos
flag=0
# timelist=df2['时间'].tolist()
# pricelist=df2['最新'].tolist()
for t in range(len(timelist)-i):
if (timelist[i+t]-timelist[i]).seconds>secs_spec:
incre=(pricelist[i+t]-pricelist[i])/pricelist[i]
if incre>=incre_spec:
flag=1
else:
flag=0
break
return flag
我试图使用 pandas_udf,但它仍然无法工作。必须在这里或那里得到一些空的 RDD。
#read data
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('data_processing1').getOrCreate()
sc=spark.sparkContext
df=spark.read.csv('file:///D:/t/*.csv',inferSchema=True,header=True,encoding='GBK')
df1=df.select('mkt','stks','time','price')
#Using PandasUDF
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
#create the schema for the resulting dataframe
dfs_schema=StructType([StructField('mkt',StringType(),False),
StructField('stks',IntegerType(),False),
StructField('time',TimestampType(),False),
StructField('price',DoubleType(),False),
StructField('flag',IntegerType(),False)])
@pandas_udf(dfs_schema,functionType=PandasUDFType.GROUPED_MAP)
def calc_incre_3(spd):
timelist=spd['time'].tolist()
pricelist=spd['price'].tolist()
flaglist=[]
for i in range(len(timelist)):
for t in range(len(timelist)-i):
if (timelist[i+t]-timelist[i]).seconds>30:
if (pricelist[i+t]-pricelist[i])/pricelist[i]>=0.05:
flaglist.append(1)
else:
flaglist.append(0)
break
spd['flag']=flaglist
return spd
pls_df=df1.groupBy('stks').apply(calc_incre_3)
pls_df.show()
我已经上传了原始数据文件。 https://www.dropbox.com/sh/gc5j36mik71a3yc/AAAUVfNINegdv9ozQPqJYiZHa?dl=0
解决方案
如果您可以共享一些示例数据(如果存在数据共享问题,则屏蔽一个),很容易建议如何处理。
另外,分享以下信息
- 您的市场 DF 拥有的记录总数。
- 市场和股票的不同记录数(这有助于设计分区)。
您的代码有 2 个问题。太多的循环(嵌套)和函数调用(这不是 PySpark 中的最佳实践,因此要尽量避免,或者您可以编写 Java/Scala 函数并通过 PySpark 使用它,这会带来更好的性能,但会带来更多的复杂性涉及代码可维护性)
推荐的方法
- 使用未指定的正确分区数
- 使用窗口函数来避免嵌套循环。
- 此外,了解 Spark UI 中的 shuffle 行为以解决性能问题。
- 展平表格,而不是使用嵌套循环进行计算。
推荐阅读
- php - TLS 协商失败:ldap_connect 与默认端口以外的端口
- python - ansible python错误:没有名为os的模块
- python - 使用 pyinstaller 获取目录
- python - 自定义字段上的简单销售人员更新
- oracle - 访问 Oracle 表单时出现 URL 验证失败错误
- android - 如何在okhttp3中设置连接超时?
- ios - 为什么 Apple 不支持检查连接是否具有活动的 Internet 连接?
- php - 使用正则表达式替换搜索短语或搜索词匹配的 php 代码
- firebase - 身份验证错误 - 此 Firebase 项目需要导入 Google Cloud Platform 才能恢复服务
- swift - 复制金属帧缓冲纹理以供阅读