apache-kafka - Flink 流未完成
问题描述
我正在使用 kafka 和 elasticsearch 设置一个 flink 流处理器。我想重播我的数据,但是当我将并行度设置为大于 1 时,它没有完成程序我相信这是因为 kafka 流只看到一条消息被识别为流的结尾。
public CustomSchema(Date _endTime) {
endTime = _endTime;
}
@Override
public boolean isEndOfStream(CustomTopicWrapper nextElement) {
if (this.endTime != null && nextElement.messageTime.getTime() >= this.endTime.getTime()) {
return true;
}
return false;
}
有没有办法告诉 flink 消费者组上的所有线程在一个线程完成后结束?
解决方案
如果您实现了自己的 SourceFunction ,请使用Flink SourceFunctioncancel
中显示的此示例中的方法。FlinkKafkaConsumerBase类也有 cancel 方法。
推荐阅读
- swift - 文档类型关联在应用程序未运行时崩溃
- java - 在 Java 中的线程之间共享数据
- javascript - 为什么我不能再点击我的标记来拉出谷歌地图中的信息窗口了?
- node.js - 如何运行“npm install”为较旧的 Angular 项目创建 node_modules 文件夹?
- bash - 从日志文件中读取的 AWK 脚本
- php - 在地址和用户之间建立关系。在 Laravel 的视图上返回它
- php - 如何在 CakePHP 3.6 中显示验证错误?
- python - 如何在 SBML 中为基因添加注释?
- java - 如何在显示文本为动态的网页上定位元素
- bigcommerce - bigcommerce 模板展示品牌形象