Spark Group By Key to (String, Iterable)

Problem description

I am trying to group urldata by key, where the values are strings.

Sample data:

url_3 url_2
url_3 url_2
url_3 url_1
url_4 url_3
url_4 url_1

Expected result:

(url_3,(url_2,url_1))
(url_4,(url_3,url_1))

1) Load the urldata:

Dataset<String> lines = spark.read()
    .textFile("C:/Users/91984/workspace/myApp/src/test/resources/in/urldata.txt");

2) Split the dataset on spaces:

Encoder<Tuple2<String, String>> encoder2 = 
    Encoders.tuple(Encoders.STRING(), Encoders.STRING());
Dataset<Tuple2<String, String>> tupleRDD = lines.map(f -> {
    String[] parts = f.split(" ");  // split once instead of twice
    return new Tuple2<String, String>(parts[0], parts[1]);
}, encoder2);

3) Group the tupleRDD dataset by key using groupByKey:

KeyValueGroupedDataset<String, Tuple2<String, String>> keygrpDS = 
    tupleRDD.groupByKey(f->f._1, Encoders.STRING());

Can someone explain why the groupByKey in step 3 returns a KeyValueGroupedDataset<String, Tuple2<String, String>> rather than a KeyValueGroupedDataset<String, Iterable<String>>, and what changes are needed to get the expected result?

Tags: java, apache-spark, apache-spark-sql

Solution


That's the way it works with datasets in Spark. When you have a dataset of type Dataset&lt;T&gt;, you can group it by a mapping function that takes an object of type T and returns an object of type K (the key). What you get is a KeyValueGroupedDataset&lt;K,T&gt; on which you can call an aggregation function (see the javadoc). In your case, you could use mapGroups, to which you provide a function that maps a key K and an Iterator&lt;T&gt; over the group's values to a new object R of your choosing. In your code, T is Tuple2&lt;String, String&gt; and K is String (the first URL).
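The approach above can be sketched end to end as follows. This is a minimal, self-contained example (using an in-memory dataset in place of the urldata.txt file, and a local SparkSession, both assumptions for illustration) showing how mapGroups collapses each group into one output string:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class GroupUrls {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]").appName("group-urls").getOrCreate();

        // In-memory stand-in for urldata.txt
        Dataset<String> lines = spark.createDataset(
            Arrays.asList("url_3 url_2", "url_3 url_2", "url_3 url_1",
                          "url_4 url_3", "url_4 url_1"),
            Encoders.STRING());

        // Step 2: split each line into a (key, value) tuple
        Dataset<Tuple2<String, String>> tuples = lines.map(
            (MapFunction<String, Tuple2<String, String>>) line -> {
                String[] parts = line.split(" ");
                return new Tuple2<>(parts[0], parts[1]);
            },
            Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // Step 3: group by the first element of each tuple
        KeyValueGroupedDataset<String, Tuple2<String, String>> byKey =
            tuples.groupByKey(
                (MapFunction<Tuple2<String, String>, String>) t -> t._1,
                Encoders.STRING());

        // mapGroups hands us the key plus an Iterator over the group's tuples,
        // which we fold into a single "(key,(v1,v2,...))" string
        Dataset<String> result = byKey.mapGroups(
            (MapGroupsFunction<String, Tuple2<String, String>, String>) (key, values) -> {
                List<String> urls = new ArrayList<>();
                values.forEachRemaining(t -> urls.add(t._2));
                return "(" + key + ",(" + String.join(",", urls) + "))";
            },
            Encoders.STRING());

        result.show(false);
        spark.stop();
    }
}
```

Note that with the sample data, url_3's group contains url_2 twice; if the expected output should list each value once, collect into a java.util.LinkedHashSet instead of an ArrayList to drop duplicates while keeping order.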

