首页 > 解决方案 > 为什么 PySpark 任务花费了太多时间?

问题描述

我正在运行一个没有问题的 Pyspark 进程。该过程的第一步是将特定的 UDF 应用于数据帧。这是功能:

import html2text

class Udfs(object):
    def __init__(self):
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.h2t.ignore_images = True

    def extract_text(self, raw_text):
        try:
            texto = self.h2t.handle(raw_text)
        except:
            texto = "PARSE HTML ERROR"
        return texto

以下是我应用 UDF 的方法:

import pyspark.sql.functions as f
import pyspark.sql.types as t
from udfs import Udfs

udfs = Udfs()
extract_text_udf = f.udf(udfs.extract_text, t.StringType())
df = df.withColumn("texto", extract_text_udf("html_raw"))

它处理大约 2900 万行和 300GB。问题是某些任务需要太多时间来处理。任务的平均时间为:

平均次数

其他任务已完成,持续时间超过 1 小时。

但是有些任务需要太多时间处理:

任务时间

该过程在 AWS 中使用 EMR 在具有 100 个节点的集群中运行,每个节点具有 32gb 的 RAM 和 4 个 CPU。还启用了火花推测。

这些任务的问题在哪里?UDF有问题吗?是线程问题?

标签: pythonapache-sparkpysparkbigdatauser-defined-functions

解决方案


我的直觉是你使用了太多的分区。我会通过显着减少他们的数量来进行第一次尝试。你可以找到关于这个主题的这个有趣的帖子

如果您的分区是平衡的,则您可以29 millions /80k partitions = 362按分区进行平均观察。我想这还不够。您花费大量时间安排任务而不是执行任务。

如果您没有平衡分区,情况会变得更糟(请参阅此处。这通常会造成瓶颈,这似乎是您的情况。有几个选项:

  • 您可以coalesce将您的数据分配到数量较少的分区中。这比使用更好,repartition因为它避免了完全洗牌
  • repartitionByRange如果您希望根据某些列拆分数据。您将不会像使用coalesce或那样拥有平衡的分区,repartition但它可能对后者有用,因为您需要使用那些拆分列的操作

spark.sql.shuffle.partitions您可以使用和更改有关分区的默认值spark.default.parallelism

这是根据我的经验推测的。找到足够数量的分区很难,但值得。让我知道它是否有帮助,或者您是否仍然遇到瓶颈。


推荐阅读