apache-spark - Pyspark 数据框列计算
问题描述
我正在学习将 Pyspark 数据框用于某些项目,并且对构建结果集有疑问。下面是一个网格,表示登录到 SaaS 平台的用户的逐秒信息。这显然来自我使用 DF 成功完成的用户登录和注销数据。
第 1 列代表一小时内的秒数,第 2 列代表登录的用户总数,第 3 列代表在给定秒内登录的新用户,第 4 列显示退出的用户。
例如:在第 100 秒,共有 1 个用户登录,因此 In = 1,Out = 0 在第 105 秒,10 个新用户登录,因此总共 = 12 和 In = 10 在第 107 秒,11 个现有用户注销因此 Out = 11 并且总计 = 1
SecondsInHour total In Out
100 1 1 0
101 1 0 0
102 1 0 0
103 2 1 0
104 2 0 0
105 12 10 0
106 12 0 0
107 1 0 11
……
我试图得出这个结果的尝试如下所示:
df.groupBy('logged_seconds') \
.agg({'logged_seconds':'count', 'login_offset':'count', 'logout_offset':'count'}) \
.show()
这是不正确的。如何获得上述结果?谢谢
更新以添加代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, explode, udf
from pyspark.sql.types import ArrayType, IntegerType
def seconds_range(start_date,end_date):
start_seconds = start_date.minute * 60 + start_date.second
end_seconds = end_date.minute * 60 + end_date.second
return list(range(start_seconds, end_seconds+1))
def to_seconds(date):
return date.minute * 60 + date.second
spark = SparkSession.builder.appName('MyApp').master('local[2]').getOrCreate()
# register udf function with spark
seconds_range_udf = udf(seconds_range, ArrayType(IntegerType()))
to_seconds_udf = udf(to_seconds, IntegerType())
# create dataframe with sample data.
df1 = spark.createDataFrame([('user1', '2019-12-01 9:02:00', '2019-12-01 09:04:00'),\
('user2', '2019-12-01 9:02:30', '2019-12-01 09:04:00'),\
('user3', '2019-12-01 9:03:23', '2019-12-01 09:03:50')],\
['user', 'login_start_dt', 'login_end_dt'])
# assign data types to the columns
df1 = df1.select(df1.iqcckey,\
to_timestamp(df1.login_start_dt , 'yyyy-MM-dd HH:mm:ss').alias('login_start_dt '),\
to_timestamp(df1.login_end_dt, 'yyyy-MM-dd HH:mm:ss').alias('login_end_dt'))
# construct a new column that is an array of seconds logged in.
df2 = df1.\
withColumn('login_offset', to_seconds_udf('login_start_dt ')).\
withColumn('logout_offset', to_seconds_udf('login_end_dt')).\
withColumn('arr_logged_seconds', seconds_range_udf('login_start_dt ', 'login_end_dt'))
# convert the 3rd column (array) into rows
df2 = df2.withColumn('logged_seconds', explode(df2.arr_logged_seconds))
# group, count data
df2.groupBy('user','logged_seconds').agg({'logged_seconds':'count', 'login_offset':'count', 'logout_offset':'count'}).show()
print('End program')
解决方案
构建您的数据框:
添加了更多数据以进行验证。
data= [[100,1],
[101,1],
[102,1],
[103,2],
[104,2],
[105,12],
[106,12],
[107,1],
[108,2],
[109,12],
[110,2],
[111,0],
[112,22],
[113,17],
[114,20]]
columns= ['SecondsInHour','Total']
df= spark.createDataFrame(data,columns)
df.show()
+-------------+-----+
|SecondsInHour|Total|
+-------------+-----+
| 100| 1|
| 101| 1|
| 102| 1|
| 103| 2|
| 104| 2|
| 105| 12|
| 106| 12|
| 107| 1|
| 108| 2|
| 109| 12|
| 110| 2|
| 111| 0|
| 112| 22|
| 113| 17|
| 114| 20|
+-------------+-----+
定义窗口并清除第一个值的空值:
基本上从滞后中减去,并对一列使用正值,对另一列使用负值,然后处理每个列的第一个值。
w2= Window().orderBy(df.SecondsInHour)
df.withColumn("lagdiff",(F.col("Total")- F.lag("Total").over(w2)))\
.withColumn("lagdiff2", F.col("Total")- F.lag("Total").over(w2))\
.withColumn("lagdiff3", F.when((F.col("lagdiff2")>0),F.lit(0)).otherwise(F.col("lagdiff2")*-1))\
.withColumn("lagdiff4", F.when((F.col("lagdiff") <0), F.lit(0)).otherwise(F.col("lagdiff")))\
.withColumn("In", F.when(F.col("lagdiff").isNull(), F.col("Total")).otherwise(F.col("lagdiff4")))\
.withColumn("Out",F.when(F.col("lagdiff3").isNull(), F.lit(0)).otherwise(F.col("lagdiff3")))\
.orderBy(df.SecondsInHour.asc())\
.drop("lagdiff","lagdiff2","lagdiff3","lagdiff4").show()
+-------------+-----+---+---+
|SecondsInHour|Total| In|Out|
+-------------+-----+---+---+
| 100| 1| 1| 0|
| 101| 1| 0| 0|
| 102| 1| 0| 0|
| 103| 2| 1| 0|
| 104| 2| 0| 0|
| 105| 12| 10| 0|
| 106| 12| 0| 0|
| 107| 1| 0| 11|
| 108| 2| 1| 0|
| 109| 12| 10| 0|
| 110| 2| 0| 10|
| 111| 0| 0| 2|
| 112| 22| 22| 0|
| 113| 17| 0| 5|
| 114| 20| 3| 0|
+-------------+-----+---+---+
推荐阅读
- mdx - MDX 方差平行期
- r - 在一定数量的字符后使用 tidyr 中的单独分隔列?
- python - 加载前即时裁剪视频帧
- memcpy - 使用 memcpy 的 DPDK VLAN 条带代码导致数据损坏
- c# - Kinect v2 在 Windows 10 上不起作用(风扇不跨越)
- java - 在 Eclipse 中创建 Flink 项目
- ubuntu - 在 ubuntu wsl 上安装 Intellij idea 时出错
- php - 如何将条带计划 json 响应放入数组中。我从条带中得到正确的响应,就像这样的 json
- swift - 为什么在解析 swift JSON 时出现此错误?
- android - TableView 中编辑器控件的 Xamarin.Forms 滚动在 Android 中不起作用