python - 如何应用 dask 方法对列表中的文件应用函数?
问题描述
首先,感谢这个社区以及我们可以检索到的所有建议,非常感谢!
这是我第一次尝试并行处理,我一直在自己研究 Dask,但我在实际编码时遇到了麻烦……老实说,我真的迷路了
在我的项目中,我想触发 URL 并从 xml 文件中检索观测数据(气象站)。对于每个 URL,我运行一些不同的过程,以便:从 URL 检索数据,将 XML 信息解析为数据框,应用过滤器并将数据存储在 MySQL 数据库中。
所以我需要在数千个 URL(站)上循环这些过程......
我写了一个顺序代码,它需要 300 秒才能完成计算,这真的很长而且效率不高。
由于我们对每个站点应用相同的过程,我想我可以加快所有计算,但我不知道从哪里开始。我使用了从 dask 延迟,但我认为这不是最好的方法。
到目前为止,这是我的代码:首先我有一些功能。
def xml_to_dataframe(ood_xml):
tmp_file = wget.download(ood_xml)
prstree = ETree.parse(tmp_file)
root = prstree.getroot()
################ Section to retrieve data for one station and apply parameter
all_obs = []
for obs in root.iter('observations'):
ood_observation = []
for n, param in enumerate(list_parameters):
x=obs.find(variable_to_check).text
ood_observation.append(x)
all_obs.append(ood_observation)
return(pd.DataFrame(all_obs, columns=list_parameters))
def filter_criteria(df,threshold,criteria):
if criteria in df.columns:
result = []
for index, row in df.iterrows():
if pd.to_numeric(row[criteria],errors='coerce') >= threshold:
result.append(index)
return result
else:
#print(criteria + ' parameter does not exist for this station !!! ')
return([])
def get_and_filter_data(filename,criteria,threshold):
try:
xmlToDf = xml_to_dataframe(filename)
final_df = xmlToDf.loc[filter_criteria(xmlToDf,threshold,criteria)]
some msql connection and instructions....
except:
pass
然后是我要并行化的主要代码:
criteria = 'temperature'
threshold = 22
filenames =[url1.html, url2.html, url3.html]
for file in filenames:
get_and_filter_data(file,criteria,threshold)
你有什么建议吗?
非常感谢您的帮助 !
纪尧姆
解决方案
不是 100% 确定这是您所追求的,但一种方法是通过delayed
:
from dask import delayed, compute
delayeds = [delayed(get_and_filter_data)(file,criteria,threshold) for file in filenames]
results = compute(delayeds)
推荐阅读
- sql - 如果不满足另一个表中的条件,如何从表中删除记录
- node.js - 当类型是有条件的时,如何在 Typescript 中检测变量的正确类型
- pandas - 在 Pyspark 中使用 toPandas 或 Pyarrow 函数转换为熊猫时,Pyspark Dataframe 未返回所有行
- coding-style - 如何消除方法副作用?
- regex - 正则表达式匹配两个字符串之间的特定字符
- c++ - 如何在模板容器类中实现复制构造函数和赋值运算符?
- sql - INNER JOIN 查询性能很慢
- python - 下载 twitter 视频的 flac 文件?
- c# - 如何使用实体框架获取模型以供 ViewModel 引用
- angular - 如何使用 Dialogflow V2 查询修复身份验证错误