python - 提高 Spark.SQL 中的数据整理性能
问题描述
我有一个包含多个 csv 文件的大型数据库。每个 csv 文件都包含最近 10 天,只有最早的日期是最终数据。
例如“file_2019-08-11.csv”文件包含从 08-02 到 08-11 的数据(只有数据中日期为 08-02 的记录是最终的),“file_2019-08-12.csv”文件包含来自08-03 到 08-12(只有日期为 08-03 的记录是最终的)。
我正在使用 PySpark 来做到这一点。我的目标是仅保留 variables_2019-08-11.csv 文件中日期 08-02 的记录和 variables_2019-08-12.csv 文件中日期 08-03 的记录等。我正在使用 PySpark 和 Databricks 来做到这一点,我的代码片段正在工作,但有点慢,尽管我在足够大的集群上运行它。
我很乐意为其他场景提供建议以提高其性能。谢谢
import datetime
# define the period range
start_date="2019-08-12"
end_date="2019-08-30
# create list of dates under date_generated variable
start = datetime.datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.datetime.strptime(end_date, "%Y-%m-%d")
date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]
# read first file
filename="file_variables_"+str(date_generated[0])[0:10]+".csv"
df=spark.read.csv(data_path+filename,header="true")
df.createOrReplaceTempView("df")
#create the main file which we will use the other dates to append below this one
final=spark.sql("select * from df where data_date in (select min(data_date) from df)")
#loop on other dates than the first date
for date in date_generated[1:len(date_generated)]:
filename="file_variables_"+str(date)[0:10]+".csv"
df=spark.read.csv(data_path+filename,header="true")
df.createOrReplaceTempView("df")
temp=spark.sql("select * from df where data_date in (select min(data_date) from df)")
final=final.union(temp)
final.createOrReplaceTempView("final")
解决方案
我怀疑你的大集群上的大多数核心都是空闲的,因为根据你的代码在每个文件上使用循环的结构方式,你的工作是处理一个文件并且只使用集群中的一个核心。查看集群 -> [您的集群] -> 指标 -> [Ganglia UI]
首先,最好将所有文件作为一组处理。如果input_file_name()
您的逻辑依赖于输入文件名,请使用。在片场完成所有工作。循环会扼杀你的表现。
其次,我认为窗口化的 SQL 函数 dense_rank() 将帮助您找到组中所有日期的第一个日期 [input_file_name()]。这里有一篇介绍窗口函数的博客:https ://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
df=spark.read.csv(data_path)
from pyspark.sql.functions import input_file_name
df2 = df.withColumn('file_name',input_file_name())
final = df2.<apply logic>
推荐阅读
- python - 你如何使用有状态的 LSTM 进行预测?
- c++ - 使用 C++ 中的特征库定义和填充稀疏矩阵
- angular - 检测触摸在单独元素上的移动以检测和更改元素颜色
- php - 用一种形式插入 2 个表,其中 2 个 sql 语句不起作用
- r - R 颜色 Brewer 调色板
- java - upload large file to database is correct practise?
- php - Laravel switch locale and keep the choice does not work on routes
- docker - 无法将证书添加到 alpine linux 容器
- python - 某些情况下不应用按钮背景颜色配置
- python - 有没有办法用 discord.py 发送好友请求?