python - Optimizing the aggregated sum of sparse vectors in Spark (and saving to Parquet)
Problem description
Please forgive the PySpark noob question.
The final stages of generating my Spark dataframe in PySpark are as follows:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.linalg import DenseVector
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, FloatType

indexer = StringIndexer(inputCol="kpID", outputCol="KPindex")
inputs = [indexer.getOutputCol()]
encoder = OneHotEncoderEstimator(inputCols=inputs, outputCols=["KPvec"])
pipeline = Pipeline(stages=[indexer, encoder])
df_bayes = pipeline.fit(df_bayes).transform(df_bayes)

def sparse_to_array(v):
    v = DenseVector(v)
    new_array = [float(x) for x in v]
    return new_array

sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))

df_bayes = df_bayes.select('id', sparse_to_array_udf(col('KPvec')).alias('KPvec'))
df_bayes = df_bayes.repartition(15000, col('id'))
# 'kids' is defined elsewhere in my code
df_bayes = (df_bayes.select('id', 'KPvec')
            .groupby('id')
            .agg(F.array(*[F.sum(F.col('KPvec')[i]) for i in range(len(kids))])
                 .alias("KPvec"))
            .cache())
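To make the intent of the last two steps concrete, here is a minimal pure-Python sketch of what they compute, without Spark: densify each sparse one-hot vector, then sum the dense arrays element-wise per id. The rows and vector sizes below are made-up toy data.

```python
# Sketch of the pipeline's last two steps on toy data (no Spark):
# (1) densify each sparse one-hot vector (size, indices, values),
# (2) element-wise sum of the dense arrays per id.

def sparse_to_dense(size, indices, values):
    # Same idea as the sparse_to_array UDF (DenseVector -> list of floats)
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense

rows = [  # (id, sparse one-hot vector) -- hypothetical examples
    ("a", (3, [0], [1.0])),
    ("a", (3, [1], [1.0])),
    ("b", (3, [2], [1.0])),
    ("a", (3, [0], [1.0])),
]

sums = {}
for row_id, (size, idx, vals) in rows:
    dense = sparse_to_dense(size, idx, vals)
    acc = sums.setdefault(row_id, [0.0] * size)
    for i, x in enumerate(dense):
        acc[i] += x  # mirrors F.sum(F.col('KPvec')[i]) per index

print(sums["a"])  # [2.0, 1.0, 0.0]
print(sums["b"])  # [0.0, 0.0, 1.0]
```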
I am trying to sum, per id, a sparse vector that represents a one-hot encoded categorical variable.
On my EMR cluster this takes 188 seconds to complete, and the resulting dataframe has ~50M rows. I then try to write this dataframe to Parquet.
I tried:
df_bayes.write.format("parquet") \
.partitionBy("id") \
.bucketBy(500,"KPvec") \
.option("path", "s3://..."+"output.parquet") \
.saveAsTable("output")
and:
df_bayes.repartition(1500, col('id')).write.parquet("s3://..."+"output.parquet")
and also without any repartitioning.
In each case the job takes a very long time and eventually fails with an ExecutorLostFailure (this is on EMR running with many Spot instances).
Despite the earlier cache, I suspect that many of these steps are actually unrelated to the Parquet write and are instead the computation I have asked for.
I suspect this is the case because if I try to compute the dimensions of the dataframe, the DAG visualization I see is:
The repeated stages before the job fails, and the roughly 6 GB of shuffle write, suggest to me that the way I am performing this computation is very inefficient.
Also, when I run explain, I get the following:
== Physical Plan ==
InMemoryTableScan [id#1, KPvec#52167]
+- InMemoryRelation [id#1, KPvec#52167], StorageLevel(disk, memory, deserialized, 1 replicas)
+- HashAggregate(keys=[id#1], functions=[sum(cast(KPvec#27[0] as double)), sum(cast(KPvec#27[1] as double)), sum(cast(KPvec#27[2] as double)), sum(cast(KPvec#27[3] as double)), sum(cast(KPvec#27[4] as double)), sum(cast(KPvec#27[5] as double)), sum(cast(KPvec#27[6] as double)), sum(cast(KPvec#27[7] as double)), sum(cast(KPvec#27[8] as double)), sum(cast(KPvec#27[9] as double)), sum(cast(KPvec#27[10] as double)), sum(cast(KPvec#27[11] as double)), sum(cast(KPvec#27[12] as double)), sum(cast(KPvec#27[13] as double)), sum(cast(KPvec#27[14] as double)), sum(cast(KPvec#27[15] as double)), sum(cast(KPvec#27[16] as double)), sum(cast(KPvec#27[17] as double)), sum(cast(KPvec#27[18] as double)), sum(cast(KPvec#27[19] as double)), sum(cast(KPvec#27[20] as double)), sum(cast(KPvec#27[21] as double)), sum(cast(KPvec#27[22] as double)), sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
+- HashAggregate(keys=[id#1], functions=[partial_sum(cast(KPvec#27[0] as double)), partial_sum(cast(KPvec#27[1] as double)), partial_sum(cast(KPvec#27[2] as double)), partial_sum(cast(KPvec#27[3] as double)), partial_sum(cast(KPvec#27[4] as double)), partial_sum(cast(KPvec#27[5] as double)), partial_sum(cast(KPvec#27[6] as double)), partial_sum(cast(KPvec#27[7] as double)), partial_sum(cast(KPvec#27[8] as double)), partial_sum(cast(KPvec#27[9] as double)), partial_sum(cast(KPvec#27[10] as double)), partial_sum(cast(KPvec#27[11] as double)), partial_sum(cast(KPvec#27[12] as double)), partial_sum(cast(KPvec#27[13] as double)), partial_sum(cast(KPvec#27[14] as double)), partial_sum(cast(KPvec#27[15] as double)), partial_sum(cast(KPvec#27[16] as double)), partial_sum(cast(KPvec#27[17] as double)), partial_sum(cast(KPvec#27[18] as double)), partial_sum(cast(KPvec#27[19] as double)), partial_sum(cast(KPvec#27[20] as double)), partial_sum(cast(KPvec#27[21] as double)), partial_sum(cast(KPvec#27[22] as double)), partial_sum(cast(KPvec#27[23] as double)), ... 52109 more fields])
+- Exchange hashpartitioning(id#1, 15000)
+- *(2) Project [id#1, pythonUDF0#52170 AS KPvec#27]
+- BatchEvalPython [sparse_to_array(KPvec#3)], [KPvec#3, id#1, pythonUDF0#52170]
+- *(1) Project [KPvec#3, id#1]
+- *(1) FileScan parquet [id#1,KPvec#3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[s3://...bayesnetw..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,KPvec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>
Can anyone point me in the right direction as to what I am doing wrong here?
Thanks in advance.
Solution
So, to answer my own question in case it helps anyone else: the solution is to collect_set the features you want to one-hot encode, and then use CountVectorizer instead of Spark ML's OneHotEncoder.
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import functions as F

df = df.select('id', 'feature').groupby('id').agg(F.collect_set('feature').alias('feature'))
countModel = CountVectorizer().setInputCol("feature").setOutputCol("feature_vec").fit(df)
df = countModel.transform(df).select('id', 'feature_vec')
Then you can simply save it to Parquet. For me this was reasonably fast.
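For intuition, the collect_set + CountVectorizer approach can be sketched in plain Python on toy data (hypothetical feature names, no Spark): collect the distinct features per id, then emit one count per vocabulary slot. After collect_set each feature appears at most once per id, so the counts are effectively binary.

```python
# Pure-Python sketch of collect_set followed by CountVectorizer-style counting.
rows = [("a", "red"), ("a", "blue"), ("a", "red"), ("b", "blue")]

# collect_set: distinct features per id
per_id = {}
for row_id, feat in rows:
    per_id.setdefault(row_id, set()).add(feat)

# CountVectorizer: fixed vocabulary, then one slot per vocabulary entry
vocab = sorted({f for _, f in rows})  # ['blue', 'red']

vectors = {
    row_id: [1.0 if f in feats else 0.0 for f in vocab]
    for row_id, feats in per_id.items()
}
print(vectors["a"])  # [1.0, 1.0]
print(vectors["b"])  # [1.0, 0.0]
```

The key difference from the original approach is that the expensive per-index sum over tens of thousands of columns disappears: the grouping happens once, on the raw feature strings, before any vector is built.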