apache-kafka - 是否可以使用 MirrorMaker2 复制没有别名前缀的 kafka 主题
问题描述
我正在尝试在 2 个集群之间设置复制,但不希望更改主题名称。例如,如果我有一个名为“some_topic”的主题,它会自动复制到“cluster1.some_topic”,我很确定这可以完成,但还没有找到正确的配置来更改它
我当前的配置“mirrormaker2.properties”
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = cluster1, cluster2
# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092
# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*
# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5
以供参考:
解决方案
要“禁用”主题前缀并同时正确镜像主题属性,我必须提供一个自定义的复制策略,该策略也覆盖该topicSource
方法。否则非默认主题属性(例如,"cleanup.policy=compact"
)没有被镜像,即使在重新启动镜像制造商之后也是如此。
这是对我有用的完整程序:
- 将以下自定义复制策略编译并打包到 .jar 文件中(完整的源代码可以在这里找到):
public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {
private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);
private String sourceClusterAlias;
@Override
public void configure(Map<String, ?> props) {
super.configure(props);
sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
if (sourceClusterAlias == null) {
String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
log.error(logMessage);
throw new RuntimeException(logMessage);
}
}
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
return topic;
}
@Override
public String topicSource(String topic) {
return topic == null ? null : sourceClusterAlias;
}
@Override
public String upstreamTopic(String topic) {
return null;
}
}
- 将 .jar 复制到
${KAFKA_HOME/libs
目录中 - 通过设置以下
replication.policy.class
属性,将 Mirror Maker 2 配置为使用该复制策略${KAFKA_HOME}/config/mm2.properties
:
replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy
推荐阅读
- ios - NSURLSession 发布一个具有 url 编码字符串的参数
- c++ - 在较小的约束下使用位掩码找到 Bin 打包问题的解决方案
- ios - 某些编码的“Mp4”视频似乎无法正常工作| 视频编解码器
- c# - 当我尝试注册 TypingDNA 集成时,我不断收到错误“无法验证提供的信息”
- python - 尝试使用联接不工作输出 2D 列表
- jekyll - 我使用带有 jekyll 的 github 页面使用流式语法。有奇怪的事情
- android - 使用 Android 摄像头读取非 HTTP 二维码发送意图?
- python - 如何将另一个数据框附加到相同结构的数据框并消除重复项?
- angular - 不能在另一个compounanet(角度)中使用compunat使用API
- python - Codechef Runtime Error-减少内存使用