apache-flink - 在apache flink中加入两个数据集后如何保留排序顺序?
问题描述
我正在使用 apache flink 进行批处理,我有一个排序的 10 元素数据集,其中包含电影 id 及其评级,它看起来像 (movie_id, rating)
(318,4.429022082018927)
(858,4.2890625)
(2959,4.272935779816514)
(1276,4.271929824561403)
(750,4.268041237113402)
(904,4.261904761904762)
(1221,4.25968992248062)
(48516,4.252336448598131)
(1213,4.25)
(912,4.24)
我必须将此数据集与具有电影 ID 和电影名称的电影数据集连接起来
(1285,Heathers (1989))
(1286,Somewhere in Time (1980))
(1287,Ben-Hur (1959))
(1288,This Is Spinal Tap (1984))
我用来加入的代码是
movies.join(sorted) // sorted represents sorted ratings dataset
.where(0) // joining movie_id of movies to movie_id of ratings
.equalTo(0)
.with(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, Double>, Tuple3<Long, String, Double>>() {
@Override
public Tuple3<Long, String, Double> join(
Tuple2<Long, String> movie,
Tuple2<Long, Double> rating) throws Exception {
return new Tuple3<>(movie.f0, movie.f1, rating.f1);
}
}).print();
//movies row : movie_id, movie name
//sorted row: movie_id, rating
当我打印这个数据集时,我的输出是随机的,比如,
(904,Rear Window (1954),4.261904761904762)
(318,Shawshank Redemption, The (1994),4.429022082018927)
(48516,Departed, The (2006),4.252336448598131)
(858,Godfather, The (1972),4.2890625)
(2959,Fight Club (1999),4.272935779816514)
(912,Casablanca (1942),4.24)
(1276,Cool Hand Luke (1967),4.271929824561403)
在这里,我希望电影 Shawshank Redemption id 318 根据评级的降序排列在 Rear Window id 904 之上。我正在在线学习一门课程,数据集可在此处获得。谁能帮我纠正我的逻辑?
解决方案
推荐阅读
- python - 如何将数据框导出到不带引号的 csv?
- wireguard - 在 macOS 中启动时,wireguard 停止连接到互联网
- javascript - 为什么在使用书签 API 制作 chrome 扩展时出现错误?
- mysql - 无法理解连接表 SQL
- laravel - 用于搜索的 Laravel 动态路由
- julia - 当运行时类型是 Julia 而不是 Python 时,如何将我的 Google 驱动器挂载到 Colab?
- c - 使用 scanf 和 isdigit 进行 int 验证
- python - Python将条件数组转换为数据框
- ios - 处理备用 API 响应类型 (typeMismatch)
- python-3.x - 通过将前缀附加到文件名来重命名