首页 > 解决方案 > 列中的 dask 计数/频率项

问题描述

我有一个非常大的数据集(> 1000 万行)。下面显示了一个小的 5 行示例 - 我可以让 Pandas 在有术语列表的列中对某些给定术语进行计数。对于运行 Pandas 的单核机器,一切都很好。我得到了预期的结果(10 行)。但是,在同一个小数据集(我在这里展示)上 - 有 5 行,当用 Dask 进行实验时,会计算出超过 10 行(基于分区数)。这是代码。如果有人能指导我误解/出错的地方,我将不胜感激。

熊猫实施:

def compute_total(df, term_list, cap_list):
    terms_counter = Counter(chain.from_iterable(df['Terms']))
    terms_series = pd.Series(terms_counter)
    terms_df = pd.DataFrame({'Term': terms_series.index, 'Count': terms_series.values})
    df1 = terms_df[terms_df['Term'].isin(term_list)]
    product_terms = product(term_list, cap_list)
    df_cp = pd.DataFrame(product_terms, columns=['Terms', 'Capability'])
    tjt_df = df_cp.set_index('Terms').combine_first(df1.set_index('Term')).reset_index()
    tjt_df.rename(columns={'index': 'Term'}, inplace=True)
    tjt_df['Count'] = tjt_df['Count'].fillna(0.0)  # convert all NaN to 0.0
    return tjt_df


d = {'Title': {0: 'IRC do consider this.',
               1: 'we’re simply taking screenshot',
               2: 'Why does irc select topics?',
               3: 'Is this really a screenshot?',
               4: 'how irc is doing this?'},
     'Terms': {0: ['tech', 'channel', 'tech'],
               1: ['channel', 'findwindow', 'Italy', 'findwindow'],
               2: ['Detroit', 'topic', 'seats', 'topic'],
               3: ['tech', 'topic', 'printwindow', 'Boston', 'window'],
               4: ['privmsg', 'wheel', 'privmsg']}}

df = pd.DataFrame.from_dict(d)
term_list = ['channel', 'topic', 'findwindow', 'printwindow', 'privmsg']
cap_list = ['irc', 'screenshot']

熊猫输出:

          Term  Capability  Count
0      channel         irc  2.0
1      channel  screenshot  2.0
2   findwindow         irc  2.0
3   findwindow  screenshot  2.0
4  printwindow         irc  1.0
5  printwindow  screenshot  1.0
6      privmsg         irc  2.0
7      privmsg  screenshot  2.0
8        topic         irc  3.0
9        topic  screenshot  3.0

达斯克实施:

注意:对于 npartition,我尝试了 num_cores = 1,得到了预期的结果。如果我将 num_cores 更改为大于 1 的任何值,我会得到我不理解的结果。例如:当 num_cores = 2 时,生成的 df 有 20 行(好的......我明白了)。当 num_cores = 3 或 4 时,我仍然得到 20 行。当 num_cores = 5...16 时,我得到 40 行!没有尝试更多...

num_cores = 8
ddf = dd.from_pandas(df, npartitions=num_cores * 1)
meta = make_meta({'Term': 'U', 'Capability': 'U', 'Count': 'i8'}, index=pd.Index([], 'i8'))
count_df = ddf.map_partitions(compute_total, term_list, cap_list, meta=meta).compute(scheduler='processes')
print(count_df)
print(count_df.shape)

仪表输出:

          Term  Capability  Count
0      channel         irc    1.0
1      channel  screenshot    1.0
2   findwindow         irc    0.0
3   findwindow  screenshot    0.0
4  printwindow         irc    0.0
5  printwindow  screenshot    0.0
6      privmsg         irc    0.0
7      privmsg  screenshot    0.0
8        topic         irc    0.0
9        topic  screenshot    0.0
0      channel         irc    1.0
1      channel  screenshot    1.0
2   findwindow         irc    2.0
3   findwindow  screenshot    2.0
4  printwindow         irc    0.0
5  printwindow  screenshot    0.0
6      privmsg         irc    0.0
7      privmsg  screenshot    0.0
8        topic         irc    0.0
9        topic  screenshot    0.0
0      channel         irc    0.0
1      channel  screenshot    0.0
2   findwindow         irc    0.0
3   findwindow  screenshot    0.0
4  printwindow         irc    0.0
5  printwindow  screenshot    0.0
6      privmsg         irc    0.0
7      privmsg  screenshot    0.0
8        topic         irc    2.0
9        topic  screenshot    2.0
0      channel         irc    0.0
1      channel  screenshot    0.0
2   findwindow         irc    0.0
3   findwindow  screenshot    0.0
4  printwindow         irc    1.0
5  printwindow  screenshot    1.0
6      privmsg         irc    2.0
7      privmsg  screenshot    2.0
8        topic         irc    1.0
9        topic  screenshot    1.0
(40, 3)

观察:在查看了这个相当长的结果数据框之后,我想我可以对其进行最后一次计算以获得我想要的。只是 groupby 术语和能力和总和。我会得到预期的结果(有点)。

df1 = df.groupby(['Term', 'Capability'])['Count'].sum()

但是,想知道这是否可以使用 Dask 以干净的方式完成。我知道这个问题不是一个“令人尴尬的并行”问题 - 意思是,需要对整个数据集有一个全局视图才能获得计数。所以,必须以我现在正在做的“地图->减少”方式处理它。有没有更清洁的方法?

标签: pythonpandasdask

解决方案


推荐阅读