python - 如何在火花数据框列中按值排序字典或元组?
问题描述
我需要按火花数据框中的值对字典进行降序排序。我尝试了许多不同的方法,包括下面未显示的方法。我发现了很多关于订购 python 字典的回复,但在我的情况下它们不起作用。
我试过 Ordered Dict 和 Sorted。
我对输出是字典并不挑剔,它也可以是 tuple。
样本数据:
a = ["This is dummy data this dummy data is being used for word counts","See if this will work see if working not working", "Is this working is this working maybe it is maybe it isnt", "hopefully this works"]
b = [1,2,1,2]
df = sqlContext.createDataFrame(zip(b, a), schema=['id', 'text'])
我为准备数据而编写的代码:
def MostCommonWords(data):
#agg text by id
GroupedText = data.groupby("id").agg(F.concat_ws(", ", F.collect_list(data.text)).alias('aggText'))
#tokenizing text to count in the next step
tokenizer = Tokenizer(inputCol='aggText', outputCol='textTokenized')
GroupedText = tokenizer.transform(GroupedText)
#creating udf from counter function and applying udf to tokenized text
CounterUDF = F.udf(lambda x: dict(Counter(x)), MapType(StringType(), IntegerType()))
GroupedText = GroupedText.withColumn('WordFrequency', CounterUDF(F.col("textTokenized")))
#Top 10 most frequent words for each id
Nlargest_UDF = F.udf(lambda x: dict(heapq.nlargest(10, x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
MostCommon = GroupedText.withColumn('MostCommon', Nlargest_UDF(F.col("WordFrequency")))
MostCommon = MostCommon.select('id','MostCommon')
return MostCommon
MostCommon = MostCommonWords(df)
我尝试对每一行中的字典进行排序的不同方式:
naming = collections.namedtuple('Word', 'Count')
#SorterUDF = F.udf(lambda x: sorted([naming(v,k) for (k,v) in x.items(), key=itemgetter(1)], MapType(StringType(), IntegerType(), reverse=True)))
#SorterUDF = F.udf(lambda x: {k: v for k, v in sorted(x.items(), key=itemgetter(1), reverse = True)})
#SorterUDF = F.udf(lambda x: dict(sorted(x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
SorterUDF = F.udf(lambda x: OrderedDict(sorted(x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
Sortedd = MostCommon.withColumn('SortedMostCommon', SorterUDF(F.col("MostCommon")))
我没有收到错误,只是没有排序。预期结果:每行中的字典或元组按值排序。
解决方案
Spark 没有订购MapType
. 电流在内部MapType
转换为dict
类型。所以我想我们必须使用ArrayType
ofStructType
来代替。
def sort_dict_f(x):
sorted_x = sorted(x.items(), key=operator.itemgetter(1))
return sorted_x
schema = ArrayType(StructType([
StructField("word", StringType(), False), StructField("count", IntegerType(), False)
]))
SorterUDF = F.udf(sort_dict_f, schema)
df = MostCommon.withColumn('SortedMostCommon', SorterUDF("MostCommon"))
df.show()
print(df.take(1)[0]['SortedMostCommon'])
输出:
+---+--------------------+--------------------+
| id| MostCommon| SortedMostCommon|
+---+--------------------+--------------------+
| 1|[dummy -> 2, isnt...|[[isnt,, 1], [bei...|
| 2|[not -> 1, see ->...|[[will, 1], [work...|
+---+--------------------+--------------------+
[Row(word='isnt,', count=1), Row(word='being', count=1), Row(word='used', count=1), Row(word='working', count=2), Row(word='maybe', count=2), Row(word='it', count=2), Row(word='dummy', count=2), Row(word='data', count=2), Row(word='this', count=4), Row(word='is', count=5)]
您可以看到单词现在按其计数正确排序。
推荐阅读
- css - 在 React 中使用 SASS 子类
- flutter - 无法显示网络图像
- pine-script - 变量似乎保持不变
- windows - 在 Windows 服务启动后运行命令行
- c++ - 使用 WASAPI 录制音频流
- python - How to load HTML file with external link to CSS file with wxPython?
- sql - MySQL 错误 1064:您的 SQL 语法工作台 8.0 中有错误
- javascript - 如何自定义嵌入式推特推文的高度和宽度
- oracle-sqldeveloper - 为什么Oracle SQL Developer 打开一个大脚本时那么慢?
- python - 为什么 networkx 的 read_edgelist 不接受数组/列表作为权重?