python - PySpark:将一个df的列与第二个df的行进行比较
问题描述
我想比较两个 PySpark 数据框。
我有具有数百列(Col1、Col2、...、Col800)的 Df1 和具有数百个相应行的 Df2。
Df2 描述了 Df1 中 800 列中每一列的限制值,如果该值太低或太高,那么我想在 Final_Df 中实现结果,在这里我创建一个列Problem
来检查是否有任何列出现的限制。
我考虑过用 pivot 转置 Df2,但它需要一个聚合函数,所以我不确定它是否是一个相关的解决方案。
我也不知道如何加入两个 Dfs 进行比较,因为它们不共享任何公共列。
DF1:
| X | Y | Col1 | Col2 | Col3 |
+-----------+-----------+------+------+------+
| Value_X_1 | Value_Y_1 | 5000 | 250 | 500 |
+-----------+-----------+------+------+------+
| Value_X_2 | Value_Y_2 | 1000 | 30 | 300 |
+-----------+-----------+------+------+------+
| Value_X_3 | Value_Y_3 | 0 | 100 | 100 |
+-----------+-----------+------+------+------+
DF2:
+------+------+-----+
| name | max | min |
+------+------+-----+
| Col1 | 2500 | 0 |
+------+------+-----+
| Col2 | 120 | 0 |
+------+------+-----+
| Col3 | 400 | 0 |
+------+------+-----+
Final_Df(比较后):
+-----------+-----------+------+------+------+---------+
| X | Y | Col1 | Col2 | Col3 | Problem |
+-----------+-----------+------+------+------+---------+
| Value_X_1 | Value_Y_1 | 5000 | 250 | 500 | Yes |
+-----------+-----------+------+------+------+---------+
| Value_X_2 | Value_Y_2 | 1000 | 30 | 300 | No |
+-----------+-----------+------+------+------+---------+
| Value_X_3 | Value_Y_3 | 0 | 100 | 100 | No |
+-----------+-----------+------+------+------+---------+
解决方案
如果df2
不是大数据框,您可以将其转换为字典,然后使用列表推导和when函数检查状态,例如:
from pyspark.sql import functions as F
>>> df1.show()
+---------+---------+----+----+----+
| X| Y|Col1|Col2|Col3|
+---------+---------+----+----+----+
|Value_X_1|Value_Y_1|5000| 250| 500|
|Value_X_2|Value_Y_2|1000| 30| 300|
|Value_X_3|Value_Y_3| 0| 100| 100|
+---------+---------+----+----+----+
>>> df2.show()
+----+----+---+
|name| max|min|
+----+----+---+
|Col1|2500| 0|
|Col2| 120| 0|
|Col3| 400| 0|
+----+----+---+
# concerned columns
cols = df1.columns[2:]
>>> cols
['Col1', 'Col2', 'Col3']
注意:我假设 df1 和 df2.min、df2.max 中的上述列的数据类型已经设置为整数。
从 df2 创建地图:
map1 = { r.name:[r.min, r.max] for r in df2.collect() }
>>> map1
{u'Col1': [0, 2500], u'Col2': [0, 120], u'Col3': [0, 400]}
基于两个 when() 函数添加新字段“问题”,使用列表推导遍历所有相关列
- F.when(df1[c].between(min, max), 0).otherwise(1))
- F.when(sum(...) > 0, 'Yes').otherwise('No')
我们为每个相关列的第一个函数设置一个标志(0 或 1)when()
,然后对该标志求和。如果大于 0,则 Problem = 'Yes',否则为'No':
df_new = df1.withColumn('Problem', F.when(sum([ F.when(df1[c].between(map1[c][0], map1[c][1]), 0).otherwise(1) for c in cols ]) > 0, 'Yes').otherwise('No'))
>>> df_new.show()
+---------+---------+----+----+----+-------+
| X| Y|Col1|Col2|Col3|Problem|
+---------+---------+----+----+----+-------+
|Value_X_1|Value_Y_1|5000| 250| 500| Yes|
|Value_X_2|Value_Y_2|1000| 30| 300| No|
|Value_X_3|Value_Y_3| 0| 100| 100| No|
+---------+---------+----+----+----+-------+
推荐阅读
- flutter - Dart/Flutter - 如何在格式化值时避免由 NumberFormat.compactCurrency(locale: "en_IN").format() 方法完成的自动舍入?
- javascript - 当我出于某种原因组合减速器时,我的 auth 减速器没有附加到商店,那是怎么回事?
- python - 使用 OpenCV 将相机坐标中的点云均匀变换到世界坐标
- swift - UITableView 的委托是 Nil
- android - Android GridLayout — 删除一个布局权重会导致所有其他单元格消失
- r - 如何在 R 中编辑单元格 Shapefile?
- arrays - 为什么我得到 org.json.JSONException: Not a original array?
- html - 由于 css 中的 calc(width),React 应用程序中的“npm run build”失败?
- sql - 加入具有多个值的不同列时获得相同的结果
- pytorch - 如何在 torch.nn.parallel.DistributedDataParallel 中设置环境变量?