apache-flink - 将自定义动态分区器从 Flink 1.7 迁移到 Flink 1.9
问题描述
我正在尝试将自定义动态分区器从 Flink 1.7 迁移到 Flink 1.9。原始分区器selectChannels
在接口中实现了该方法,StreamPartitioner
如下所示:
// Original: working for Flink 1.7
//@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
int numberOfOutputChannels) {
T value = streamRecordSerializationDelegate.getInstance().getValue();
if (value.f0.isBroadCastPartitioning()) {
// send to all channels
int[] channels = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; ++i) {
channels[i] = i;
}
return channels;
} else if (value.f0.getPartitionKey() == -1) {
// random partition
returnChannels[0] = random.nextInt(numberOfOutputChannels);
} else {
returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
}
return returnChannels;
}
我不确定如何将其迁移到 Flink 1.9,因为StreamPartitioner
界面已更改,如下图所示:
// New: required by Flink 1.9
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
T value = streamRecordSerializationDelegate.getInstance().getValue();
if (value.f0.isBroadCastPartitioning()) {
/*
It is illegal to call this method for broadcast channel selectors and this method can remain not
implemented in that case (for example by throwing UnsupportedOperationException).
*/
} else if (value.f0.getPartitionKey() == -1) {
// random partition
returnChannels[0] = random.nextInt(numberOfChannels);
} else {
returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
}
//return returnChannels;
return returnChannels[0];
}
请注意,selectChannels
已替换为selectChannel
. 因此,对于广播元素的情况,不再可能像上面最初所做的那样返回多个输出通道。事实上,selectChannel
不应该针对这种特殊情况调用。关于如何解决这个问题的任何想法?
解决方案
使用 Flink 1.9,您不能再动态广播到所有频道。您StreamPartitioner
必须静态指定它是否是带有isBroadcast
. 然后,selectChannel
永远不会被调用。
您是否有需要动态切换的特定用例?
推荐阅读
- mysql - MySQL,SP 中的外键约束失败,但手动执行时没有
- c++ - 就地文件编辑
- postgresql - 如何使用 windows 或 linux 系统中的数据备份 postgresql 数据库?
- c# - 计算另一个二维平面的对应点
- jquery - 使用ajax将数据传递给django视图
- javascript - 在 html 文件中调用 Angular 函数时会执行两次
- google-apps-script - 从 SQL 数据库中检索数据到 Google 表格
- regex - 如何在此 RegEx 中允许使用连字符
- javascript - React Typescript prop 工作正常,但返回错误
- bash - BASH - 字符串输出不是我所期望的