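pyspark - PySpark: get the data for the last 3 days
Problem description
Given a DataFrame with a date column, I only want to get the rows from the last 3 days in the DataFrame: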
|id|date|
|1|2019-12-01|
|2|2019-11-30|
|3|2019-11-29|
|4|2019-11-28|
|5|2019-12-01|
should return:
|id|date|
|1|2019-12-01|
|2|2019-11-30|
|3|2019-11-29|
|5|2019-12-01|
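The expected output is consistent with keeping every row whose date is among the three most recent distinct dates in the data: both 2019-12-01 rows are kept and 2019-11-28 is dropped. A plain-Python sketch of that selection rule, for illustration only (it does not use Spark, and the helper name last_n_dates is hypothetical):

```python
# Sample rows from the question: (id, date) with ISO-8601 date strings
rows = [(1, '2019-12-01'), (2, '2019-11-30'), (3, '2019-11-29'),
        (4, '2019-11-28'), (5, '2019-12-01')]


def last_n_dates(rows, n):
    """Hypothetical helper: keep rows whose date is among the n most recent distinct dates."""
    # ISO-8601 strings sort chronologically, so plain string ordering works
    top = set(sorted({d for _, d in rows}, reverse=True)[:n])
    # The list comprehension keeps the original row order
    return [r for r in rows if r[1] in top]


print(last_n_dates(rows, 3))
# [(1, '2019-12-01'), (2, '2019-11-30'), (3, '2019-11-29'), (5, '2019-12-01')]
```

Working on distinct dates is what lets duplicate dates (ids 1 and 5) both survive while still counting as a single "day".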
I have been trying this code, but I get an error:
from pyspark.sql import functions as F
df = sqlContext.createDataFrame([
(1, '/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl'),
(2, '/raw/gsec/qradar/flows/dt=2019-11-30/hour=00/1585218406613_flows_20191201_00.jsonl'),
(3, '/raw/gsec/qradar/flows/dt=2019-11-29/hour=00/1585218406613_flows_20191201_00.jsonl'),
(4, '/raw/gsec/qradar/flows/dt=2019-11-28/hour=00/1585218406613_flows_20191201_00.jsonl'),
(5, '/raw/gsec/qradar/flows/dt=2019-11-27/hour=00/1585218406613_flows_20191201_00.jsonl')
], ['id','partition'])
df = df.withColumn('date', F.regexp_extract('partition', '[0-9]{4}-[0-9]{2}-[0-9]{2}', 0))
dates = df.select('date').orderBy(F.desc('date')).distinct().limit(3).collect()
df.filter(df.date.isin(F.lit(dates))).show(10,False)
The error I get is:
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [[2019-12-01], [2019-11-30], [2019-11-29]]
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:164)
at scala.util.Try.getOrElse(Try.scala:79)
at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:163)
at org.apache.spark.sql.functions$.typedLit(functions.scala:127)
at org.apache.spark.sql.functions$.lit(functions.scala:110)
at org.apache.spark.sql.functions.lit(functions.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Solution
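You can use collect_list instead of collect to gather the dates into a single array value, fetch that array as a Python list with .first(), and pass the list to isin directly, without the lit function.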
dates = df.select('date').distinct().orderBy(F.desc('date')).limit(3).select(F.collect_list('date').alias('dates')).first()['dates']
# e.g. ['2019-12-01', '2019-11-30', '2019-11-29'] (collect_list does not guarantee element order, which isin does not need)
df.filter(df.date.isin(dates)).show(10,False)
+---+----------------------------------------------------------------------------------+----------+
|id |partition                                                                         |date      |
+---+----------------------------------------------------------------------------------+----------+
|1  |/raw/gsec/qradar/flows/dt=2019-12-01/hour=00/1585218406613_flows_20191201_00.jsonl|2019-12-01|
|2  |/raw/gsec/qradar/flows/dt=2019-11-30/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-30|
|3  |/raw/gsec/qradar/flows/dt=2019-11-29/hour=00/1585218406613_flows_20191201_00.jsonl|2019-11-29|
+---+----------------------------------------------------------------------------------+----------+
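A function to get the top n values:
def get_top(df, column, n):
    # Collect the n largest distinct values of the column into one Python list
    dates = df.select(column).distinct().orderBy(F.desc(column)).limit(n).select(F.collect_list(column)).first()[0]
    return dates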
dates = get_top(df, 'date', 3)
# ['2019-12-01', '2019-11-30', '2019-11-29']