首页 > 解决方案 > 在多个列上并行运行回归

问题描述

我有一个非常宽的带有标签列的数据框。我想独立地为每一列运行逻辑回归。我正在尝试找到最有效的方法来并行运行它。

+----------+--------+--------+--------+-----+------------+
| features | label1 | label2 | label3 | ... | label30000 |
+----------+--------+--------+--------+-----+------------+

我最初的想法是使用ThreadPoolExecutor,获取每列的结果,然后加入:

extract_prob = udf(lambda x: float(x[1]), FloatType())

def lr_for_column(argm):
    col_name = argm[0]
    test_res = argm[1]
    lr = LogisticRegression(featuresCol="features", labelCol=col_name, regParam=0.1)
    lrModel = lr.fit(tfidf)
    res = lrModel.transform(test_tfidf)
    test_res = test_res.join(res.select('id', 'probability'), on="id")
    test_res = test_res.withColumn(col_name, extract_prob('probability')).drop("probability")
    return test_res.select('id', col_name)


with futures.ThreadPoolExecutor(max_workers=100) as executor:
    future_results = [executor.submit(lr_for_column, [colname, test_res]) for colname in list_of_label_columns]
    futures.wait(future_results)
    for future in future_results:
       test_res = test_res.join(future.result(), on="id")

但是这种方法的性能不是很好。有没有更快的方法来做到这一点?

标签: pythonapache-sparkpysparkapache-spark-ml

解决方案


考虑到可用资源,您无法通过使用获得任何收益ThreadPoolExecutor-总共有 32 个内核和 200 个分区,您只能同时处理约 16% 的数据,如果数据增长,这部分只会变得更糟。

如果你想训练 30000 个模型并使用默认的迭代次数(100,实际上可能很低),Spark 程序将提交大约 3000000 个作业(每次迭代创建一个单独的作业),并且每个作业只能处理一小部分同时 - 这不会给改进带来太大希望,除非您添加更多资源。

尽管有一些事情你可以尝试:

  • 确保不必重新计算最终特征。如有必要,将数据写入持久存储并将其加载回来,并确保传递给模型的数据被缓存。
  • 考虑应用一些降维算法。特征数为 300000 不仅高,而且接近记录数(500000)。它不仅计算成本高,而且还可能导致严重的过拟合。
  • 如果您决定减少维度,请考虑采样以进一步减少训练数据的大小,从而减少分区数量并提高整体吞吐量。

    如果您的数据中有很强的线性趋势,即使在较小的样本上也应该可见,而不会显着降低精度。

  • pyspark.ml考虑用不需要多个作业的变体替换昂贵的算法,例如使用一些工具组合(您可以通过在每个分区上spark-sklearn拟合模型来创建集成模型)。sklearn

  • 超额认购核心。例如,如果您有 4 个物理内核/节点,则允许 8 或 16 个来考虑 IO 等待时间。


推荐阅读