How do I preserve the sort order after joining two datasets in Apache Flink?

Problem description

I am using Apache Flink for batch processing. I have a sorted 10-element dataset of movie IDs and their ratings, in the form (movie_id, rating):

(318,4.429022082018927)
(858,4.2890625)
(2959,4.272935779816514)
(1276,4.271929824561403)
(750,4.268041237113402)
(904,4.261904761904762)
(1221,4.25968992248062)
(48516,4.252336448598131)
(1213,4.25)
(912,4.24)

I have to join this dataset with a movies dataset that contains movie IDs and movie names, in the form (movie_id, movie_name):

(1285,Heathers (1989))
(1286,Somewhere in Time (1980))
(1287,Ben-Hur (1959))
(1288,This Is Spinal Tap (1984))

The code I use for the join is:

movies.join(sorted)  // sorted represents sorted ratings dataset
                .where(0) // joining movie_id of movies to movie_id of ratings
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, Double>, Tuple3<Long, String, Double>>() {
                    @Override
                    public Tuple3<Long, String, Double> join(
                            Tuple2<Long, String> movie,
                            Tuple2<Long, Double> rating) throws Exception {
                        return new Tuple3<>(movie.f0, movie.f1, rating.f1);
                    }
                }).print();
//movies row : movie_id, movie name
//sorted row: movie_id, rating

When I print the joined dataset, the rows come out in an arbitrary order, for example:

(904,Rear Window (1954),4.261904761904762)
(318,Shawshank Redemption, The (1994),4.429022082018927)
(48516,Departed, The (2006),4.252336448598131)
(858,Godfather, The (1972),4.2890625)
(2959,Fight Club (1999),4.272935779816514)
(912,Casablanca (1942),4.24)
(1276,Cool Hand Luke (1967),4.271929824561403)
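Why the order is lost: Flink executes a join in parallel and may hash-partition (or broadcast) its inputs across tasks, so the order of the `sorted` input is not guaranteed to survive in the join output. The plain-Java sketch below (Java 16+, no Flink dependency) is a simplified, hypothetical model of a partitioned hash join, not Flink's actual implementation; `JoinOrderDemo`, `partitionedJoin`, and the choice of 4 partitions are assumptions for illustration. The movie names are taken from the output above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a parallel hash join (not Flink's real code).
final class JoinOrderDemo {
    record Movie(long movieId, String name) {}
    record Rating(long movieId, double rating) {}
    record Joined(long movieId, String name, double rating) {}

    // Assign a key to one of `parallelism` partitions, as a hash partitioner would.
    static int partitionOf(long movieId, int parallelism) {
        return Math.floorMod(Long.hashCode(movieId), parallelism);
    }

    // Join each partition independently, then concatenate the partition outputs.
    // Within a partition the ratings keep their sorted order, but the overall
    // output order is dictated by the partitioning, not by the ratings.
    static List<Joined> partitionedJoin(List<Movie> movies, List<Rating> sortedRatings, int parallelism) {
        List<Joined> out = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            // Build side: movies whose key hashes to partition p.
            Map<Long, Movie> build = new HashMap<>();
            for (Movie m : movies) {
                if (partitionOf(m.movieId(), parallelism) == p) {
                    build.put(m.movieId(), m);
                }
            }
            // Probe side: ratings of partition p, visited in their sorted order.
            for (Rating r : sortedRatings) {
                if (partitionOf(r.movieId(), parallelism) != p) continue;
                Movie m = build.get(r.movieId());
                if (m != null) {
                    out.add(new Joined(m.movieId(), m.name(), r.rating()));
                }
            }
        }
        return out;
    }

    // The sorted ratings from the question.
    static final List<Rating> RATINGS = List.of(
        new Rating(318, 4.429022082018927), new Rating(858, 4.2890625),
        new Rating(2959, 4.272935779816514), new Rating(1276, 4.271929824561403),
        new Rating(750, 4.268041237113402), new Rating(904, 4.261904761904762),
        new Rating(1221, 4.25968992248062), new Rating(48516, 4.252336448598131),
        new Rating(1213, 4.25), new Rating(912, 4.24));

    // Only the movies whose names appear in the question's output.
    static final List<Movie> MOVIES = List.of(
        new Movie(904, "Rear Window (1954)"),
        new Movie(318, "Shawshank Redemption, The (1994)"),
        new Movie(48516, "Departed, The (2006)"),
        new Movie(858, "Godfather, The (1972)"),
        new Movie(2959, "Fight Club (1999)"),
        new Movie(912, "Casablanca (1942)"),
        new Movie(1276, "Cool Hand Luke (1967)"));

    public static void main(String[] args) {
        partitionedJoin(MOVIES, RATINGS, 4).forEach(System.out::println);
    }
}
```

With 4 partitions this model emits Cool Hand Luke and Rear Window before The Shawshank Redemption, even though the ratings input was sorted; with a single partition the sorted order happens to survive. Either way, the order is a side effect of execution, not something the join promises.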

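Here I expect The Shawshank Redemption (id 318) to be listed above Rear Window (id 904), because the rows should be in descending order of rating. I am following an online course, and the dataset is available here. Can anyone help me fix my logic?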
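One way to get the expected order is to sort after the join rather than before it. In the Flink DataSet API this is commonly done by calling `.sortPartition(2, Order.DESCENDING).setParallelism(1)` on the join result before `print()` (with `Order` from `org.apache.flink.api.common.operators`), so a single task sorts all rows by the rating field. The plain-Java sketch below (Java 16+, no Flink dependency) only illustrates the join-then-sort idea on in-memory lists; `JoinThenSortDemo` and `joinThenSort` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the job: join first, sort afterwards.
final class JoinThenSortDemo {
    record Movie(long movieId, String name) {}
    record Rating(long movieId, double rating) {}
    record Joined(long movieId, String name, double rating) {}

    static List<Joined> joinThenSort(List<Movie> movies, List<Rating> ratings) {
        // Index ratings by movie id; the input order of `ratings` is irrelevant here.
        Map<Long, Double> ratingById = new HashMap<>();
        for (Rating r : ratings) {
            ratingById.put(r.movieId(), r.rating());
        }

        // Inner join, driven by the movies input (as in movies.join(sorted)).
        List<Joined> joined = new ArrayList<>();
        for (Movie m : movies) {
            Double rating = ratingById.get(m.movieId());
            if (rating != null) {
                joined.add(new Joined(m.movieId(), m.name(), rating));
            }
        }

        // Impose the final order once, after the join: rating descending.
        joined.sort((a, b) -> Double.compare(b.rating(), a.rating()));
        return joined;
    }

    static final List<Rating> RATINGS = List.of(
        new Rating(318, 4.429022082018927), new Rating(858, 4.2890625),
        new Rating(2959, 4.272935779816514), new Rating(1276, 4.271929824561403),
        new Rating(750, 4.268041237113402), new Rating(904, 4.261904761904762),
        new Rating(1221, 4.25968992248062), new Rating(48516, 4.252336448598131),
        new Rating(1213, 4.25), new Rating(912, 4.24));

    // Movies listed in the same arbitrary order as the question's output.
    static final List<Movie> MOVIES = List.of(
        new Movie(904, "Rear Window (1954)"),
        new Movie(318, "Shawshank Redemption, The (1994)"),
        new Movie(48516, "Departed, The (2006)"),
        new Movie(858, "Godfather, The (1972)"),
        new Movie(2959, "Fight Club (1999)"),
        new Movie(912, "Casablanca (1942)"),
        new Movie(1276, "Cool Hand Luke (1967)"));

    public static void main(String[] args) {
        joinThenSort(MOVIES, RATINGS).forEach(System.out::println);
    }
}
```

The design choice is that ordering is applied as the last step, after every operation that may reshuffle rows; sorting the ratings beforehand is then unnecessary for the final output.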

Tags: apache-flink
