首页 > 解决方案 > pandas 和 pyspark 中 groupBy 函数的结果不一致

问题描述

我目前正在将我的脚本从 pandas 迁移到 pyspark。我从groupBypandas 和 pyspark 中的一个简单函数得到的结果不一致,我很困惑。如果有人可以帮助我,我将不胜感激。

我的面板数据集看起来像

RIC     date     hour   minute     volume
VOD      01-01    9       11        55    
VOD      01-01    9       11        55
VOD      01-01    10      12        79
VOD      01-01    10      13        55
VOD      01-01    10      15        245  
VOD      01-01    11      14        356
VOD      01-02    11      15        6798
...       ...     ...     ...       ...
BAT      01-01    9       11        556   
BAT      01-02    9       12        552   
BAT      01-02    9       14        551   
...       ...     ...     ....     ...  

在 pandas 中,我使用以下代码获取每只股票每分钟的总交易量

    df=pd.read_csv(r'/home/user/stock.csv', parse_dates=[1])
    df_volume=df.groupby(['RIC','date','hour','minute']).sum().reset_index()
    df_volume.head(5)

然后我得到了正确的输出

RIC     date     hour   minute     volume
VOD      01-01    9       11        110    
VOD      01-01    10      12        79
VOD      01-01    10      13        55
...       ...     ...     ....     ...  

但是,当我在 spark 中编码时,我使用了以下内容

    df=spark.read.format('csv').option('header','true').load('/home/user/stock.csv')
    df.printSchema()
    根
      |-- RIC:字符串(可为空=真)
      |-- 日期:日期(可为空=真)  
      |-- 小时:浮点数(可为空=真)
      |-- 分钟:浮动(可为空=真)  
      |-- 音量:浮动(可为空=真)

然后我编码

    from pyspark.sql.functions import countDistinct, avg,stddev,count, sum
    df_volume=df.groupBy(['RIC','date','hour','minute']).agg(sum(volume))
    df_volume.orderBy(['RIC','date','hour','minute'],ascending=[True,True,True])
    df_volume.show()

然后我得到了不正确的输出

+----+--------+-------+----------+----------+
 RIC   date     hour    minute     volume
+----+--------+-------+----------+----------+
 VOD | 01-02  |  10   |   13     |   355    |
 VOD | 01-03  |  14   |   03     |   357    |
 VOD | 01-05  |  15   |   45     |   683    | 
 ...    ...     ...     ....     ... 

这次在火花输出中缺少一些观察结果。我想我编码的一切都是正确的。有人可以帮忙吗?谢谢

标签: pandasapache-sparkpysparkgroup-bypandas-groupby

解决方案


我可以帮助你,但需要知道你已经: - 完成计数以检查两个 dfs 中的行数 - 检查整个 df 的缺失值,例如按缺失值过滤以查看它是否是在 df 中,而不仅仅是检查 head()。


推荐阅读