首页 > 解决方案 > pandas pivot_table (slow) vs pyspark pivot

问题描述

我正在尝试对大小为 40GB 的文件的 pandas 执行 pivot_table 操作。

这需要很长时间才能运行(1.5 小时)。我通过将 spark 数据帧转换为 pandas 并使用 pyarrow 在 hadoop 集群上执行此操作。但是使用熊猫,我相信这个操作只在那个特定的节点上运行。最后,我再次将其转换为 spark 数据框。

输入:

+---------+-----------------+-----------------+------+------------------+------------------+------------------+---------+------------------+
| uniqueid | Measure1_month1 | Measure1_month2 | .... | Measure1_month72 | Measure2_month_1 | Measure2_month_2 | ….so on | Measure2_month72 |
+---------+-----------------+-----------------+------+------------------+------------------+------------------+---------+------------------+
|       1 |              10 |              20 | ….   |              500 |               40 |               50 | …       |                  |
|       2 |              20 |              40 | ….   |              800 |               70 |              150 | …       |                  |
+---------+-----------------+-----------------+------+------------------+------------------+------------------+---------+------------------+

输出:

+---------+-------+----------+----------+
| uniqueid| Month | Measure1 | Measure2 |
+---------+-------+----------+----------+
|       1 |     1 |       10 |       30 |
|       1 |     2 |       20 |       40 |
|       1 |     3 |       30 |       80 |
|       1 |     4 |       70 |       90 |
|       1 |     5 |       40 |      100 |
|       . |     . |        . |        . |
|       . |     . |        . |        . |
|       1 |    72 |      700 |       50 |
+---------+-------+----------+----------+

代码:

import pandas as pd
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    df_input = spark.sql("select * from inputtable")
    
    df_input_pd=df_input.toPandas()
    
    d = df_input_pd.set_index('uniqueid')
    d.columns = d.columns.str.replace('m\_', 'm').str.split('_', expand=True)
    u = d.stack((0, 1)).rename_axis(
          ['uniqueid', 'Measure', 'Month']).to_frame('Value').reset_index()
    f2 = u.pivot_table(index=['uniqueid', 'Month'], columns='Measure', values='Value', fill_value=0).sort_values(['uniqueid', 'Month'])
    
    
    spDF = sqlContext.createDataFrame(f2)
    spDF.write.insertInto("outputtable",overwrite=False)

f2(pivot) 和 insertinto 是耗时最长的操作。有没有办法通过使用 pyspark pivot 或任何其他操作来优化它?我在一组较小的数据上对此进行了测试,并且效果很好。有没有更优化的方法来做到这一点?如果有不清楚的地方,请告诉我。再次感谢

问候, 萨万

标签: pythonpandaspyspark

解决方案


远离熊猫并使用火花pivot功能:

df.pivot('Measure')

*发布数据样本以及您期望的最终结果。


推荐阅读