Looping over rows to create a custom formula structure in PySpark

Problem description

I have a dataframe with variable names plus a numerator and a denominator for each. Every variable is a ratio, for example:

[Image: formula table with columns name, numerator, denominator]

A second dataset holds the actual data used to compute the attributes:

[Image: data table with an ID column and value columns a, b, c, d]

The goal is to create these attributes using the formulas from the first dataframe and compute them against the second.

My current approach is very naive:

df = df.withColumn("var1", col('a') / col('b'))
# ... one withColumn call per variable

Desired output:

[Image: result table with one row per ID and one column per variable]

Since I have more than 500 variables, any smarter way to tackle this is welcome!

Tags: apache-spark, pyspark, apache-spark-sql

Solution


This can be achieved in PySpark with a cross join, an unpivot (via `stack`), and a pivot.
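The unpivot step below relies on Spark SQL's `stack` function, whose argument list is generated from the data columns. As a quick sketch (pure Python, no Spark session needed), the expression built for columns a through d looks like this:

```python
# Columns to unpivot (everything except the ID column)
cols = ['a', 'b', 'c', 'd']

# Build the stack() expression: stack(n, 'name1', name1, 'name2', name2, ...)
expr = "stack({}, {})".format(len(cols),
                              ', '.join("'{}', {}".format(c, c) for c in cols))
print(expr)  # stack(4, 'a', a, 'b', b, 'c', c, 'd', d)
```

Passed to `selectExpr`, this turns each wide row into one (col0, col1) pair per column.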

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [
    ("var1", "a", "c"),
    ("var2", "b", "d"),
    ("var3", "b", "a"),
    ("var4", "d", "c")
]
        
# 'denonminator' (sic) is kept as-is so the SQL below matches the schema
schema = StructType([
    StructField('name', StringType(), True),
    StructField('numerator', StringType(), True),
    StructField('denonminator', StringType(), True)
])

data2 = [
    ("ID1", 6, 4, 3, 7),
    ("ID2", 1, 2, 3, 9)
]

schema2 = StructType([
    StructField('ID', StringType(), True),
    StructField('a', IntegerType(), True),
    StructField('b', IntegerType(), True),
    StructField('c', IntegerType(), True),
    StructField('d', IntegerType(), True)
])

df = spark.createDataFrame(data=data, schema=schema)
df2 = spark.createDataFrame(data=data2, schema=schema2)
df.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
""" CRoss Join for Duplicating the values """
df3=spark.sql("select * from table1 cross join table2")
df3.createOrReplaceTempView("table3")
""" Unpivoting the values and joining to fecth the value of numerator and denominator"""
cols = df2.columns[1:]
df4=df2.selectExpr('ID', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols))))
df4.createOrReplaceTempView("table4")
df5 = spark.sql("""
    SELECT A.name, B.ID, round(B.col1 / C.col1, 2) AS value
    FROM table3 A
    LEFT OUTER JOIN table4 B ON A.ID = B.ID AND A.numerator = B.col0
    LEFT OUTER JOIN table4 C ON A.ID = C.ID AND A.denonminator = C.col0
    ORDER BY name, ID
""")
""" Pivot for fetching the results """
df_final=df5.groupBy("ID").pivot("name").max("value")

Results of all intermediate and final dataframes:

>>> df.show()
+----+---------+------------+
|name|numerator|denonminator|
+----+---------+------------+
|var1|        a|           c|
|var2|        b|           d|
|var3|        b|           a|
|var4|        d|           c|
+----+---------+------------+

>>> df2.show()
+---+---+---+---+---+
| ID|  a|  b|  c|  d|
+---+---+---+---+---+
|ID1|  6|  4|  3|  7|
|ID2|  1|  2|  3|  9|
+---+---+---+---+---+

>>> df3.show()
+----+---------+------------+---+---+---+---+---+
|name|numerator|denonminator| ID|  a|  b|  c|  d|
+----+---------+------------+---+---+---+---+---+
|var1|        a|           c|ID1|  6|  4|  3|  7|
|var2|        b|           d|ID1|  6|  4|  3|  7|
|var1|        a|           c|ID2|  1|  2|  3|  9|
|var2|        b|           d|ID2|  1|  2|  3|  9|
|var3|        b|           a|ID1|  6|  4|  3|  7|
|var4|        d|           c|ID1|  6|  4|  3|  7|
|var3|        b|           a|ID2|  1|  2|  3|  9|
|var4|        d|           c|ID2|  1|  2|  3|  9|
+----+---------+------------+---+---+---+---+---+

>>> df4.show()
+---+----+----+
| ID|col0|col1|
+---+----+----+
|ID1|   a|   6|
|ID1|   b|   4|
|ID1|   c|   3|
|ID1|   d|   7|
|ID2|   a|   1|
|ID2|   b|   2|
|ID2|   c|   3|
|ID2|   d|   9|
+---+----+----+

>>> df5.show()
+----+---+-----+
|name| ID|value|
+----+---+-----+
|var1|ID1|  2.0|
|var1|ID2| 0.33|
|var2|ID1| 0.57|
|var2|ID2| 0.22|
|var3|ID1| 0.67|
|var3|ID2|  2.0|
|var4|ID1| 2.33|
|var4|ID2|  3.0|
+----+---+-----+

>>> df_final.show()
+---+----+----+----+----+
| ID|var1|var2|var3|var4|
+---+----+----+----+----+
|ID2|0.33|0.22| 2.0| 3.0|
|ID1| 2.0|0.57|0.67|2.33|
+---+----+----+----+----+
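Since the formula table is tiny (one row per variable), an alternative that avoids the cross join entirely is to collect it to the driver and generate one ratio expression per variable for a single `selectExpr` projection. A minimal sketch, assuming the same `df` and `df2` as above (the formula tuples are inlined here for illustration):

```python
# Formula rows as plain tuples; in practice these would come from the small
# formula dataframe, e.g. [tuple(r) for r in df.collect()]
formulas = [("var1", "a", "c"), ("var2", "b", "d"),
            ("var3", "b", "a"), ("var4", "d", "c")]

# Build one SQL expression per variable, e.g. "round(a/c, 2) AS var1"
exprs = ["round({}/{}, 2) AS {}".format(num, den, name)
         for name, num, den in formulas]
print(exprs[0])  # round(a/c, 2) AS var1

# A single projection then computes all variables in one pass:
# df_final2 = df2.selectExpr("ID", *exprs)
```

With 500+ variables this keeps the query plan to one projection instead of a 500-fold row duplication, at the cost of collecting the (small) formula table to the driver.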
