apache-flink - 全局处理数据管道的异常
问题描述
我有 5 个不同任务的数据管道。如果任何任务中有任何异常,则将其移至错误 kafka 主题。是否有任何异常处理程序挂钩
解决方案
我建议使用 Flink 的侧输出功能来收集异常,然后将它们输出到 Kafka 主题。
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> task1 = ...;
SingleOutputStreamOperator<Integer> task2 = ...;
SingleOutputStreamOperator<Integer> task3 = ...;
DataStream<String> exceptions1 = task1.getSideOutput(outputTag);
DataStream<String> exceptions2 = task2.getSideOutput(outputTag);
DataStream<String> exceptions3 = task3.getSideOutput(outputTag);
DataStream<String> exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));
更新
您还可以将结果包装到类型Left
中并将异常包装Right
到Either
类型中。在管道结束时,您需要通过split/select
函数将流拆分为有效负载和异常。
DataStream<Either<Payload, Exception>> stage2 = stage1.flatMap(...);
DataStream<Either<Payload2, Exception>> stage3 = stage2.flatMap((Either<Payload, Exception> payload, Collector out) -> {
if (payload.isLeft()) {
out.collect(Left.of(map(payload.left)));
} else {
out.collect(Right.of(payload.right()));
}
});
SplitStream<Either<Payload2, Exception>> split = stage3.split((Either<Payload2, Exception> value) -> {
if (value.isLeft()) {
return Colletions.singleton("left");
} else {
return Collections.singleton("right");
}
});
DataStream<Either<Payload2, Exception>> payloads = split.select("left");
DataStream<Either<Payload2, Exception>> exceptions = split.select("right");
推荐阅读
- google-bigquery - 使用云控制台将 csv 文件加载到 BQ 表
- html - 如何在 CSS 伪元素之前/之后控制图像
- python-3.x - 未找到模块没有名为“安全”烧瓶的模块
- watchos - 在应用程序打开时在 watchOS 上检测新的一天
- pandas - 如何在 Pandas 中对面板数据集的行求和和平均?
- r - 如何使用闪亮和传单创建空间数据
- windows - SQLite 错误参数或其他 API 滥用
- python - Selenium TimeoutException 与 Flask 和 Python
- bash - macOS终端配置问题,设置颜色和git分支信息
- sql - Rails:如何使用 has_many 相关表进行高级查询?