PySpark: Get the last 3 days of data

Problem

Given a dataframe with dates, I want to keep only the rows belonging to the 3 most recent dates in the dataframe:

|id|date|
|---|---|
|1|2019-12-01|
|2|2019-11-30|
|3|2019-11-29|
|4|2019-11-28|
|5|2019-12-01|

This should return:

|id|date|
|---|---|
|1|2019-12-01|
|2|2019-11-30|
|3|2019-11-29|
|5|2019-12-01|

I have been trying the following code, but it throws an error:

import pyspark.sql.functions as F

df = sqlContext.createDataFrame([
    (1, '/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl'),
    (2, '/raw/gsec/qradar/flows/dt=2019-11-30/hour=00/1585218406613_flows_20191201_00.jsonl'),
    (3, '/raw/gsec/qradar/flows/dt=2019-11-29/hour=00/1585218406613_flows_20191201_00.jsonl'),
    (4, '/raw/gsec/qradar/flows/dt=2019-11-28/hour=00/1585218406613_flows_20191201_00.jsonl'),
    (5, '/raw/gsec/qradar/flows/dt=2019-11-27/hour=00/1585218406613_flows_20191201_00.jsonl')
], ['id', 'partition'])

# extract the yyyy-MM-dd date from the partition path
df = df.withColumn('date', F.regexp_extract('partition', '[0-9]{4}-[0-9]{2}-[0-9]{2}', 0))
dates = df.select('date').orderBy(F.desc('date')).distinct().limit(3).collect()

df.filter(df.date.isin(F.lit(dates))).show(10, False)

The error I get is:

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [[2019-12-01], [2019-11-30], [2019-11-29]]
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
    at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
    at scala.util.Try.getOrElse(Try.scala:79)
    at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
    at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
    at org.apache.spark.sql.functions$.lit(functions.scala:110)
    at org.apache.spark.sql.functions.lit(functions.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Tags: pyspark

Solution


You can use collect_list instead of collect. collect() gives you a list of Row objects, and lit cannot build a literal out of that list, which is exactly the Unsupported literal type error you are seeing. collect_list gathers the dates into a single array column; taking the first row of that result yields a plain Python list, which isin accepts directly, so no lit is needed at all.

dates = df.select('date').distinct().orderBy(F.desc('date')).limit(3).select(F.collect_list('date').alias('dates')).first()[0]

# ['2019-12-01', '2019-11-30', '2019-11-29']

df.filter(df.date.isin(dates)).show(10, False)
+---+----------------------------------------------------------------------------------+----------+
|id |partition                                                                         |date      |
+---+----------------------------------------------------------------------------------+----------+
|1  |/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl|2019-12-01|
|2  |/raw/gsec/qradar/flows/dt=2019-11-30/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-30|
|3  |/raw/gsec/qradar/flows/dt=2019-11-29/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-29|
+---+----------------------------------------------------------------------------------+----------+
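
If the collected list is only ever used for filtering, a join avoids the round trip to the driver altogether. A minimal sketch of that variant (my addition, not part of the original answer), assuming the same df as above: compute the top-3 dates as a small DataFrame and filter with a broadcast semi join.

top_dates = df.select('date').distinct().orderBy(F.desc('date')).limit(3)

# left_semi keeps the rows of df whose date appears in top_dates,
# without pulling in any columns from top_dates
df.join(F.broadcast(top_dates), on='date', how='left_semi').show(10, False)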

A reusable function to get the top n values:

def get_top(df, column, n):
    """Return the n largest distinct values of `column` as a Python list."""
    return df.select(column).distinct().orderBy(F.desc(column)).limit(n).select(F.collect_list(column)).first()[0]

dates = get_top(df, 'date', 3)
# ['2019-12-01', '2019-11-30', '2019-11-29']
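
An equivalent window-based sketch (again an assumption on my part, not from the original answer): dense_rank assigns the same rank to equal dates, so keeping rank <= n selects every row from the n most recent dates without any collect. Be aware that an unpartitioned window moves all rows into a single partition, so this fits small or pre-filtered data.

from pyspark.sql import Window

# dense_rank gives equal dates the same rank, so rk <= 3 keeps every
# row belonging to one of the 3 most recent dates
w = Window.orderBy(F.desc('date'))
df.withColumn('rk', F.dense_rank().over(w)).filter(F.col('rk') <= 3).drop('rk').show(10, False)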
