apache-spark - 如何使用 PySpark 并行化我的文件处理程序
问题描述
我现在有一个大型 python 项目,其中驱动程序有一个函数,它使用 for 循环遍历我的 GCP(谷歌云平台)存储桶上的每个文件。我正在使用 CLI 将作业提交给 GCP 并让作业在 GCP 上运行。
对于在此 for 循环中遍历的每个文件,我正在调用一个函数 parse_file(...) 来解析文件并调用处理该文件的其他函数的序列。
整个项目运行需要几分钟,速度很慢,而且驱动程序还没有使用多少PySpark。问题是该文件级 for 循环中的每个 parse_file(...) 都按顺序执行。是否可以使用 PySpark 并行化该文件级 for 循环以对所有这些文件并行运行 parse_file(...) 函数以减少程序执行时间并提高效率?如果是这样,由于程序没有使用 PySpark,是否需要进行大量代码修改才能使其并行化?
所以程序的功能是这样的
# ... some other codes
attributes_table = ....
for obj in gcp_bucket.objects(path):
if obj.key.endswith('sys_data.txt'):
#....some other codes
file_data = (d for d in obj.download().decode('utf-8').split('\n'))
parse_file(file_data, attributes_table)
#....some other codes ....
如何使用 PySpark 并行化这部分,而不是一次使用一个 for 循环遍历文件?
解决方案
谢谢你问你的问题。
我建议根据您的gcp_bucket.objects(path)
.
你有你的 SparkContext 所以创建 RDD 应该很简单
my_rdd = sc.parallelize(gcp_bucket.objects(path)
:
对于外行来说,约定是将 SparkContext 分配给变量sc
。您的 for 循环的内容必须放入一个函数中,我们称之为my_function
。你现在拥有所有的作品。
您的下一步将映射您的函数,如下所示:
results_dag = my_rdd.map(my_function)
results = results_dag.collect()
回想一下 Spark 执行惰性求值。这就是为什么我们需要collect
在最后执行操作。
其他一些建议。第一个是在 GCP 存储桶中的一小组对象上运行您的代码。了解时间安排。为了促进良好的编码实践,另一个建议是考虑将 for 循环中的操作进一步分解为额外的 RDD。你总是可以把它们连在一起...
my_rdd = sc.parallelize(gcp_bucket.objects(path)
dag1 = my_rdd.map(function1)
dag2 = dag1.map(function2)
dag3 = dag2.map(function3)
results = dag3.collect()
推荐阅读
- apache-kafka - 如何使消息在所有分区上均匀分布
- java - 数据截断:不正确的日期时间值:'
- python - 当我在数据帧上调用函数时,编码器给出值错误
- python - 如果任何单元测试失败,如何使 Python 的覆盖工具失败?
- haskell - Haskell - 在背包问题中使用 Vector(优化)
- python - Pandas - 从日期列表中获取每个月的最后一个日期
- r - HANA R Kerberos 身份验证
- c# - 运行程序后数据库表被锁定
- openstack - Keystone wsgi 服务是否有任何替代方案用于生产用途?
- c# - WebForm ListView 中的多列格式