apache-kafka - Kafka KStream-KTable 加入竞争条件
问题描述
我有以下内容:
KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");
streamB 中的消息需要使用 tableA 中的数据进行丰富。
示例数据:
Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})
在一个完美的世界里,我想做
streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
.selectKey((k,b) -> b.name)
.to("C");
不幸的是,这对我不起作用,因为我的数据是这样的,每次将消息写入主题 A 时,也会将相应的消息写入主题 B(源是单个 DB 事务)。现在,在这个初始“创建”事务之后,主题 B 将继续接收更多消息。有时每秒会在主题 B 上显示几个事件,但对于给定的键,也可能有几个小时相隔的连续事件。
简单解决方案不起作用的原因是原始“创建”事务导致竞争条件:主题 A 和 B 几乎同时获取消息,如果 B 消息首先到达拓扑的“加入”部分(比如几毫秒在 A 消息到达之前)tableA 将不包含相应的条目。此时事件丢失。我可以看到在主题 C 上发生了这种情况:有些事件出现了,有些则没有(如果我使用 leftJoin,所有事件都会出现,但有些事件有空键,这相当于丢失)。这只是初始“创建”事务的问题。此后,每当有事件到达主题 B 时,对应的条目就存在于表 A 中。
所以我的问题是:你如何解决这个问题?
我目前的解决方案很丑陋。我所做的是我创建了一个“B 集合”并使用
B.groupByKey()
.aggregate(() -> new CollectionOfB(), (id, b, agg) -> agg.add(b));
.join(tableA, ...);
现在我们有一个 KTable-KTable 连接,它不会受到这种竞争条件的影响。我认为这个“丑陋”的原因是因为在每次加入后,我必须向主题 B 发送一条特殊消息,该消息基本上说“从集合中删除我刚刚处理的事件”。如果此特殊消息未发送到主题 B,则集合将继续增长,并且集合中的每个事件都将在每次加入时报告。
目前我正在调查窗口连接是否可行(将 A 和 B 读入 KStreams 并使用窗口连接)。我不确定这是否会起作用,因为窗口大小没有上限。我想说,“窗口在'之前'开始 1 秒并在'之后'结束无穷秒”。即使我能以某种方式完成这项工作,我还是有点担心拥有无界窗口的空间需求。
任何建议将不胜感激。
解决方案
不确定您使用的是什么版本,但最新的 Kafka 2.1 改进了流表连接。即使在 2.1 之前,以下内容也成立:
- 流表连接基于事件时间
- Kafka Streams 基于事件时间处理消息,但是以偏移顺序(对于两个输入流,记录时间戳较小的流首先处理)
- 如果要确保先更新表,则表更新记录的时间戳应小于流记录
从 2.1 开始:
- 为了允许一些延迟,您可以配置
max.task.idle.ms
配置以延迟处理只有一个输入主题具有输入数据的情况
事件时间处理顺序在 2.0 和更早版本中实现为尽力而为,这可能导致您描述的竞争条件。max.task.idle.ms
在 2.1 中,处理顺序是有保证的,只有在命中时才可能被违反。
推荐阅读
- windows - 柯南无法再次链接已安装的库,因为它的名称与所需的字母大小不同
- python - pd.to_datetime 创建小时自动添加非真实日期
- android-viewpager - 通过滑动而不是在 Android 中单击按钮来保存表单数据
- amazon-web-services - 授予 S3 跨账户完全访问权限
- c# - Azure 函数 - 你可以在函数范围之前访问 IServiceProvider 吗?
- javascript - 如何从 fs.readFile node.js 返回值
- android - 添加新行时,EditText 会移动整个 gui,如何停止?
- php - 如何在 PHP 的循环中递归地加入字符串
- svg - svg matrixTransform 和 svg.getScreenCTM 是如何工作的?
- java - startsWith(String word, char c) 中的参数