Migrating a custom dynamic partitioner from Flink 1.7 to Flink 1.9

Question

I am trying to migrate a custom dynamic partitioner from Flink 1.7 to Flink 1.9. The original partitioner implemented the selectChannels method of StreamPartitioner as follows:

    // Original: working for Flink 1.7
    //@Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate,
                                int numberOfOutputChannels) {
        T value = streamRecordSerializationDelegate.getInstance().getValue();
        if (value.f0.isBroadCastPartitioning()) {
            // send to all channels
            int[] channels = new int[numberOfOutputChannels];
            for (int i = 0; i < numberOfOutputChannels; ++i) {
                channels[i] = i;
            }
            return channels;
        } else if (value.f0.getPartitionKey() == -1) {
            // random partition
            returnChannels[0] = random.nextInt(numberOfOutputChannels);
        } else {
            returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfOutputChannels);
        }
        return returnChannels;
    }
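The 1.7 version could decide per record because the caller wrote the record once for every index in the returned array. The plain-Java sketch below models that contract without any Flink dependency. RoutedValue, LegacyRouting and emit are hypothetical names, Math.floorMod stands in for the user's own partitioner.partition(...), and the random case is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the routing metadata the question reads from value.f0.
final class RoutedValue {
    final boolean broadcast;
    final int partitionKey;

    RoutedValue(boolean broadcast, int partitionKey) {
        this.broadcast = broadcast;
        this.partitionKey = partitionKey;
    }
}

final class LegacyRouting {
    // 1.7-style decision: per record, return either all channels or exactly one.
    static int[] selectChannels(RoutedValue value, int numberOfChannels) {
        if (value.broadcast) {
            int[] all = new int[numberOfChannels];
            for (int i = 0; i < numberOfChannels; i++) {
                all[i] = i;
            }
            return all;
        }
        // Stand-in for partitioner.partition(key, numberOfChannels); random case omitted.
        return new int[] {Math.floorMod(value.partitionKey, numberOfChannels)};
    }

    // Caller side (simplified): the record is written once per returned channel index.
    static List<Integer> emit(RoutedValue value, int numberOfChannels) {
        List<Integer> targets = new ArrayList<>();
        for (int channel : selectChannels(value, numberOfChannels)) {
            targets.add(channel);
        }
        return targets;
    }
}
```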

I am not sure how to migrate this to Flink 1.9, because the StreamPartitioner interface has changed, as shown below:

    // New: required by Flink 1.9
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> streamRecordSerializationDelegate) {
        T value = streamRecordSerializationDelegate.getInstance().getValue();
        if (value.f0.isBroadCastPartitioning()) {
            /* 
            It is illegal to call this method for broadcast channel selectors and this method can remain not 
            implemented in that case (for example by throwing UnsupportedOperationException).
            */
        } else if (value.f0.getPartitionKey() == -1) {
            // random partition
            returnChannels[0] = random.nextInt(numberOfChannels);
        } else {
            returnChannels[0] = partitioner.partition(value.f0.getPartitionKey(), numberOfChannels);
        }
        //return returnChannels;
        return returnChannels[0];
    }

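Note that selectChannels has been replaced by selectChannel. As a result, it is no longer possible to return multiple output channels for broadcast elements, as the original code above did. In fact, selectChannel is not supposed to be called at all in this particular case. Any ideas on how to solve this?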
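For the non-broadcast cases the migration is mechanical: return the chosen index directly instead of writing it into a shared returnChannels array, and throw UnsupportedOperationException for the broadcast case, as the Javadoc quoted in the code above suggests. The sketch below is plain Java and assumes a simplified SingleChannelSelector interface that roughly mirrors the setup/selectChannel/isBroadcast methods of Flink 1.9's ChannelSelector; it is not the real Flink type. RoutedValue and KeyedOrRandomSelector are hypothetical, and Math.floorMod stands in for partitioner.partition(...).

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for the routing metadata the question reads from value.f0.
final class RoutedValue {
    final boolean broadcast;
    final int partitionKey; // -1 means "pick a random channel"

    RoutedValue(boolean broadcast, int partitionKey) {
        this.broadcast = broadcast;
        this.partitionKey = partitionKey;
    }
}

// Simplified model of the 1.9-style contract (assumption, not the real Flink interface).
interface SingleChannelSelector<T> {
    void setup(int numberOfChannels);
    int selectChannel(T record);
    boolean isBroadcast();
}

final class KeyedOrRandomSelector implements SingleChannelSelector<RoutedValue> {
    private int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(RoutedValue value) {
        if (value.broadcast) {
            // A single int cannot express "all channels".
            throw new UnsupportedOperationException("broadcast records need a broadcast selector");
        }
        if (value.partitionKey == -1) {
            return ThreadLocalRandom.current().nextInt(numberOfChannels);
        }
        // Stand-in for the user's partitioner.partition(key, numberOfChannels).
        return Math.floorMod(value.partitionKey, numberOfChannels);
    }

    @Override
    public boolean isBroadcast() {
        return false; // fixed for the whole selector, not decided per record
    }
}
```

Returning the index directly also removes the shared mutable returnChannels field, which the single-int signature no longer needs.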

Tags: apache-flink

Answer


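With Flink 1.9, you can no longer broadcast to all channels dynamically. Your StreamPartitioner has to declare statically, through isBroadcast, whether it is a broadcast partitioner. If it is, selectChannel is never called.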
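The effect of a static flag can be modeled in plain Java: the writer reads isBroadcast() once when it is created and, for a broadcast selector, writes every record to all channels without ever calling selectChannel. This is a toy model under the same simplified-interface assumption as above, not Flink's actual record writer; ToyWriter and AlwaysBroadcast are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the 1.9-style contract (assumption, not the real Flink interface).
interface SingleChannelSelector<T> {
    void setup(int numberOfChannels);
    int selectChannel(T record);
    boolean isBroadcast();
}

// Broadcast behavior is fixed by the class, not chosen per record.
final class AlwaysBroadcast<T> implements SingleChannelSelector<T> {
    @Override
    public void setup(int numberOfChannels) {
        // Nothing to remember: every record goes to every channel.
    }

    @Override
    public int selectChannel(T record) {
        throw new UnsupportedOperationException("selectChannel is not used for broadcast selectors");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }
}

// Toy writer: decides once, at construction time, how records are routed.
final class ToyWriter<T> {
    private final SingleChannelSelector<T> selector;
    private final int numberOfChannels;
    private final boolean broadcast;

    ToyWriter(SingleChannelSelector<T> selector, int numberOfChannels) {
        this.selector = selector;
        this.numberOfChannels = numberOfChannels;
        selector.setup(numberOfChannels);
        this.broadcast = selector.isBroadcast(); // read once, never per record
    }

    // Returns the channels the record would be written to.
    List<Integer> emit(T record) {
        List<Integer> targets = new ArrayList<>();
        if (broadcast) {
            for (int i = 0; i < numberOfChannels; i++) {
                targets.add(i);
            }
        } else {
            targets.add(selector.selectChannel(record));
        }
        return targets;
    }
}
```

Because the routing mode is fixed when the writer is built, a per-record flag such as isBroadCastPartitioning() has no point at which it could switch a single selector between one channel and all channels in this model.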

Do you have a specific use case that requires switching dynamically?

