首页 > 解决方案 > 按时差过滤 pyspark

问题描述

我在 pyspark 中有一个如下所示的数据框:

+----------+-------------------+-------+-----------------------+-----------------------+--------+
|Session_Id|Instance_Id        |Actions|Start_Date             |End_Date               |Duration|
+----------+-------------------+-------+-----------------------+-----------------------+--------+
|14252203  |i-051fc2d21fbe001e3|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|43024091  |i-051fc2d21fbe001e3|2      |2019-12-17 01:08:00.000|2019-12-17 01:08:00.000|0       |
|50961995  |i-0c733c7e356bc1615|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|56308963  |i-0c733c7e356bc1615|2      |2019-12-17 01:08:00.000|2019-12-17 01:08:00.000|0       |
|60120472  |i-0c733c7e356bc1615|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|69132492  |i-051fc2d21fbe001e3|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
+----------+-------------------+-------+-----------------------+-----------------------+--------+

我正在尝试过滤任何最近的行:

now = datetime.datetime.now()

filtered = grouped.filter(f.abs(f.unix_timestamp(now) - f.unix_timestamp(datetime.datetime.strptime(f.col('End_Date')[:-4], '%Y-%m-%d %H:%M:%S'))) > 100)

它转换End_Date为时间戳并计算从现在到现在的差异End_Date并过滤不到 100 秒的任何内容。我根据两列之间的时间差从过滤 pyspark 数据帧中得到

每次我运行这个,我都会收到这个错误:

TypeError: Invalid argument, not a string or column: 2019-12-19 18:55:13.268489 of type <type 'datetime.datetime'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

如何通过比较时间戳进行过滤?

标签: apache-sparkpysparkpyspark-sqlpyspark-dataframes

解决方案


我认为您在 Python 函数和 Spark 之间感到困惑。unix_timestamp函数需要一个字符串或 Column 对象,但您传递的是 Python datetime 对象,这就是您收到该错误的原因。

而是使用 Spark 内置函数:current_date它为您提供具有当前日期值的列并将列to_date转换End_Date为日期。

这应该适合您:

filtered = grouped.filter(abs(unix_timestamp(current_date()) - unix_timestamp(to_date(col('End_Date'), 'yyyy-MM-dd HH:mm:ss'))) > 100)

推荐阅读