首页 > 解决方案 > 带有 pyspark 的字计数器

问题描述

我有一个 pyspark 数据框,其中包含三列 user_id、follower_count 和 tweet,其中 tweet 是字符串类型。

首先,我需要执行以下预处理步骤: - 小写所有文本 - 删除标点符号(和任何其他非 ascii 字符) - 标记单词(由 ' ' 分割)

然后我需要在所有推文值中汇总这些结果: - 查找每个单词出现的次数 - 按频率排序 - 提取前 n 个单词及其各自的计数

我在 GitHub 上找到了以下资源 wordcount.py;但是,我不明白代码在做什么;因此,我在笔记本中调整它时遇到了一些困难。

https://github.com/apache/spark/blob/master/examples/src/main/python/wordcount.py

lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))

编辑1:我认为我没有明确表示我正在尝试将此分析应用于推文专栏。

编辑 2:我更改了上面的代码,插入 df.tweet 作为传递给第一行代码的参数并触发了错误。所以我想列不能传递到这个工作流程中;而且我不确定如何解决这个问题。

TypeError: Column is not iterable

我按照建议添加了一些调整。奇迹般有效!我不知道我可以将用户定义的函数发送到 lambda 函数中。事实证明,这是将此步骤添加到工作流程中的一种简单方法。

import re
def process_text(text):
  text = text.lower()
  text = re.sub(pattern='[^A-z ^\s]',repl='',string=text).split(' ')
  return [word for word in text if word != '']

process_text('hi 343')
>>>> ['hi']

count_rdd = df.select("tweet").rdd.flatMap(lambda x: process_text(x[0])) \
              .map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)

count_rdd.collect()

标签: pythonpyspark

解决方案


不确定错误是由于for (word, count) in output:列上的 RDD 操作还是由于 RDD 操作。

但是,您可以简单地使用:

对于 RDD 风格:

count_rdd = df.select("tweets").rdd.flatMap(lambda x: x[0].split(' ')) \
              .map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)

您正在尝试做的是对pyspark.sql.column.Column对象的 RDD 操作。以上是列中所有单词的简单字数。

如果您想在列本身上使用它,可以使用explode()执行此操作:

对于列样式:

import pyspark.sql.functions as F

count_df = df.withColumn('word', F.explode(F.split(F.col('tweets'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)

您将能够使用regexp_replace()lower()frompyspark.sql.functions来执行预处理步骤。


推荐阅读