java - 使用 Admin API 安全地删除和重新创建 Kaka 主题,而不会出现 TopicExistsException
问题描述
我的用例是为针对本地 Kafka 集群的自动化测试准备环境。从 Kafka 自己的测试(例如 this)中汲取灵感,我想删除一个主题并重新创建它。由于我没有运行嵌入在测试 jvm 中的代理,因此我需要使用 Admin API 连接到正在运行的代理。
代码大致如下:
admin.deleteTopics(listWithTopicName).all().get();
probeUntil(() -> {
doesNotContain(admin.listTopics().names().get(), listWithTopicName);
});
admin.createTopics(new NewTopic(topicName, ...));
问题是调用会createTopics
引发 TopicExistsException,例如
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'test-topic' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at clojure.core$deref_future.invokeStatic(core.clj:2300)
at clojure.core$deref.invokeStatic(core.clj:2320)
at clojure.core$deref.invoke(core.clj:2306)
at playground$create_topics.invokeStatic(playground.clj:113)
at playground$create_topics.invoke(playground.clj:108)
at playground$delete_and_recreate_topics.invokeStatic(playground.clj:120)
at playground$delete_and_recreate_topics.invoke(playground.clj:118)
at playground$eval26601.invokeStatic(playground.clj:141)
at playground$eval26601.invoke(playground.clj:141)
(snip)
at clojure.main.main(main.java:40)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test-topic' already exists.
delete
我希望在和之间进行探测create
以避免该问题,但似乎即使listTopics
返回一个没有最近删除的主题的列表,随后createTopic
对同名的调用也会失败,并显示TopicExistsException
. 如果我在and之间插入一个Thread.sleep(100)
电话,问题就会消失。deleteTopics
createTopics
检查代理日志,我看到一系列与删除操作相关的日志,但与创建调用无关。
顺便说一句,我的逻辑基于 kafka 测试做类似的操作,但是探测是基于通过 zookeeper 迭代注册的主题,这需要为我的用例添加更多的依赖关系和复杂性。
解决方案
我发现没有办法改进探测以避免竞争条件,因此解决方案是检测 try 块中的特定情况,然后重试几次。大致如下:
void deleteAndRecreate(String topicName, Admin admin) {
admin.deleteTopics(listWithTopicName).all().get();
probeUntil(() -> {
!topicExists(topicName, admin);
});
tryCreateTopic(topicName, ..., admin, 0);
}
boolean topicExists(String topicName, Admin admin) {
return admin.listTopics().names().get().contains(asList(topicName));
}
void tryCreateTopic(String topicName, ... Admin admin, int attempts) {
try {
admin.createTopics(asList(new NewTopic(topicName,...))).all().get()
probeUntil(() -> topicIsCorrect(topicName, ...));
} catch (ExcecutionException e) {
if ((e.getCause() instanceOf TopicExistsException) && !topicExists(topicName, admin) && attempts <= MAX_ATTEMPTS) {
Thread.sleep(30 * attempts);
tryCreateTopic(...., attempts + 1);
}
else {
throw e;
}
}
}
实际代码在 Clojure 中,因此在转写为 Java 时可能存在错误,但基本思想有效。不漂亮,但有效。
推荐阅读
- rna-seq - 如何在批量 RNA seq 数据中选择高度可变的基因?
- asynchronous - 如何在使用异步消息(例如 SQS 或任何其他消息服务)时保持秩序
- amazon-web-services - 如何使用 AWS DMS 中的二进制读取器以 s3 作为复制目标从 Oracle 表中捕获 default_values
- sql - 改进 SQL teradata 查询
- python - 我怎样才能在这个字典列表中获得“价格”的价值?
- c# - 有没有办法在 Xamarin.Forms 的 CollectionView 中让 2 个不同的对象相互跟随?
- jquery - Jquery Autocomplete:- 如何只允许特定信息填充框
- javascript - 在 Angular 中使用 fetch 下载 .msg
- html - 如何将动态重复的元素定位在 div 的底部或重复“向上”?
- php - 如何模糊实时结果,使其无法通过浏览器开发工具进行调整