exception - 拓扑发生异常时跳过记录
问题描述
我们正在编写一个 Kafka Streams Topology 来聚合数据并实时显示它们。我们希望使显示尽可能健壮 - 理想情况下记录记录并继续出现任何异常。
根据文档,我们的一些测试和
- 处理 Kafka 流中的异常
- 使用 Kafka 的 Streams API 处理错误消息
- https://groups.google.com/g/confluent-platform/c/p75CleJ9yU0
Kafka Streams 很好地支持处理 Producer 或反序列化期间发生的异常。提供LogAndContinueExceptionHandler
的正是我们想要的行为。然而我们的主要问题是处理过程中发生的异常(例如在.mapValues()
或.leftJoin()
我们的想法基本上是验证先决条件
- 在反序列化过程中,如果未完成,则抛出 DeserializationException(并记录并继续)。
- 如果无法执行计算(
/ by zero error
等),则在处理函数中检查以返回默认值
但是,如果数据中有不可预见的东西,异常仍然可能冒出,并且拓扑将关闭。
Kafka Streams 提供了一个,UncaughtExceptionHandler
但它是在线程已经死亡后调用的,因此它不能用于防止拓扑关闭。
有没有办法编写一个跳过记录的 UncaughtExceptionHandler ?或者另一种机制来跳过我们可以在try-catch
处理函数内部的块中的当前记录?
解决方案
我认为最好的解决方案是以永远不会抛出任何异常的方式编写处理操作(例如:映射器、过滤器等)。为此,您可以使用一个包装器对象,它可以是成功或错误(例如:Either
scala 中的类型)。之后,您可以使用该branch()
方法获取两个流:一个用于成功记录,一个用于错误记录。
下面的代码显示了基本思想:
public static void main(String[] args) {
var builder = new StreamsBuilder();
KStream<Object, Result<Object>> stream = builder.stream("my-topic")
.map((k, v) -> {
try {
// unsafe operation, i.e that may throw an exception
return KeyValue.pair(k, new Success<>(v));
} catch (Exception e) {
return KeyValue.pair(k, new Error<>(e));
}
});
KStream<Object, Result<Object>>[] branch = stream.branch((k, v) -> !v.hasError(), (k, v) -> v.hasError());
// Handle the success steam
KStream<Object, Result<Object>> successStream = branch[0];
// Handle the error steam, e.g: log errors, write errors to a Dead Letter Queue
KStream<Object, Result<Object>> errorStream = branch[1];
}
public interface Result<T> {
T get() throws Exception;
Exception exception();
boolean hasError();
}
public static class Success<T> implements Result<T> {
private final T value;
public Success(T value) {
this.value = value;
}
@Override
public T get() throws Exception {
return value;
}
@Override
public Exception exception() {
return null;
}
@Override
public boolean hasError() {
return false;
}
}
public static class Error<T> implements Result<T> {
private final Exception error;
public Error(Exception error) { this.error = error; }
@Override
public T get() throws Exception{
throw error;
}
@Override
public Exception exception() {
return error;
}
@Override
public boolean hasError() {
return true;
}
}
另外,对于您提到的反序列化异常,Azkarra Streams项目提供了一些方便的 java 类,可以帮助您(例如 SafeSerdes、DeadLetterTopicExceptionHandler):GitHub
推荐阅读
- c - C 预处理器 - 在编译时向 Struct 添加元素
- asp.net-mvc - 如何在 MVC 的部分视图中填充导航选项卡上的数据?
- c++ - C++定义静态成员的正确方法是什么
- haskell - 如何在 Haskell 中为 IO 专门化 mapM
- gatling - Gatling:响应时间百分位数和延迟百分位数随时间的差异
- java - Libgdx 播放/暂停音乐
- android - 避免按下后退按钮时关闭 AlertDialog
- regex - 将正则表达式从 pcre 转换为 sed 以拆分字符串
- c++ - 某些输入的基数排序实现失败
- kubernetes - 取消或撤消删除 Kubernetes 集群中的 Persistent Volumes