首页 > 解决方案 > 在pyspark中以分布式方式有效地生成大型DataFrame(没有pyspark.sql.Row)

问题描述

问题归结为以下几点:我想在 pyspark 中使用现有的并行化输入集合和一个给定一个输入的函数生成一个 DataFrame,该函数可以生成相对较大的一批行。在下面的示例中,我想使用例如 1000 个执行器生成 10^12 行数据帧:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
       StructField("seed", IntegerType()),
       StructField("n", IntegerType()),
       StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(我真的不想研究给定种子的随机数分布 - 这只是我能够想出的一个例子来说明大型数据帧不是从仓库加载而是由代码生成的情况)

上面的代码几乎完全符合我的要求。问题是它以一种非常低效的方式进行 - 代价是为每一行创建一个 python Row 对象,然后将 python Row 对象转换为内部 Spark 列表示。

np_array有没有一种方法可以通过让 spark 知道这些是一批值的列来转换已经以列表示形式的一批行(例如,上面的一个或几个 numpy 数组)?

例如,我可以编写代码来生成 python 集合 RDD,其中每个元素都是 pyarrow.RecordBatch 或 pandas.DataFrame,但我找不到将其中任何一个转换为 Spark DataFrame 的方法,而无需在其中创建 pyspark Row 对象的 RDD过程。

至少有十几篇文章提供了如何使用 pyarrow + pandas 有效地将本地(到驱动程序)pandas 数据帧转换为 Spark 数据帧的示例,但这对我来说不是一个选择,因为我需要在在执行程序上以分布式方式,而不是在驱动程序上生成一个 pandas 数据帧并将其发送给执行程序。

UPD。 我找到了一种避免创建 Row 对象的方法——使用 Python 元组的 RDD。正如预期的那样,它仍然太慢了,但仍然比使用 Row 对象快一点。尽管如此,这并不是我真正想要的(这是一种将列数据从 python 传递到 Spark 的非常有效的方法)。

还测量了在机器上执行某些操作的时间(粗略的方法,测量时间有很大的变化,但在我看来仍然具有代表性):有问题的数据集是 10M 行,3 列(一列是常数整数,其他是从 0 到 10M-1 的整数范围,第三个是使用生成的浮点值np.random.random_sample

仅使用 1 个执行程序和 1 个初始种子值生成 Spark 数据帧:

本地到驱动程序 pandas 数据帧的示例在 ~1s 内转换为 Spark 数据帧,10M 行,这让我有理由相信在执行程序中生成的数据帧应该是可能的。然而,我现在可以达到的最快速度是使用 Python 元组的 RDD 处理 10M 行约 40 秒。

所以问题仍然存在 - 有没有办法在 pyspark 中以分布式方式有效地生成大型 Spark 数据帧?

标签: apache-sparkpysparkpyarrowapache-arrow

解决方案


听起来瓶颈是从 RDD -> Dataframes 的转换,并且手头的功能相当快,并且 pandas DF 转换通过 pyarrow 触发 DF 非常快。以下是两种可能的解决方案:

  1. 由于很容易并行创建 pandas df,而不是从执行程序返回它,而是使用 编写生成的 df df.to_parquet,即:
def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    df.reset_index().to_parquet(f"s3://bucket/part-{str(seed).zfill(5)}.parquet"

之后在生成的镶木地板文件中读取火花应该是微不足道的。然后你的瓶颈变成了 IO 限制,这应该比 spark 转换元组/行类型更快。

  1. 如果您不允许将任何内容保存到文件中,pandas_udf并且GROUPED_MAP可能会帮助您,假设您的 spark 版本足够新。它也使用 pyarrow 在 spark DF 和 pandas DF 之间进行转换,因此它应该比使用元组更快,并允许您以分布式方式从 UDF 创建和返回 pandas DF。
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

N = 10

df = spark.createDataFrame(
    [(i,) for i in range(N)], ["seed"]
)

def generate_data(seed):
    M = 10
    np.random.seed(seed)
    np_array = np.random.random_sample(M) # generates an array of M random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    return df.reset_index()

@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
    output = []
    for idx, row in pdf.iterrows():
        output.append(generate_data(row["seed"]))
    return pd.concat(output)


df.groupby("seed").apply(generate_data_udf).show()

较慢的部分将是groupby您可能能够加快速度的部分,具体取决于您将种子放入的方式generate_data_udf,即:

@udf(returnType=IntegerType())
def batch_seed(seed):
    return seed // 10

df.withColumn("batch_seed", batch_seed(col("seed"))). \
groupBy("batch_seed").apply(generate_data_udf).show()

推荐阅读