python - 为什么 PySpark 任务花费了太多时间?
问题描述
我正在运行一个没有问题的 Pyspark 进程。该过程的第一步是将特定的 UDF 应用于数据帧。这是功能:
import html2text
class Udfs(object):
def __init__(self):
self.h2t = html2text.HTML2Text()
self.h2t.ignore_links = True
self.h2t.ignore_images = True
def extract_text(self, raw_text):
try:
texto = self.h2t.handle(raw_text)
except:
texto = "PARSE HTML ERROR"
return texto
以下是我应用 UDF 的方法:
import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs
udfs = Udfs()
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))
它处理大约 2900 万行和 300GB。问题是某些任务需要太多时间来处理。任务的平均时间为:
其他任务已完成,持续时间超过 1 小时。
但是有些任务需要太多时间处理:
该过程在 AWS 中使用 EMR 在具有 100 个节点的集群中运行,每个节点具有 32gb 的 RAM 和 4 个 CPU。还启用了火花推测。
这些任务的问题在哪里?UDF有问题吗?是线程问题?
解决方案
我的直觉是你使用了太多的分区。我会通过显着减少他们的数量来进行第一次尝试。你可以找到关于这个主题的这个有趣的帖子。
如果您的分区是平衡的,则您可以29 millions /80k partitions = 362
按分区进行平均观察。我想这还不够。您花费大量时间安排任务而不是执行任务。
如果您没有平衡分区,情况会变得更糟(请参阅此处。这通常会造成瓶颈,这似乎是您的情况。有几个选项:
- 您可以
coalesce
将您的数据分配到数量较少的分区中。这比使用更好,repartition
因为它避免了完全洗牌 repartitionByRange
如果您希望根据某些列拆分数据。您将不会像使用coalesce
或那样拥有平衡的分区,repartition
但它可能对后者有用,因为您需要使用那些拆分列的操作
spark.sql.shuffle.partitions
您可以使用和更改有关分区的默认值spark.default.parallelism
。
这是根据我的经验推测的。找到足够数量的分区很难,但值得。让我知道它是否有帮助,或者您是否仍然遇到瓶颈。
推荐阅读
- c++ - 使用 Qt 进行 C++ 摘要身份验证
- javascript - 单击按钮后引导模式不起作用
- javascript - 如何使用扩展运算符在 javascript 中的对象数组中删除和添加新键
- image - 如何在颤动中将图像存储到缓存的网络图像
- javascript - 使用 webpack 5 将主题提取到不同的 css
- java - 将 for 循环转换为流
- raspberry-pi - 树莓失去WiFi连接,无法重新连接
- reactjs - 使用检查的材料 UI 自动完成组件问题
- php - laravel 门不能在带有 @can 的刀片中工作
- reactjs - 尝试创建 React 应用程序并面临错误