Optimizing the aggregate sum of sparse vectors in Spark (and saving to Parquet)

Problem description

Please forgive the PySpark noob question.

The final stages of generating my Spark DataFrame in PySpark look like this:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

indexer = StringIndexer(inputCol="kpID", outputCol="KPindex")
inputs = [indexer.getOutputCol()]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["KPvec"])
pipeline = Pipeline(stages=[indexer, encoder])
df_bayes = pipeline.fit(df_bayes).transform(df_bayes)

import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.linalg import DenseVector

def sparse_to_array(v):
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array

sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))
df_bayes = df_bayes.select('id', sparse_to_array_udf(col('KPvec')).alias('KPvec'))
df_bayes = df_bayes.repartition(15000, col('id'))
# kids: the list of one-hot categories, defined earlier (not shown)
df_bayes = (df_bayes.select('id', 'KPvec')
            .groupby('id')
            .agg(F.array(*[F.sum(F.col('KPvec')[i]) for i in range(len(kids))])
                  .alias("KPvec"))
            .cache())

I am trying to sum, per id, a sparse vector that represents a one-hot-encoded categorical variable.

On my EMR cluster this takes 188 seconds to complete, and the resulting DataFrame has roughly 50M rows. I then tried to write this DataFrame to Parquet.

I tried:

df_bayes.write.format("parquet") \
    .partitionBy("id") \
    .bucketBy(500, "KPvec") \
    .option("path", "s3://..."+"output.parquet") \
    .saveAsTable("output")

and:

df_bayes.repartition(1500, col('id')).write.parquet("s3://..."+"output.parquet")

and also without any repartitioning.

In each case the job runs for a long time and eventually fails with an ExecutorLostFailure (expected to some degree, since the EMR cluster runs on many spot instances).

Here is the Spark DAG visualization (image omitted):

Despite the earlier cache, I suspect that many of these steps are not actually related to the Parquet write itself, but belong to the computation I asked for.

I suspect this is the case because, if I instead try to count the rows of the DataFrame, the DAG visualization I see is:

[DAG visualization image omitted]

The steps repeated before the job fails, together with roughly 6 GB of shuffle writes, suggest to me that the way I am performing this computation is very inefficient.

Also, when I run `explain`, I get the following:

== Physical Plan ==
InMemoryTableScan [id#1, KPvec#52167]
   +- InMemoryRelation [id#1, KPvec#52167], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- HashAggregate(keys=[id#1], functions=[sum(cast(KPvec#27[0] as double)), sum(cast(KPvec#27[1] as double)), sum(cast(KPvec#27[2] as double)), sum(cast(KPvec#27[3] as double)), sum(cast(KPvec#27[4] as double)), sum(cast(KPvec#27[5] as double)), sum(cast(KPvec#27[6] as double)), sum(cast(KPvec#27[7] as double)), sum(cast(KPvec#27[8] as double)), sum(cast(KPvec#27[9] as double)), sum(cast(KPvec#27[10] as double)), sum(cast(KPvec#27[11] as double)), sum(cast(KPvec#27[12] as double)), sum(cast(KPvec#27[13] as double)), sum(cast(KPvec#27[14] as double)), sum(cast(KPvec#27[15] as double)), sum(cast(KPvec#27[16] as double)), sum(cast(KPvec#27[17] as double)), sum(cast(KPvec#27[18] as double)), sum(cast(KPvec#27[19] as double)), sum(cast(KPvec#27[20] as double)), sum(cast(KPvec#27[21] as double)), sum(cast(KPvec#27[22] as double)), sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
            +- HashAggregate(keys=[id#1], functions=[partial_sum(cast(KPvec#27[0] as double)), partial_sum(cast(KPvec#27[1] as double)), partial_sum(cast(KPvec#27[2] as double)), partial_sum(cast(KPvec#27[3] as double)), partial_sum(cast(KPvec#27[4] as double)), partial_sum(cast(KPvec#27[5] as double)), partial_sum(cast(KPvec#27[6] as double)), partial_sum(cast(KPvec#27[7] as double)), partial_sum(cast(KPvec#27[8] as double)), partial_sum(cast(KPvec#27[9] as double)), partial_sum(cast(KPvec#27[10] as double)), partial_sum(cast(KPvec#27[11] as double)), partial_sum(cast(KPvec#27[12] as double)), partial_sum(cast(KPvec#27[13] as double)), partial_sum(cast(KPvec#27[14] as double)), partial_sum(cast(KPvec#27[15] as double)), partial_sum(cast(KPvec#27[16] as double)), partial_sum(cast(KPvec#27[17] as double)), partial_sum(cast(KPvec#27[18] as double)), partial_sum(cast(KPvec#27[19] as double)), partial_sum(cast(KPvec#27[20] as double)), partial_sum(cast(KPvec#27[21] as double)), partial_sum(cast(KPvec#27[22] as double)), partial_sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
               +- Exchange hashpartitioning(id#1, 15000)
                  +- *(2) Project [id#1, pythonUDF0#52170 AS KPvec#27]
                     +- BatchEvalPython [sparse_to_array(KPvec#3)], [KPvec#3, id#1, pythonUDF0#52170]
                        +- *(1) Project [KPvec#3, id#1]
                           +- *(1) FileScan parquet [id#1,KPvec#3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://...bayesnetw..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,KPvec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>

Can anyone point me in the right direction as to what I am doing wrong here?

Thanks in advance.

Tags: python, apache-spark, amazon-emr, parquet, pyspark-dataframes

Solution


So, to answer my own question in case it helps someone else: the solution was to `collect_set` the features I wanted to one-hot encode, and then use CountVectorizer instead of Spark ML's OneHotEncoder.

import pyspark.sql.functions as F
from pyspark.ml.feature import CountVectorizer

df = df.select('id', 'feature').groupby('id').agg(F.collect_set('feature').alias('feature'))

countModel = CountVectorizer().setInputCol("feature").setOutputCol("feature_vec").fit(df)
df = countModel.transform(df).select('id', 'feature_vec')

Then you can simply save it to Parquet. For me, this was quite fast.
