python - PySpark - 使用 GroupBy 有条件地创建列
问题描述
我需要使用第二个 pyspark 数据框基于日期条件为每个 ID 创建一个指标。
指示器为 1 或 0,表示传感器发生故障。该指标取决于具有第一个失败日期和最后一个失败日期的第二个数据帧。如果故障记录在 中fail_df
,则main_df
行在第一个和最后一个记录的故障之间应该有一个 1 fail_df
。当传感器在日期内没有失败时main_df
,它应该存储一个值 0。
main_df 数据框
ID | Date |Value
-------------------------------------------------
P1 | 2016-10-01 |100
P1 | 2016-10-02 |200
P1 | 2016-12-16 |700
P1 | 2016-12-17 |800
P1 | 2016-12-18 |800
P2 | 2016-01-31 |700
P2 | 2016-02-01 |800
P2 | 2016-02-02 |900
失败列表数据框
ID | First Fail Date | Last Fail Date
-----------------------------------------------------
P1 | 2016-10-01 |2016-10-02 |
P2 | 2016-01-31 |2016-02-01 |
所需的数据框
ID | Date |Value | Failure_Indicator
-------------------------------------------------
P1 | 2016-10-01 |100 | 1
P1 | 2016-10-02 |200 | 1
P1 | 2016-12-16 |700 | 0
P1 | 2016-12-17 |800 | 0
P1 | 2016-12-18 |800 | 0
P2 | 2016-01-31 |700 | 1
P2 | 2016-02-01 |800 | 1
P2 | 2016-02-02 |900 | 0
我试过的:AttributeError:'GroupedData'对象没有属性'withColumn'
df = main_df.groupBy('ID').withColumn(
'Failure_Indicator',
F.when((fail_df.col("First fail Date") >= main_df.Date) &
(fail_df.col("Last fail Date") >= main_df.Date), 1)
.otherwise(0))
解决方案
df_1=spark.createDataFrame([("P1", "2016-10-01", 100), ("P1", "2016-10-03", 200), ("P3", "2016-10-09", 200)], ["id", "date", "value"])
+---+----------+-----+
| id| date|value|
+---+----------+-----+
| P1|2016-10-01| 100|
| P1|2016-10-03| 200|
| P3|2016-10-09| 200|
+---+----------+-----+
df_2=spark.createDataFrame([("P1", "2016-10-01", "2016-10-02")], ["id", "start_date", "end_date"])
+---+----------+----------+
| id|start_date| end_date|
+---+----------+----------+
| P1|2016-10-01|2016-10-02|
+---+----------+----------+
df_1.join(df_2, 'id', 'left_outer') \
.withColumn('failure_indicator', when((col("date") >= col("start_date")) & (col("date") <= col("end_date")), 1).otherwise(0)) \
.select('id', 'date', 'value', 'failure_indicator') \
.show()
+---+----------+-----+-----------------+
| id| date|value|failure_indicator|
+---+----------+-----+-----------------+
| P3|2016-10-09| 200| 0|
| P1|2016-10-01| 100| 1|
| P1|2016-10-03| 200| 0|
+---+----------+-----+-----------------+
推荐阅读
- node.js - 导入模块时大括号的必要性
- php - mysqli中的转义字符串
- nginx - Nginx:代理通过流星应用程序作为目录
- javascript - Chart JS - 基于时区的 X 轴
- html - 阻止欧盟国家通过 .htaccess 访问我的网站?
- c - C:数组的方括号之间可以有字符吗?
- javascript - 为什么我的 HTTP 请求会收到未定义的响应?
- typescript - Inferno + TypeScript 同构应用
- ios - 关闭应用程序后,Safari IOS 不会保留使用 JavaScript 设置的 Cookie
- database - 如何使用 Firebase 构建数据库