首页 > 解决方案 > Pyspark 数据框列计算

问题描述

我正在学习将 Pyspark 数据框用于某些项目,并且对构建结果集有疑问。下面是一个网格,表示登录到 SaaS 平台的用户的逐秒信息。这显然来自我使用 DF 成功完成的用户登录和注销数据。

第 1 列代表一小时内的秒数,第 2 列代表登录的用户总数,第 3 列代表在给定秒内登录的新用户,第 4 列显示退出的用户。

例如:在第 100 秒,共有 1 个用户登录,因此 In = 1,Out = 0 在第 105 秒,10 个新用户登录,因此总共 = 12 和 In = 10 在第 107 秒,11 个现有用户注销因此 Out = 11 并且总计 = 1

SecondsInHour total       In  Out
100           1           1   0
101           1           0   0
102           1           0   0
103           2           1   0
104           2           0   0
105           12          10  0
106           12          0   0
107           1           0   11

……

我试图得出这个结果的尝试如下所示:

df.groupBy('logged_seconds') \
  .agg({'logged_seconds':'count', 'login_offset':'count', 'logout_offset':'count'}) \
  .show()

这是不正确的。如何获得上述结果?谢谢

更新以添加代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, explode, udf
from pyspark.sql.types import ArrayType, IntegerType

def seconds_range(start_date,end_date):
    start_seconds = start_date.minute * 60 + start_date.second
    end_seconds = end_date.minute * 60 + end_date.second
    return list(range(start_seconds, end_seconds+1))

def to_seconds(date):
    return date.minute * 60 + date.second

spark = SparkSession.builder.appName('MyApp').master('local[2]').getOrCreate()

# register udf function with spark
seconds_range_udf = udf(seconds_range, ArrayType(IntegerType()))
to_seconds_udf = udf(to_seconds, IntegerType())

# create dataframe with sample data.
df1 = spark.createDataFrame([('user1', '2019-12-01 9:02:00', '2019-12-01 09:04:00'),\
    ('user2', '2019-12-01 9:02:30', '2019-12-01 09:04:00'),\
    ('user3', '2019-12-01 9:03:23', '2019-12-01 09:03:50')],\
    ['user', 'login_start_dt', 'login_end_dt'])

# assign data types to the columns
df1 = df1.select(df1.iqcckey,\
    to_timestamp(df1.login_start_dt , 'yyyy-MM-dd HH:mm:ss').alias('login_start_dt '),\
    to_timestamp(df1.login_end_dt, 'yyyy-MM-dd HH:mm:ss').alias('login_end_dt'))

# construct a new column that is an array of seconds logged in.
df2 = df1.\
    withColumn('login_offset', to_seconds_udf('login_start_dt ')).\
    withColumn('logout_offset', to_seconds_udf('login_end_dt')).\
    withColumn('arr_logged_seconds', seconds_range_udf('login_start_dt ', 'login_end_dt'))

# convert the 3rd column (array) into rows
df2 = df2.withColumn('logged_seconds', explode(df2.arr_logged_seconds))

# group, count data
df2.groupBy('user','logged_seconds').agg({'logged_seconds':'count', 'login_offset':'count', 'logout_offset':'count'}).show()

print('End program')

标签: apache-sparkpysparkapache-spark-sql

解决方案


构建您的数据框:

添加了更多数据以进行验证。

data=  [[100,1],
        [101,1],  
        [102,1],
        [103,2],
        [104,2],
        [105,12],
        [106,12],
        [107,1],
        [108,2],
        [109,12],
        [110,2],
        [111,0],
        [112,22],
        [113,17],
        [114,20]]
columns= ['SecondsInHour','Total']
df= spark.createDataFrame(data,columns)
df.show()

+-------------+-----+
|SecondsInHour|Total|
+-------------+-----+
|          100|    1|
|          101|    1|
|          102|    1|
|          103|    2|
|          104|    2|
|          105|   12|
|          106|   12|
|          107|    1|
|          108|    2|
|          109|   12|
|          110|    2|
|          111|    0|
|          112|   22|
|          113|   17|
|          114|   20|
+-------------+-----+

定义窗口并清除第一个值的空值:

基本上从滞后中减去,并对一列使用正值,对另一列使用负值,然后处理每个列的第一个值。

w2= Window().orderBy(df.SecondsInHour)
df.withColumn("lagdiff",(F.col("Total")- F.lag("Total").over(w2)))\
.withColumn("lagdiff2", F.col("Total")- F.lag("Total").over(w2))\
.withColumn("lagdiff3", F.when((F.col("lagdiff2")>0),F.lit(0)).otherwise(F.col("lagdiff2")*-1))\
.withColumn("lagdiff4", F.when((F.col("lagdiff") <0), F.lit(0)).otherwise(F.col("lagdiff")))\
.withColumn("In", F.when(F.col("lagdiff").isNull(), F.col("Total")).otherwise(F.col("lagdiff4")))\
.withColumn("Out",F.when(F.col("lagdiff3").isNull(), F.lit(0)).otherwise(F.col("lagdiff3")))\
.orderBy(df.SecondsInHour.asc())\
.drop("lagdiff","lagdiff2","lagdiff3","lagdiff4").show()

+-------------+-----+---+---+
|SecondsInHour|Total| In|Out|
+-------------+-----+---+---+
|          100|    1|  1|  0|
|          101|    1|  0|  0|
|          102|    1|  0|  0|
|          103|    2|  1|  0|
|          104|    2|  0|  0|
|          105|   12| 10|  0|
|          106|   12|  0|  0|
|          107|    1|  0| 11|
|          108|    2|  1|  0|
|          109|   12| 10|  0|
|          110|    2|  0| 10|
|          111|    0|  0|  2|
|          112|   22| 22|  0|
|          113|   17|  0|  5|
|          114|   20|  3|  0|
+-------------+-----+---+---+

推荐阅读