
Problem description

What I am trying to do here is count N-grams using the code provided in this Stack Overflow answer on N-grams.

The data below is test data; the actual computation will run on a large distributed dataset.

+--------------+
|        author|
+--------------+
|Test Data Five|
|Test Data Five|
|Data Test Five|
|Test data Five|
|Test Data Five|
|          Jack|
+--------------+

from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

def build_ngrams(name, n=3):
    # Build a pipeline of NGram stages: 1-grams, 2-grams, ..., n-grams,
    # each writing to its own output column ("1_grams", "2_grams", ...).
    ngrams = [
        NGram(n=i, inputCol=name, outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]
    return Pipeline(stages=ngrams)

# Split each author string on whitespace, collect every row into one
# array of arrays, flatten it into a single token list, then run the
# n-gram pipeline over that list.
temp_kdf = author_df.withColumn("author", F.split("author", r"\s+"))
temp_kdf = temp_kdf.groupby().agg(F.collect_list("author").alias("author"))
data = temp_kdf.select(F.flatten(temp_kdf.author).alias("author"))
temp_kdf = build_ngrams("author").fit(data).transform(data)

The result I get is as follows:

+--------------------+
|             2_grams|
+--------------------+
|[Test Data, Data Five, Five Test, Test Data, Data Five, Five Data, Data Test, Test Five, Five Test, Test data, data Five, Five Test, Test Data, Data Five, Five Jack]|
+--------------------+

The result I want is the top "N" rows of each n-gram column together with their frequency counts, e.g. for bigrams:

+---------+--------+
|  2_grams|2_counts|
+---------+--------+
|Test Data|       3|
|Data Five|       3|
|Five Test|       3|
|Five Data|       1|
+---------+--------+
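For reference, these counts can be reproduced locally without Spark. The sketch below (plain Python, no pyspark dependency) mimics what the pipeline does to the flattened token list; `tokens` is a hypothetical stand-in for the single flattened `author` array:

```python
from collections import Counter

# Flattened tokens from the six sample rows above (hypothetical
# stand-in for the array produced by F.flatten on the test data).
tokens = ["Test", "Data", "Five", "Test", "Data", "Five",
          "Data", "Test", "Five", "Test", "data", "Five",
          "Test", "Data", "Five", "Jack"]

def ngrams(seq, n):
    # Sliding-window n-grams, joined with spaces to match the
    # string format spark.ml's NGram transformer emits.
    return [" ".join(seq[i:i + n]) for i in range(len(seq) - n + 1)]

bigram_counts = Counter(ngrams(tokens, 2))
top = bigram_counts.most_common()  # most frequent bigrams first
```

Note that because the six rows are flattened into one token list, bigrams also form across row boundaries (e.g. "Five Test"), which matches the output shown above.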

Tags: pyspark, apache-spark-mllib, n-gram

Solution


(temp_data
    .select(col)
    .rdd
    .flatMap(lambda doc: [(x, 1) for x in doc[0]])
    .reduceByKey(lambda x, y: x + y))

Can anyone provide a better-optimized solution than this?
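The RDD pattern above is just a word count over an n-gram column: `flatMap` emits `(ngram, 1)` pairs and `reduceByKey` sums them. Its logic can be checked locally with a plain-Python equivalent (no pyspark needed); `docs` below is a hypothetical stand-in for the rows of one n-gram column. In Spark itself, a DataFrame-native alternative that avoids the RDD round trip would be `F.explode` on the array column followed by `groupBy(...).count()` and an `orderBy` on the count.

```python
from functools import reduce

# One "document" per row; here a single row, mirroring the single
# flattened row of bigrams in the question.
docs = [["Test Data", "Data Five", "Five Test", "Test Data", "Data Five",
         "Five Data", "Data Test", "Test Five", "Five Test", "Test data",
         "data Five", "Five Test", "Test Data", "Data Five", "Five Jack"]]

# Equivalent of flatMap(lambda doc: [(x, 1) for x in doc[0]]).
pairs = [(x, 1) for doc in docs for x in doc]

# Equivalent of reduceByKey(lambda x, y: x + y): merge pairs into a dict.
def merge(acc, pair):
    key, val = pair
    acc[key] = acc.get(key, 0) + val
    return acc

counts = reduce(merge, pairs, {})
top = sorted(counts.items(), key=lambda kv: -kv[1])  # highest counts first
```

This is only a local sketch of the counting logic; on a real cluster the DataFrame route additionally benefits from Catalyst optimization and avoids Python serialization of every element.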

