首页 > 解决方案 > Kafka KStream-KTable 加入竞争条件

问题描述

我有以下内容:

KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");  

streamB 中的消息需要使用 tableA 中的数据进行丰富。

示例数据:

Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})

在一个完美的世界里,我想做

streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
       .selectKey((k,b) -> b.name)
       .to("C");

不幸的是,这对我不起作用,因为我的数据是这样的,每次将消息写入主题 A 时,也会将相应的消息写入主题 B(源是单个 DB 事务)。现在,在这个初始“创建”事务之后,主题 B 将继续接收更多消息。有时每秒会在主题 B 上显示几个事件,但对于给定的键,也可能有几个小时相隔的连续事件。

简单解决方案不起作用的原因是原始“创建”事务导致竞争条件:主题 A 和 B 几乎同时获取消息,如果 B 消息首先到达拓扑的“加入”部分(比如几毫秒在 A 消息到达之前)tableA 将不包含相应的条目。此时事件丢失。我可以看到在主题 C 上发生了这种情况:有些事件出现了,有些则没有(如果我使用 leftJoin,所有事件都会出现,但有些事件有空键,这相当于丢失)。这只是初始“创建”事务的问题。此后,每当有事件到达主题 B 时,对应的条目就存在于表 A 中。

所以我的问题是:你如何解决这个问题?

我目前的解决方案很丑陋。我所做的是我创建了一个“B 集合”并使用

B.groupByKey()
 .aggregate(() -> new CollectionOfB(), (id, b, agg) -> agg.add(b));
 .join(tableA, ...);

现在我们有一个 KTable-KTable 连接,它不会受到这种竞争条件的影响。我认为这个“丑陋”的原因是因为在每次加入后,我必须向主题 B 发送一条特殊消息,该消息基本上说“从集合中删除我刚刚处理的事件”。如果此特殊消息未发送到主题 B,则集合将继续增长,并且集合中的每个事件都将在每次加入时报告。

目前我正在调查窗口连接是否可行(将 A 和 B 读入 KStreams 并使用窗口连接)。我不确定这是否会起作用,因为窗口大小没有上限。我想说,“窗口在'之前'开始 1 秒并在'之后'结束无穷秒”。即使我能以某种方式完成这项工作,我还是有点担心拥有无界窗口的空间需求。

任何建议将不胜感激。

标签: apache-kafkaapache-kafka-streams

解决方案


不确定您使用的是什么版本,但最新的 Kafka 2.1 改进了流表连接。即使在 2.1 之前,以下内容也成立:

  • 流表连接基于事件时间
  • Kafka Streams 基于事件时间处理消息,但是以偏移顺序(对于两个输入流,记录时间戳较小的流首先处理)
  • 如果要确保先更新表,则表更新记录的时间戳应小于流记录

从 2.1 开始:

  • 为了允许一些延迟,您可以配置max.task.idle.ms配置以延迟处理只有一个输入主题具有输入数据的情况

事件时间处理顺序在 2.0 和更早版本中实现为尽力而为,这可能导致您描述的竞争条件。max.task.idle.ms在 2.1 中,处理顺序是有保证的,只有在命中时才可能被违反。

详情见https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization


推荐阅读