首页 > 解决方案 > Flink 流未完成

问题描述

我正在使用 kafka 和 elasticsearch 设置一个 flink 流处理器。我想重播我的数据,但是当我将并行度设置为大于 1 时,它没有完成程序我相信这是因为 kafka 流只看到一条消息被识别为流的结尾。


    public CustomSchema(Date _endTime) {
        endTime = _endTime;
    }

@Override
    public boolean isEndOfStream(CustomTopicWrapper nextElement) {
        if (this.endTime != null && nextElement.messageTime.getTime() >= this.endTime.getTime()) {
            return true;
        }
        return false;
    }

有没有办法告诉 flink 消费者组上的所有线程在一个线程完成后结束?

标签: apache-kafkaapache-flink

解决方案


如果您实现了自己的 SourceFunction ,请使用Flink SourceFunctioncancel中显示的此示例中的方法。FlinkKafkaConsumerBase类也有 cancel 方法。


推荐阅读