首页 > 解决方案 > 如何拆分pyspark数据框并创建新列

问题描述

我有如下示例输入数据框,但值(以 m 开头的 clm)列可以是 n 个数字。我还使用 customer_id 作为主键(但是,根据输入数据,我可以拥有更多的主键)。

customer_id|month_id|m1    |m2 |m3 ....to....m_n
1001      |  01    |10     |20    
1002      |  01    |20     |30    
1003      |  01    |30     |40
1001      |  02    |40     |50    
1002      |  02    |50     |60    
1003      |  02    |60     |70
1001      |  03    |70     |80    
1002      |  03    |80     |90    
1003      |  03    |90     |100

现在,基于输入值列 - 我必须根据累积总和或平均值计算新列。让我们考虑一个例子:

cumulative sum on [m1, ......, m10] and 
cumulative avg on [m11, ......., m20] columns 

基于此,我必须计算新列。我已经根据 windows 函数进行了尝试,并且能够计算新列。但是,我的问题是由于数据的大小,我正在使用带有新列的更新数据框一个接一个地进行计算。

我的尝试:

a = [m1, ......, m10]
b = [m11, ......, m20]
rnum = (Window.partitionBy("partner_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))
for item in a:
   var = n
   df = df.withColumn(var + item[1:], F.sum(item).over(rnum))
for item in b:
   var = n
   df = df.withColumn(var + item[1:], F.avg(item).over(rnum))

输出数据:

customer_id|month_id|m1     |m2    |m11     |m12   |n1   |n2  |n11  |n12
1001       |  01    |10     |20    |10      |20    |10   |20  |10   |20
1002       |  01    |20     |30    |10      |20    |20   |30  |10   |20
1003       |  01    |30     |40    |10      |20    |30   |40  |10   |20
1001       |  02    |40     |50    |10      |20    |50   |35  |10   |20
1002       |  02    |50     |60    |10      |20    |70   |55  |10   |20
1003       |  02    |60     |70    |10      |20    |90   |75  |10   |20
1001       |  03    |70     |80    |10      |20    |120  |75  |10   |20
1002       |  03    |80     |90    |10      |20    |150  |105 |10   |20
1003       |  03    |90     |100   |10      |20    |180  |135 |10   |20

但是,我们可以通过将数据框分成两部分来执行相同的操作,其中一个中的累积总和列和另一个数据框中的累积平均列以及主键,然后执行操作然后合并计算的数据框????

标签: pythondataframepysparkhivepyspark-dataframes

解决方案


根据您的问题,我的理解是您正在尝试拆分操作以并行执行任务并节省时间。

您不必并行化执行,因为当您执行任何操作(例如 collect()、show()、count()、在您创建的数据帧上写入时,执行将在 spark 中自动并行化)。这是由于 spark 的延迟执行

如果您出于其他原因仍想拆分操作,则可以使用线程。下面的文章将为您提供有关 pyspark 中线程的更多信息:https ://medium.com/@everisUS/threads-in-pyspark-a6e8005f6017


推荐阅读