首页 > 解决方案 > 使用 Admin API 安全地删除和重新创建 Kaka 主题,而不会出现 TopicExistsException

问题描述

我的用例是为针对本地 Kafka 集群的自动化测试准备环境。从 Kafka 自己的测试(例如 this)中汲取灵感,我想删除一个主题并重新创建它。由于我没有运行嵌入在测试 jvm 中的代理,因此我需要使用 Admin API 连接到正在运行的代理。

代码大致如下:

admin.deleteTopics(listWithTopicName).all().get();
probeUntil(() -> {
  doesNotContain(admin.listTopics().names().get(), listWithTopicName);
});
admin.createTopics(new NewTopic(topicName, ...));

问题是调用会createTopics引发 TopicExistsException,例如

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'test-topic' already exists.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at clojure.core$deref_future.invokeStatic(core.clj:2300)
    at clojure.core$deref.invokeStatic(core.clj:2320)
    at clojure.core$deref.invoke(core.clj:2306)
    at playground$create_topics.invokeStatic(playground.clj:113)
    at playground$create_topics.invoke(playground.clj:108)
    at playground$delete_and_recreate_topics.invokeStatic(playground.clj:120)
    at playground$delete_and_recreate_topics.invoke(playground.clj:118)
    at playground$eval26601.invokeStatic(playground.clj:141)
    at playground$eval26601.invoke(playground.clj:141)
    (snip)
    at clojure.main.main(main.java:40)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test-topic' already exists.

delete我希望在和之间进行探测create以避免该问题,但似乎即使listTopics返回一个没有最近删除的主题的列表,随后createTopic对同名的调用也会失败,并显示TopicExistsException. 如果我在and之间插入一个Thread.sleep(100)电话,问题就会消失。deleteTopicscreateTopics

检查代理日志,我看到一系列与删除操作相关的日志,但与创建调用无关。

顺便说一句,我的逻辑基于 kafka 测试做类似的操作,但是探测是基于通过 zookeeper 迭代注册的主题,这需要为我的用例添加更多的依赖关系和复杂性。

标签: javaapache-kafka

解决方案


我发现没有办法改进探测以避免竞争条件,因此解决方案是检测 try 块中的特定情况,然后重试几次。大致如下:

void deleteAndRecreate(String topicName, Admin admin) {
  admin.deleteTopics(listWithTopicName).all().get();
  probeUntil(() -> { 
    !topicExists(topicName, admin);
  });
  tryCreateTopic(topicName, ..., admin, 0);
}

boolean topicExists(String topicName, Admin admin) {
  return admin.listTopics().names().get().contains(asList(topicName));
}

void tryCreateTopic(String topicName, ... Admin admin, int attempts) {
  try {
    admin.createTopics(asList(new NewTopic(topicName,...))).all().get()
    probeUntil(() -> topicIsCorrect(topicName, ...));
  } catch (ExcecutionException e) {
    if ((e.getCause() instanceOf TopicExistsException) && !topicExists(topicName, admin) && attempts <= MAX_ATTEMPTS) {
      Thread.sleep(30 * attempts);
      tryCreateTopic(...., attempts + 1);
    }
    else {
      throw e;
    } 
  }
}

实际代码在 Clojure 中,因此在转写为 Java 时可能存在错误,但基本思想有效。不漂亮,但有效。


推荐阅读