Python Spark: average of tuple values by key

Problem description

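I'm trying to find the average word length per paragraph. The data is read from a text file where each line starts with a paragraph number, in the format 1|For more than five years...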

Here is my code so far:

from pyspark import SparkContext, SparkConf
sc = SparkContext('local', 'longest')
text = sc.textFile("walden.txt")
lines = text.map(lambda line: (line.split("|")[0],line))
lines = lines.filter(lambda kv: len(kv[1]) > 0)
words = lines.mapValues(lambda x: x.replace("1|","").replace("2|","").replace("3|",""))
words = words.mapValues(lambda x: x.split())
words = words.mapValues(lambda x: [(len(i),1) for i in x])
words = words.reduceByKey(lambda a,b: a+b)
words.saveAsTextFile("results")
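One side issue with the code above: the chain of replace("1|","") calls only strips the prefixes for paragraphs 1 to 3. A small plain-Python check (no Spark needed; the sample line with a two-digit ID is hypothetical) shows that a line like 10|... keeps its prefix, while splitting once on the first | works for any ID:

```python
def strip_by_replace(line):
    # The approach from the question: remove hard-coded "1|", "2|", "3|" prefixes
    return line.replace("1|", "").replace("2|", "").replace("3|", "")


def strip_by_split(line):
    # Split only on the first "|" and keep everything after it
    return line.split("|", 1)[1]


sample = "10|For more than five years"  # hypothetical line with a two-digit ID
print(strip_by_replace(sample))  # the "10|" prefix is not removed
print(strip_by_split(sample))    # prefix removed
```

With the replace version, "10|For" would then be counted as a 6-character word.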

The current output has this format:

('1', [(2,1), (6,1), (1,1), ...]), ('2', [(2,1), (6,1), (1,1), ...]), ('3', [(2,1), (6,1), (1,1), ...])

where '1'/'2'/'3' are the paragraph IDs and each tuple has the form (word length, 1).

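What I need is to sum the tuple values per key (paragraph ID), so that (2,1),(6,1),(1,1) becomes (9,3), and then divide the two numbers (9/3) to get the average word length of each paragraph.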
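The output is a list because each value is a list, and a + b on two lists simply concatenates them. To get (9, 3) instead, each value has to be a single (length, 1) tuple (in Spark, for example, by using flatMapValues instead of mapValues on the step that builds the tuples), and the reducer has to add the tuples element-wise. Below is a plain-Python sketch of both reducers, using functools.reduce to combine the values for one key pairwise, roughly the way reduceByKey does (the element-wise reducer is associative and commutative, as reduceByKey requires):

```python
from functools import reduce

# Values for one key, as produced in the question: (word length, 1)
pairs = [(2, 1), (6, 1), (1, 1)]

# The question's reducer: when the values are lists, "+" concatenates them
concatenated = reduce(lambda a, b: a + b, [[p] for p in pairs])


def add_pairs(a, b):
    # Element-wise: add the lengths together and the counts together
    return (a[0] + b[0], a[1] + b[1])


total_length, word_count = reduce(add_pairs, pairs)
average = total_length / word_count

print(concatenated)                         # [(2, 1), (6, 1), (1, 1)]
print((total_length, word_count), average)  # (9, 3) 3.0
```

In the RDD code this corresponds to reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) followed by mapValues(lambda t: t[0] / t[1]).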

I've tried many different things but can't get it to work. Any help is much appreciated!

Tags: python, pyspark, rdd

Solution


For the RDD version, try this:

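# sc is the SparkContext created in the question
text = sc.textFile("test.txt")
# Split each line once, on the first "|": (paragraph_id, paragraph_text)
lines = text.map(lambda line: tuple(line.split("|", 1)))
# Drop blank lines and lines whose text contains no words
lines = lines.filter(lambda kv: len(kv) == 2 and len(kv[1].split()) > 0)
words = lines.mapValues(lambda x: x.split())
# Per line: (total characters, number of words)
words = words.mapValues(lambda x: (sum(len(i) for i in x), len(x)))
# Lines that share a paragraph ID: add the tuples element-wise, then divide
words = words.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
words = words.mapValues(lambda t: t[0] / t[1])
words.collect()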

[('1', 4.0), ('2', 5.4), ('3', 7.0)]
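Averaging each line on its own is only correct when every paragraph sits on a single line. When several lines share an ID, the character totals and word counts have to be added up per key before dividing. Here is a plain-Python sketch of that (sum, count) logic, runnable without Spark; average_word_length is a hypothetical helper name, and the sample lines are the test data shown in the DataFrame output below:

```python
from collections import defaultdict


def average_word_length(lines):
    # paragraph id -> [total characters, number of words]
    totals = defaultdict(lambda: [0, 0])
    for line in lines:
        if "|" not in line:
            continue  # skip blank or malformed lines
        pid, text = line.split("|", 1)
        words = text.split()
        totals[pid][0] += sum(len(w) for w in words)
        totals[pid][1] += len(words)
    # Divide only where at least one word was seen
    return {pid: s / n for pid, (s, n) in totals.items() if n > 0}


sample = [
    "1|For more than five years",
    "2|For moasdre than five asdfyears",
    "3|Fasdfor more thasdfan fidafve yearasdfs",
]
print(average_word_length(sample))  # {'1': 4.0, '2': 5.4, '3': 7.0}

# Two lines for paragraph 1: (2 + 2 + 6) / 3, not the mean of the per-line averages
print(average_word_length(["1|ab cd", "1|abcdef"]))
```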

Using a DataFrame instead, I got this:

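from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.appName("avg_word_length").getOrCreate()
# Read "|"-separated lines into two columns: paragraph ID and text
df = spark.read.option("inferSchema","true").option("sep","|").csv("test.txt").toDF("col1", "col2")
df.show(10, False)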

+----+---------------------------------------+
|col1|col2                                   |
+----+---------------------------------------+
|1   |For more than five years               |
|2   |For moasdre than five asdfyears        |
|3   |Fasdfor more thasdfan fidafve yearasdfs|
+----+---------------------------------------+
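With sep set to |, each line is split into the ID column and the text column. Below is a plain-Python sketch of that split using the standard csv module, on the three sample lines above; converting col1 with int() only approximates what inferSchema does:

```python
import csv
import io

data = io.StringIO(
    "1|For more than five years\n"
    "2|For moasdre than five asdfyears\n"
    "3|Fasdfor more thasdfan fidafve yearasdfs\n"
)
# delimiter="|" plays the role of .option("sep", "|")
rows = [(int(col1), col2) for col1, col2 in csv.reader(data, delimiter="|")]
print(rows)
```

A | inside the paragraph text would produce an extra column here, and in the Spark reader as well, so this layout assumes the text itself never contains |.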

df.withColumn('array', f.split('col2', r'[ ][ ]*')) \
  .withColumn('count_arr', f.expr('transform(array, x -> LENGTH(x))')) \
  .withColumn('sum', f.expr('aggregate(array, 0, (sum, x) -> sum + LENGTH(x))')) \
  .withColumn('size', f.size('array')) \
  .withColumn('avg', f.col('sum') / f.col('size')) \
  .show(10, False)

+----+---------------------------------------+---------------------------------------------+---------------+---+----+---+
|col1|col2                                   |array                                        |count_arr      |sum|size|avg|
+----+---------------------------------------+---------------------------------------------+---------------+---+----+---+
|1   |For more than five years               |[For, more, than, five, years]               |[3, 4, 4, 4, 5]|20 |5   |4.0|
|2   |For moasdre than five asdfyears        |[For, moasdre, than, five, asdfyears]        |[3, 7, 4, 4, 9]|27 |5   |5.4|
|3   |Fasdfor more thasdfan fidafve yearasdfs|[Fasdfor, more, thasdfan, fidafve, yearasdfs]|[7, 4, 8, 7, 9]|35 |5   |7.0|
+----+---------------------------------------+---------------------------------------------+---------------+---+----+---+
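The chain of withColumn calls computes one value per row. Here is the same per-row arithmetic in plain Python, as a sketch to make each column explicit; row_stats is a hypothetical helper name, and the regex is the one passed to f.split:

```python
import re


def row_stats(col2):
    array = re.split(r"[ ][ ]*", col2)  # same pattern as f.split: one or more spaces
    count_arr = [len(x) for x in array]  # transform(array, x -> LENGTH(x))
    total = sum(count_arr)               # aggregate(array, 0, (sum, x) -> sum + LENGTH(x))
    size = len(array)                    # f.size('array')
    return array, count_arr, total, size, total / size  # avg = sum / size


for text in [
    "For more than five years",
    "For moasdre than five asdfyears",
    "Fasdfor more thasdfan fidafve yearasdfs",
]:
    print(row_stats(text))
```

Because this works row by row, a paragraph split over several rows would still need a groupBy on col1 that sums the totals and sizes before dividing.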

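I know this is quite a different approach, but it may help. Note that the transform and aggregate SQL functions require Spark 2.4 or later.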

