apache-spark - 如何在 PySpark 的分组对象中插入一列?
问题描述
如何在分组数据中插入 PySpark 数据框?
例如:
我有一个包含以下列的 PySpark 数据框:
+--------+-------------------+--------+
|webID |timestamp |counts |
+--------+-------------------+--------+
|John |2018-02-01 03:00:00|60 |
|John |2018-02-01 03:03:00|66 |
|John |2018-02-01 03:05:00|70 |
|John |2018-02-01 03:08:00|76 |
|Mo |2017-06-04 01:05:00|10 |
|Mo |2017-06-04 01:07:00|20 |
|Mo |2017-06-04 01:10:00|35 |
|Mo |2017-06-04 01:11:00|40 |
+--------+----------------- -+--------+
我需要在他们自己的时间间隔内每分钟将 John 和 Mo 的计数数据插入一个数据点。我对任何简单的线性插值持开放态度 - 但请注意,我的真实数据是每隔几秒,我想插值到每一秒。
所以结果应该是:
+--------+-------------------+--------+
|webID |timestamp |counts |
+--------+-------------------+--------+
|John |2018-02-01 03:00:00|60 |
|John |2018-02-01 03:01:00|62 |
|John |2018-02-01 03:02:00|64 |
|John |2018-02-01 03:03:00|66 |
|John |2018-02-01 03:04:00|68 |
|John |2018-02-01 03:05:00|70 |
|John |2018-02-01 03:06:00|72 |
|John |2018-02-01 03:07:00|74 |
|John |2018-02-01 03:08:00|76 |
|Mo |2017-06-04 01:05:00|10 |
|Mo |2017-06-04 01:06:00|15 |
|Mo |2017-06-04 01:07:00|20 |
|Mo |2017-06-04 01:08:00|25 |
|Mo |2017-06-04 01:09:00|30 |
|Mo |2017-06-04 01:10:00|35 |
|Mo |2017-06-04 01:11:00|40 |
+--------+----------------- -+--------+
需要将新行添加到我的原始数据框中。寻找 PySpark 解决方案。
解决方案
如果您使用 Python,完成任务的最短方法是使用 udf 重用现有的 Pandas 函数GROUPED_MAP
:
from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType
def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
@pandas_udf(
StructType(sorted(schema, key=attrgetter("name"))),
PandasUDFType.GROUPED_MAP)
def _(pdf):
pdf.set_index(timestamp_col, inplace=True)
pdf = pdf.resample(freq).interpolate()
pdf.ffill(inplace=True)
pdf.reset_index(drop=False, inplace=True)
pdf.sort_index(axis=1, inplace=True)
return pdf
return _
应用于您的数据:
from pyspark.sql.functions import to_timestamp
df = spark.createDataFrame([
("John", "2018-02-01 03:00:00", 60),
("John", "2018-02-01 03:03:00", 66),
("John", "2018-02-01 03:05:00", 70),
("John", "2018-02-01 03:08:00", 76),
("Mo", "2017-06-04 01:05:00", 10),
("Mo", "2017-06-04 01:07:00", 20),
("Mo", "2017-06-04 01:10:00", 35),
("Mo", "2017-06-04 01:11:00", 40),
], ("webID", "timestamp", "counts")).withColumn(
"timestamp", to_timestamp("timestamp")
)
df.groupBy("webID").apply(resample(df.schema, "60S")).show()
它产生
+------+-------------------+-----+
|counts| timestamp|webID|
+------+-------------------+-----+
| 60|2018-02-01 03:00:00| John|
| 62|2018-02-01 03:01:00| John|
| 64|2018-02-01 03:02:00| John|
| 66|2018-02-01 03:03:00| John|
| 68|2018-02-01 03:04:00| John|
| 70|2018-02-01 03:05:00| John|
| 72|2018-02-01 03:06:00| John|
| 74|2018-02-01 03:07:00| John|
| 76|2018-02-01 03:08:00| John|
| 10|2017-06-04 01:05:00| Mo|
| 15|2017-06-04 01:06:00| Mo|
| 20|2017-06-04 01:07:00| Mo|
| 25|2017-06-04 01:08:00| Mo|
| 30|2017-06-04 01:09:00| Mo|
| 35|2017-06-04 01:10:00| Mo|
| 40|2017-06-04 01:11:00| Mo|
+------+-------------------+-----+
这是在单个节点的输入和插值数据都可以放入单个节点的内存中的假设下工作的webID
(通常其他精确的非迭代解决方案将不得不做出类似的假设)。如果不是这种情况,您可以通过重叠窗口轻松近似
partial = (df
.groupBy("webID", window("timestamp", "5 minutes", "3 minutes")["start"])
.apply(resample(df.schema, "60S")))
并汇总最终结果
from pyspark.sql.functions import mean
(partial
.groupBy("webID", "timestamp")
.agg(mean("counts")
.alias("counts"))
# Order by key and timestamp, only for consistent presentation
.orderBy("webId", "timestamp")
.show())
这当然要昂贵得多(有两次洗牌,并且某些值将被计算多次),但如果重叠不足以包括下一次观察,也会留下间隙。
+-----+-------------------+------+
|webID| timestamp|counts|
+-----+-------------------+------+
| John|2018-02-01 03:00:00| 60.0|
| John|2018-02-01 03:01:00| 62.0|
| John|2018-02-01 03:02:00| 64.0|
| John|2018-02-01 03:03:00| 66.0|
| John|2018-02-01 03:04:00| 68.0|
| John|2018-02-01 03:05:00| 70.0|
| John|2018-02-01 03:08:00| 76.0|
| Mo|2017-06-04 01:05:00| 10.0|
| Mo|2017-06-04 01:06:00| 15.0|
| Mo|2017-06-04 01:07:00| 20.0|
| Mo|2017-06-04 01:08:00| 25.0|
| Mo|2017-06-04 01:09:00| 30.0|
| Mo|2017-06-04 01:10:00| 35.0|
| Mo|2017-06-04 01:11:00| 40.0|
+-----+-------------------+------+
推荐阅读
- python - json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0) 有时在 Python 中使用 googletrans 和 Pandas 会发生错误
- bash - 如何防止 GNU diff 对补丁行进行分组?
- multithreading - ZeroMQ:线程在 while(1) 循环中恰好在 2 小时后停止。为什么以及如何解决这个问题?
- c# - 从 SQL 中选择唯一行并排除某些行 Mvc c#
- java - Java 1.4 的全局异常处理程序
- c - 使用 void 指针发送浮点数组时出现问题
- sql - 有没有办法从合并语句中检索值?
- javascript - 在 React Js 中将数组迭代为表单元素的问题
- javascript - 使用 expo 创建新的反应原生应用程序的问题
- python - 关于 Python Web 自动化的菜鸟问题