首页 > 解决方案 > 如何在 Pyspark 中将 1000 万条记录的列转换为 20000 条记录的列?

问题描述

正如您在图像中看到的,我们有一个数据框,其中一列包含 1000 万个值。我们的目的是重塑这个数据框,并获得一个将 1000 万个值除以 20000 个值的列。有什么方法可以在 Pyspark 中执行此操作?

https://i.stack.imgur.com/FvZKf.png

接下来,我添加一个图像,您可以在其中看到我想要实现的目标。该图显示了输入值(我们所在的点)和输出值(我们想要实现的结果)的示例。

https://i.stack.imgur.com/6gJHg.png

我还有一个疑问,下一步是将列转换为行。

https://i.stack.imgur.com/udHbk.png

标签: apache-sparkdataframepyspark

解决方案


这是一种可能的方法(我已经在图像中生成了表单的数据,可以通过调整输入参数来切换确切的大小generate_data):

from pyspark.sql import functions as f
from pyspark.sql import Window

def generate_data(size=10000, colsize=200):
    # generate data with a partition marker every colsize rows
    # to mark where the next column begins
    data = (sc.parallelize(np.random.rand(size).tolist())
            .zipWithIndex()
            .toDF(schema=['value', 'rownum'])
           .withColumn('colnum',(f.col('rownum')/colsize).cast('integer')))

    # generate a row number within column
    win = Window.partitionBy('colnum').orderBy('rownum')
    data = data.withColumn('col_rownum', f.rank().over(win))
    return data

df = generate_data(size=10010)

# now pivot to get columns
pivoted = (df.groupby('col_rownum')
           .pivot('colnum')
           .max('value')
           .orderBy('col_rownum'))

pivoted.select(pivoted.columns[1:])

推荐阅读