首页 > 解决方案 > PySpark - 使用 GroupBy 有条件地创建列

问题描述

我需要使用第二个 pyspark 数据框基于日期条件为每个 ID 创建一个指标。

指示器为 1 或 0,表示传感器发生故障。该指标取决于具有第一个失败日期和最后一个失败日期的第二个数据帧。如果故障记录在 中fail_df,则main_df行在第一个和最后一个记录的故障之间应该有一个 1 fail_df。当传感器在日期内没有失败时main_df,它应该存储一个值 0。

main_df 数据框

ID         |  Date      |Value 
-------------------------------------------------
P1         | 2016-10-01 |100
P1         | 2016-10-02 |200
P1         | 2016-12-16 |700
P1         | 2016-12-17 |800
P1         | 2016-12-18 |800
P2         | 2016-01-31 |700
P2         | 2016-02-01 |800
P2         | 2016-02-02 |900

失败列表数据框

ID         |  First Fail Date      | Last Fail Date
-----------------------------------------------------
P1         | 2016-10-01            |2016-10-02  |
P2         | 2016-01-31            |2016-02-01  |

所需的数据框

ID         |  Date      |Value  | Failure_Indicator     
-------------------------------------------------
P1         | 2016-10-01 |100    | 1
P1         | 2016-10-02 |200    | 1
P1         | 2016-12-16 |700    | 0
P1         | 2016-12-17 |800    | 0
P1         | 2016-12-18 |800    | 0
P2         | 2016-01-31 |700    | 1
P2         | 2016-02-01 |800    | 1
P2         | 2016-02-02 |900    | 0

我试过的:AttributeError:'GroupedData'对象没有属性'withColumn'

df = main_df.groupBy('ID').withColumn(
    'Failure_Indicator',
    F.when((fail_df.col("First fail Date") >= main_df.Date) & 
           (fail_df.col("Last fail Date") >= main_df.Date), 1)
     .otherwise(0))

标签: pythonapache-sparkif-statementpysparkgroup-by

解决方案


df_1=spark.createDataFrame([("P1", "2016-10-01", 100), ("P1", "2016-10-03", 200), ("P3", "2016-10-09", 200)], ["id", "date", "value"])

+---+----------+-----+
| id|      date|value|
+---+----------+-----+
| P1|2016-10-01|  100|
| P1|2016-10-03|  200|
| P3|2016-10-09|  200|
+---+----------+-----+

df_2=spark.createDataFrame([("P1", "2016-10-01", "2016-10-02")], ["id", "start_date", "end_date"])

+---+----------+----------+
| id|start_date|  end_date|
+---+----------+----------+
| P1|2016-10-01|2016-10-02|
+---+----------+----------+


df_1.join(df_2, 'id', 'left_outer') \
    .withColumn('failure_indicator', when((col("date") >= col("start_date")) & (col("date") <= col("end_date")), 1).otherwise(0)) \
    .select('id', 'date', 'value', 'failure_indicator') \
    .show()

+---+----------+-----+-----------------+
| id|      date|value|failure_indicator|
+---+----------+-----+-----------------+
| P3|2016-10-09|  200|                0|
| P1|2016-10-01|  100|                1|
| P1|2016-10-03|  200|                0|
+---+----------+-----+-----------------+

推荐阅读