首页 > 解决方案 > 是否可以使用 MirrorMaker2 复制没有别名前缀的 kafka 主题

问题描述

我正在尝试在 2 个集群之间设置复制,但不希望更改主题名称。例如,如果我有一个名为“some_topic”的主题,它会自动复制到“cluster1.some_topic”,我很确定这可以完成,但还没有找到正确的配置来更改它

我当前的配置“mirrormaker2.properties”

# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 

# specify any number of cluster aliases
clusters = cluster1, cluster2

# connection information for each cluster
cluster1.bootstrap.servers = host1:9092,host2:9092,host3:9092
cluster2.bootstrap.servers = rep_host1:9092,rep_host2:9092,rep_host3:9092

# enable and configure individual replication flows
cluster1->cluster2.enabled = true
cluster1->cluster2.topics = sometopic.*

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5

以供参考:

标签: apache-kafkaapache-kafka-mirrormaker

解决方案


要“禁用”主题前缀并同时正确镜像主题属性,我必须提供一个自定义的复制策略,该策略也覆盖该topicSource方法。否则非默认主题属性(例如,"cleanup.policy=compact")没有被镜像,即使在重新启动镜像制造商之后也是如此。

这是对我有用的完整程序:

  1. 将以下自定义复制策略编译并打包到 .jar 文件中(完整的源代码可以在这里找到):
public class PrefixlessReplicationPolicy extends DefaultReplicationPolicy {

  private static final Logger log = LoggerFactory.getLogger(PrefixlessReplicationPolicy.class);

  private String sourceClusterAlias;

  @Override
  public void configure(Map<String, ?> props) {
    super.configure(props);
    sourceClusterAlias = (String) props.get(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
    if (sourceClusterAlias == null) {
      String logMessage = String.format("Property %s not found", MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS);
      log.error(logMessage);
      throw new RuntimeException(logMessage);
    }
  }

  @Override
  public String formatRemoteTopic(String sourceClusterAlias, String topic) {
    return topic;
  }

  @Override
  public String topicSource(String topic) {
    return topic == null ? null : sourceClusterAlias;
  }

  @Override
  public String upstreamTopic(String topic) {
    return null;
  }
}
  1. 将 .jar 复制到${KAFKA_HOME/libs目录中
  2. 通过设置以下replication.policy.class属性,将 Mirror Maker 2 配置为使用该复制策略${KAFKA_HOME}/config/mm2.properties
  replication.policy.class=ch.mawileo.kafka.mm2.PrefixlessReplicationPolicy

推荐阅读