Spark SQL scenario does not give the correct result

Problem description

I have a df as follows:

  1. Where the mobile numbers are different

    +------------+-------------------+----------+-------------------+------------+
    |applicantkey|     first_reg_date|utmcontent| latest_signin_date|mobilenumber|
    +------------+-------------------+----------+-------------------+------------+
    |        1234|2021-01-03 06:05:43|   Android|2021-01-03 06:05:43|         987|
    |        1234|2021-04-03 07:05:43|   Android|2021-10-03 06:05:43|         986|
    +------------+-------------------+----------+-------------------+------------+
    
  2. Where the mobile numbers are the same

    +------------+-------------------+----------+-------------------+------------+
    |applicantkey|     first_reg_date|utmcontent| latest_signin_date|mobilenumber|
    +------------+-------------------+----------+-------------------+------------+
    |        1234|2021-01-03 06:05:43|   Android|2021-01-03 06:05:43|         987|
    |        1234|2021-04-03 07:05:43|   Android|2021-10-03 06:05:43|         987|
    +------------+-------------------+----------+-------------------+------------+
    

Now I want to take the min of first_reg_date and the max of latest_signin_date and replace the values of these two columns across the dataset. So my expected output should look like this:

+------------+-------------------+----------+-------------------+------------+
|applicantkey|first_reg_date     |utmcontent|latest_signin_date |mobilenumber|
+------------+-------------------+----------+-------------------+------------+
|1234        |2021-01-03 06:05:43|Android   |2021-10-03 06:05:43|987         |
|1234        |2021-01-03 06:05:43|Android   |2021-10-03 06:05:43|986         |
+------------+-------------------+----------+-------------------+------------+

I tried the following query, but it gives the outputs shown below:

spark.sql(
    "select applicantkey, min(first_reg_date) first_reg_date, utmcontent, "
    "max(latest_signin_date) latest_signin_date, mobilenumber "
    "from df group by applicantkey, utmcontent, mobilenumber").show(truncate=False)

+------------+-------------------+----------+-------------------+------------+
|applicantkey|first_reg_date     |utmcontent|latest_signin_date |mobilenumber|
+------------+-------------------+----------+-------------------+------------+
|1234        |2021-01-03 06:05:43|Android   |2021-01-03 06:05:43|987         |
|1234        |2021-04-03 07:05:43|Android   |2021-10-03 06:05:43|986         |
+------------+-------------------+----------+-------------------+------------+  

and, for the dataset where the mobile numbers are the same:

+------------+-------------------+----------+-------------------+------------+
|applicantkey|first_reg_date     |utmcontent|latest_signin_date |mobilenumber|
+------------+-------------------+----------+-------------------+------------+
|1234        |2021-01-03 06:05:43|Android   |2021-10-03 06:05:43|987         |
+------------+-------------------+----------+-------------------+------------+

The second output is correct, but the first one is wrong.

So I tried the following approach, which gets me the correct result, but when the mobile numbers are the same I end up with duplicates:

# aggregate the two date columns per applicantkey
df1 = spark.sql(
    "select applicantkey, min(first_reg_date) first_reg_date, "
    "max(latest_signin_date) latest_signin_date from df group by applicantkey")
# keep the non-aggregated columns
df2 = spark.sql("select applicantkey, utmcontent, mobilenumber from df")
# attach the aggregates back onto every original row
df3 = df1.join(df2, "applicantkey", "left_outer")
df3.show(truncate=False)

+------------+-------------------+-------------------+----------+------------+
|applicantkey|first_reg_date     |latest_signin_date |utmcontent|mobilenumber|
+------------+-------------------+-------------------+----------+------------+
|1234        |2021-01-03 06:05:43|2021-10-03 06:05:43|Android   |987         |
|1234        |2021-01-03 06:05:43|2021-10-03 06:05:43|Android   |987         |
+------------+-------------------+-------------------+----------+------------+  

I don't want to use DISTINCT() at the end. So, what exactly am I doing wrong?

Tags: sql, apache-spark, pyspark, apache-spark-sql

Solution


Based on your comment, you want to select min(first_reg_date) and max(latest_signin_date) per applicantkey for all rows in the dataframe. You can achieve this with window functions: we partition the dataframe by applicantkey, and for every row in a partition we fill in the values of min(first_reg_date) and max(latest_signin_date) while keeping the other columns unchanged.

Minimal working example

from datetime import datetime

from pyspark.sql import SparkSession, Window
from pyspark.sql.types import *
import pyspark.sql.functions as F

# create (or reuse) the SparkSession that the snippets below rely on
spark = SparkSession.builder.getOrCreate()

data = [
    (1234, datetime.strptime("2021-01-03 06:05:43", "%Y-%m-%d %H:%M:%S"), "Android", datetime.strptime("2021-01-03 06:05:43", "%Y-%m-%d %H:%M:%S"), 987),
    (1234, datetime.strptime("2021-04-03 07:05:43", "%Y-%m-%d %H:%M:%S"), "Android", datetime.strptime("2021-10-03 06:05:43", "%Y-%m-%d %H:%M:%S"), 986),
    (1234, datetime.strptime("2021-01-03 06:05:43", "%Y-%m-%d %H:%M:%S"), "Android", datetime.strptime("2021-01-03 06:05:43", "%Y-%m-%d %H:%M:%S"), 987),
    (1234, datetime.strptime("2021-04-03 07:05:43", "%Y-%m-%d %H:%M:%S"), "Android", datetime.strptime("2021-01-03 06:05:43", "%Y-%m-%d %H:%M:%S"), 987),
]

schema = StructType([
    StructField("applicantkey", IntegerType(), True),
    StructField("first_reg_date", TimestampType(), True),
    StructField("utmcontent", StringType(), True),
    StructField("latest_signin_date", TimestampType(), True),
    StructField("mobilenumber", IntegerType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)

Window logic

window_spec = Window.partitionBy("applicantkey")
df.withColumn("first_reg_date", F.min(F.col("first_reg_date")).over(window_spec)) \
  .withColumn("latest_signin_date", F.max(F.col("latest_signin_date")).over(window_spec)) \
  .show()

Output

+------------+-------------------+----------+-------------------+------------+
|applicantkey|     first_reg_date|utmcontent| latest_signin_date|mobilenumber|
+------------+-------------------+----------+-------------------+------------+
|        1234|2021-01-03 06:05:43|   Android|2021-10-03 06:05:43|         987|
|        1234|2021-01-03 06:05:43|   Android|2021-10-03 06:05:43|         986|
|        1234|2021-01-03 06:05:43|   Android|2021-10-03 06:05:43|         987|
|        1234|2021-01-03 06:05:43|   Android|2021-10-03 06:05:43|         987|
+------------+-------------------+----------+-------------------+------------+
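
Since the original attempt was written with spark.sql, the same window logic can also be expressed in Spark SQL. A minimal sketch, assuming the df built in the example above is registered as a temporary view:

# register the example dataframe as a temp view so it can be queried with SQL
df.createOrReplaceTempView("df")

spark.sql("""
    SELECT applicantkey,
           MIN(first_reg_date)     OVER (PARTITION BY applicantkey) AS first_reg_date,
           utmcontent,
           MAX(latest_signin_date) OVER (PARTITION BY applicantkey) AS latest_signin_date,
           mobilenumber
    FROM df
""").show(truncate=False)

Because the window aggregates are attached to every original row, there is no need for the group-by-plus-join workaround or for DISTINCT at the end.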

P.S. When asking questions in the future, please make sure to include a minimal working example.

