首页 > 解决方案 > PySpark:将一个df的列与第二个df的行进行比较

问题描述

我想比较两个 PySpark 数据框。

我有具有数百列(Col1、Col2、...、Col800)的 Df1 和具有数百个相应行的 Df2。

Df2 描述了 Df1 中 800 列中每一列的限制值,如果该值太低或太高,那么我想在 Final_Df 中实现结果,在这里我创建一个列Problem来检查是否有任何列出现的限制。

我考虑过用 pivot 转置 Df2,但它需要一个聚合函数,所以我不确定它是否是一个相关的解决方案。

我也不知道如何加入两个 Dfs 进行比较,因为它们不共享任何公共列。

DF1:

| X         | Y         | Col1 | Col2 | Col3 |
+-----------+-----------+------+------+------+
| Value_X_1 | Value_Y_1 | 5000 | 250  | 500  |
+-----------+-----------+------+------+------+
| Value_X_2 | Value_Y_2 | 1000 | 30   | 300  |
+-----------+-----------+------+------+------+
| Value_X_3 | Value_Y_3 | 0    | 100  | 100  |
+-----------+-----------+------+------+------+

DF2:

+------+------+-----+
| name | max  | min |
+------+------+-----+
| Col1 | 2500 | 0   |
+------+------+-----+
| Col2 | 120  | 0   |
+------+------+-----+
| Col3 | 400  | 0   |
+------+------+-----+

Final_Df(比较后):

+-----------+-----------+------+------+------+---------+
| X         | Y         | Col1 | Col2 | Col3 | Problem |
+-----------+-----------+------+------+------+---------+
| Value_X_1 | Value_Y_1 | 5000 | 250  | 500  | Yes     |
+-----------+-----------+------+------+------+---------+
| Value_X_2 | Value_Y_2 | 1000 | 30   | 300  | No      |
+-----------+-----------+------+------+------+---------+
| Value_X_3 | Value_Y_3 | 0    | 100  | 100  | No      |
+-----------+-----------+------+------+------+---------+

标签: pythonapache-sparkpyspark

解决方案


如果df2不是大数据框,您可以将其转换为字典,然后使用列表推导和when函数检查状态,例如:

from pyspark.sql import functions as F

>>> df1.show()
+---------+---------+----+----+----+
|        X|        Y|Col1|Col2|Col3|
+---------+---------+----+----+----+
|Value_X_1|Value_Y_1|5000| 250| 500|
|Value_X_2|Value_Y_2|1000|  30| 300|
|Value_X_3|Value_Y_3|   0| 100| 100|
+---------+---------+----+----+----+

>>> df2.show()
+----+----+---+
|name| max|min|
+----+----+---+
|Col1|2500|  0|
|Col2| 120|  0|
|Col3| 400|  0|
+----+----+---+

# concerned columns
cols = df1.columns[2:]
>>> cols
['Col1', 'Col2', 'Col3']

注意:我假设 df1 和 df2.min、df2.max 中的上述列的数据类型已经设置为整数。

从 df2 创建地图:

map1 = { r.name:[r.min, r.max] for r in df2.collect() }

>>> map1
{u'Col1': [0, 2500], u'Col2': [0, 120], u'Col3': [0, 400]}

基于两个 when() 函数添加新字段“问题”,使用列表推导遍历所有相关列

  • F.when(df1[c].between(min, max), 0).otherwise(1))
  • F.when(sum(...) > 0, 'Yes').otherwise('No')

我们为每个相关列的第一个函数设置一个标志(0 或 1)when(),然后对该标志求和。如果大于 0,则 Problem = 'Yes',否则为'No':

df_new = df1.withColumn('Problem', F.when(sum([ F.when(df1[c].between(map1[c][0], map1[c][1]), 0).otherwise(1) for c in cols ]) > 0, 'Yes').otherwise('No'))

>>> df_new.show()
+---------+---------+----+----+----+-------+
|        X|        Y|Col1|Col2|Col3|Problem|
+---------+---------+----+----+----+-------+
|Value_X_1|Value_Y_1|5000| 250| 500|    Yes|
|Value_X_2|Value_Y_2|1000|  30| 300|     No|
|Value_X_3|Value_Y_3|   0| 100| 100|     No|
+---------+---------+----+----+----+-------+

推荐阅读