Converting SQL code to PySpark, where I create a new DF with groupBy and count

Problem description

I have the SQL code below, which I am trying to convert to PySpark (my attempt also follows); any input on the best way to translate the SQL logic into PySpark would be appreciated.

SQL code:

%sql

drop table if exists jrny_map_web_sum;
create table jrny_map_web_sum as
select mid_key, completedt, interaction_dt, interaction_type,
       count(distinct(visit_identifier)) as visits,
       count(*) as pvs
from web_drvsjoin1
group by mid_key, completedt, interaction_dt, interaction_type;

Sample output: (screenshot omitted)

PySpark code:

from pyspark.sql.functions import countDistinct

jrny_map_web_sum1 = (
    web_drvsjoin1
    .select("mid_key", "completedt", "interaction_dt", "interaction_type", "visit_identifier")
    .groupBy("mid_key", "completedt", "interaction_dt", "interaction_type")
    .agg(countDistinct("visit_identifier").alias("Visits"))
)

Tags: python-3.x, pyspark, databricks

Solution


Try the code below:

from pyspark.sql.functions import col, count, countDistinct

jrny_map_web_sum1 = (
    web_drvsjoin1
    .select("mid_key", "completedt", "interaction_dt", "interaction_type", "visit_identifier")
    .groupBy("mid_key", "completedt", "interaction_dt", "interaction_type")
    .agg(
        countDistinct(col("visit_identifier")).alias("visits"),
        count("*").alias("pvs"),
    )
)

# Persist the result as a table; mode("overwrite") mirrors the
# drop-and-recreate behavior of the original SQL
jrny_map_web_sum1.write.mode("overwrite").saveAsTable("<db>.jrny_map_web_sum")
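
If you would rather keep the SQL logic verbatim, another option is to register the source DataFrame as a temporary view and run the original query through spark.sql. This is a minimal sketch that assumes the spark SparkSession Databricks provides by default:

# Expose the DataFrame to Spark SQL under a temporary view name
web_drvsjoin1.createOrReplaceTempView("web_drvsjoin1")

# Same aggregation as the original SQL, returned as a DataFrame
jrny_map_web_sum1 = spark.sql("""
    select mid_key, completedt, interaction_dt, interaction_type,
           count(distinct visit_identifier) as visits,
           count(*) as pvs
    from web_drvsjoin1
    group by mid_key, completedt, interaction_dt, interaction_type
""")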
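
As a quick sanity check (a hypothetical snippet; the column names come from the question), the aggregated DataFrame should have exactly one row per distinct combination of the grouping keys in the source:

# Each group-by key combination should produce exactly one output row
group_cols = ["mid_key", "completedt", "interaction_dt", "interaction_type"]
assert jrny_map_web_sum1.count() == web_drvsjoin1.select(*group_cols).distinct().count()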
