apache-flink - KeyBy 不会为不同的键创建不同的键控流
问题描述
我正在读取一个简单的 JSON 字符串作为输入,并基于两个字段A
和B
. 但是 KeyBy 正在为不同的值生成相同的键控流,但对于和B
的特定组合。A
B
输入:
{
"A": "352580084349898",
"B": "1546559127",
"C": "A"
}
这是我的 Flink 代码的核心逻辑:
DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
.map(new MapFunction<String, GenericDataObject>() {
@Override
public GenericDataObject map(String s) throws Exception {
JSONObject jsonObject = new JSONObject(s);
GenericDataObject genericDataObject = new GenericDataObject();
genericDataObject.setA(jsonObject.getString("A"));
genericDataObject.setB(jsonObject.getString("B"));
genericDataObject.setC(jsonObject.getString("C"));
return genericDataObject;
}
});
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
.keyBy("A", "B")
.map(new MapFunction<GenericDataObject, GenericDataObject>() {
@Override
public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
return genericDataObject;
}
});
testStream.print();
GenericDataObject 是一个具有三个字段的A
POJOB
和C
.
这是不同字段值的控制台输出B
。
5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
注意第 1 行和第 2 行。即使它们具有不同的 B 值,它们也被放入同一个键控流 (5)。我必须在这里做一些根本错误的事情,有人可以指出我正确的方向吗?
解决方案
首先,你没有做错任何事。
为什么他们在同一个子任务中?
假设您有数千个键,Apache Flink 不可能为每个键创建数千个线程。因此,必须有另一种机制来确保一组键在一个线程中单独处理。
因此,在 Apache Flink 中,每个子任务都有自己的键组,具有相同键组索引的不同键将在同一个子任务中处理。并且一个子任务通常处理几个具有单独键控状态的键,以保持不同键的状态分开。
keyBy 并不是说将不同的key分配给不同的子任务(或分区),而是将所有具有相同key的记录分配给同一个子任务。因此,您只能通过编写 KeySelector 实例来决定不同的键是否在同一组中。
更多详细信息,您可以查看 Apache Flink 官网中的这篇文章。
推荐阅读
- lisp - Portacle - 为什么当我访问文件时智能感知功能停止工作(它在 repl 中工作)
- dataframe - pyspark:删除重复值大于一个值
- javascript - 尝试开发 Chrome 扩展以自由更改 TimeZone,但它不起作用
- mongodb - 如何在 mongo db 中交换数组中的对象
- javascript - 树莓派 3 上的 Node.js 服务器未在 Web 浏览器中加载
- docker - 在远程 docker swarm 集群上运行手动 GitLab CI 作业时遇到问题
- python - 如何在 Kivy 中获取大小提示以应用于布局元素而不是窗口
- java - 一个 .html 文件中的所有 Thymeleaf 片段,选择一个使用
- javascript - HTML导航栏向下推送页面内容
- python - 如何使用 Selenium 存储有关滚动时动态添加的 div 的信息?