apache-spark - 按时差过滤 pyspark
问题描述
我在 pyspark 中有一个如下所示的数据框:
+----------+-------------------+-------+-----------------------+-----------------------+--------+
|Session_Id|Instance_Id |Actions|Start_Date |End_Date |Duration|
+----------+-------------------+-------+-----------------------+-----------------------+--------+
|14252203 |i-051fc2d21fbe001e3|2 |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0 |
|43024091 |i-051fc2d21fbe001e3|2 |2019-12-17 01:08:00.000|2019-12-17 01:08:00.000|0 |
|50961995 |i-0c733c7e356bc1615|2 |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0 |
|56308963 |i-0c733c7e356bc1615|2 |2019-12-17 01:08:00.000|2019-12-17 01:08:00.000|0 |
|60120472 |i-0c733c7e356bc1615|2 |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0 |
|69132492 |i-051fc2d21fbe001e3|2 |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0 |
+----------+-------------------+-------+-----------------------+-----------------------+--------+
我正在尝试过滤任何最近的行:
now = datetime.datetime.now()
filtered = grouped.filter(f.abs(f.unix_timestamp(now) - f.unix_timestamp(datetime.datetime.strptime(f.col('End_Date')[:-4], '%Y-%m-%d %H:%M:%S'))) > 100)
它转换End_Date
为时间戳并计算从现在到现在的差异End_Date
并过滤不到 100 秒的任何内容。我根据两列之间的时间差从过滤 pyspark 数据帧中得到
每次我运行这个,我都会收到这个错误:
TypeError: Invalid argument, not a string or column: 2019-12-19 18:55:13.268489 of type <type 'datetime.datetime'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
如何通过比较时间戳进行过滤?
解决方案
我认为您在 Python 函数和 Spark 之间感到困惑。unix_timestamp
函数需要一个字符串或 Column 对象,但您传递的是 Python datetime 对象,这就是您收到该错误的原因。
而是使用 Spark 内置函数:current_date
它为您提供具有当前日期值的列并将列to_date
转换End_Date
为日期。
这应该适合您:
filtered = grouped.filter(abs(unix_timestamp(current_date()) - unix_timestamp(to_date(col('End_Date'), 'yyyy-MM-dd HH:mm:ss'))) > 100)
推荐阅读
- pandas - 如何在熊猫中有字符时将列值从字符串转换为浮点数
- java - 如何在 .wav 文件中写入 ByteArrayOutput
- python - Hadoop Streaming 无法运行 python
- javascript - 如何将值传递给 sweetalert?
- android - 如何在 Android Xamarin.Forms 中获取附近 WiFi 网络的列表?
- outlook - 自动将发送电子邮件另存为 Adobe PDF 并保存在文件夹中
- python - 如何在元组列表中选择具有特定值的元素
- java - 对 CSV 文件中的重复项进行分组并根据某些值对数据进行排名
- python - 无法在熊猫python中删除CSV中具有不同表格格式的行
- ceph - 为什么 Ceph 通过对象哈希而不是 CRUSH 算法计算 PG ID?