首页 > 解决方案 > 如何在火花数据框列中按值排序字典或元组?

问题描述

我需要按火花数据框中的值对字典进行降序排序。我尝试了许多不同的方法,包括下面未显示的方法。我发现了很多关于订购 python 字典的回复,但在我的情况下它们不起作用。

我试过 Ordered Dict 和 Sorted。

我对输出是字典并不挑剔,它也可以是 tuple

样本数据:

a = ["This is dummy data this dummy data is being used for word counts","See if this will work see if working not working", "Is this working is this working maybe it is maybe it isnt", "hopefully this works"]
b = [1,2,1,2]
df = sqlContext.createDataFrame(zip(b, a), schema=['id', 'text'])

我为准备数据而编写的代码:

def MostCommonWords(data):

  #agg text by id
  GroupedText = data.groupby("id").agg(F.concat_ws(", ", F.collect_list(data.text)).alias('aggText'))

  #tokenizing text to count in the next step
  tokenizer = Tokenizer(inputCol='aggText', outputCol='textTokenized')
  GroupedText = tokenizer.transform(GroupedText)

  #creating udf from counter function and applying udf to tokenized text
  CounterUDF = F.udf(lambda x: dict(Counter(x)), MapType(StringType(), IntegerType()))
  GroupedText = GroupedText.withColumn('WordFrequency', CounterUDF(F.col("textTokenized")))

  #Top 10 most frequent words for each id
  Nlargest_UDF = F.udf(lambda x: dict(heapq.nlargest(10, x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
  MostCommon = GroupedText.withColumn('MostCommon', Nlargest_UDF(F.col("WordFrequency")))

  MostCommon = MostCommon.select('id','MostCommon')
  return MostCommon

MostCommon = MostCommonWords(df)

我尝试对每一行中的字典进行排序的不同方式:

naming = collections.namedtuple('Word', 'Count')
#SorterUDF = F.udf(lambda x: sorted([naming(v,k) for (k,v) in x.items(), key=itemgetter(1)], MapType(StringType(), IntegerType(), reverse=True)))
#SorterUDF = F.udf(lambda x: {k: v for k, v in sorted(x.items(), key=itemgetter(1), reverse = True)})
#SorterUDF = F.udf(lambda x: dict(sorted(x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
SorterUDF = F.udf(lambda x: OrderedDict(sorted(x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
Sortedd = MostCommon.withColumn('SortedMostCommon', SorterUDF(F.col("MostCommon")))

我没有收到错误,只是没有排序。预期结果:每行中的字典或元组按值排序。

标签: pythondictionarypysparkapache-spark-sql

解决方案


Spark 没有订购MapType. 电流在内部MapType转换为dict类型。所以我想我们必须使用ArrayTypeofStructType来代替。

def sort_dict_f(x):
    sorted_x = sorted(x.items(), key=operator.itemgetter(1))
    return sorted_x

schema = ArrayType(StructType([
    StructField("word", StringType(), False), StructField("count", IntegerType(), False)
]))

SorterUDF = F.udf(sort_dict_f, schema)
df = MostCommon.withColumn('SortedMostCommon', SorterUDF("MostCommon"))
df.show()
print(df.take(1)[0]['SortedMostCommon'])

输出:

+---+--------------------+--------------------+
| id|          MostCommon|    SortedMostCommon|
+---+--------------------+--------------------+
|  1|[dummy -> 2, isnt...|[[isnt,, 1], [bei...|
|  2|[not -> 1, see ->...|[[will, 1], [work...|
+---+--------------------+--------------------+

[Row(word='isnt,', count=1), Row(word='being', count=1), Row(word='used', count=1), Row(word='working', count=2), Row(word='maybe', count=2), Row(word='it', count=2), Row(word='dummy', count=2), Row(word='data', count=2), Row(word='this', count=4), Row(word='is', count=5)]

您可以看到单词现在按其计数正确排序。


推荐阅读