首页 > 解决方案 > 我可以在另一个集群中复制已经复制的 Kafka 主题吗?

问题描述

我有 3 个 Kafka 集群,并且我在其他 Clsuter 中复制了一个主题。集群 1 主题“测试”

集群 2 test.replica.. 我在集群 3 中复制了这个主题,但是当我将数据发送到主题“test”时,我可以从主题“test.replica”中读取数据,主题 test.replica.replica 似乎是空的

集群 3test.replica.replica

连接器

{
    "name":"test-z1-z2",
    "config":{
            "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
            "tasks.max":"4",
            "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
            "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
            "src.kafka.bootstrap.servers":"localhost:9092",
            "src.zookeeper.connect":"localhost:2181",
            "dest.zookeeper.connect":"localhost:2182",
            "topic.whitelist":"test",
            "topic.rename.format":"test.replica",
            "confluent.license":""
    }
}

{
        "name":"test-z2-z3",
        "config":{
                "connector.class":"io.confluent.connect.replicator.ReplicatorSourceConnector",
                "tasks.max":"4",
                "key.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "value.converter":"io.confluent.connect.replicator.util.ByteArrayConverter",
                "src.kafka.bootstrap.servers":"localhost:9093",
                "src.zookeeper.connect":"localhost:2182",
                "dest.zookeeper.connect":"localhost:2183",
                "topic.whitelist":"test.replica",
                "topic.rename.format":"test.replica.replica",
                "confluent.license":""
        }
}

集群 1

[root@localhost bin]# ./kafka-topics --list --zookeeper localhost:2181
__confluent.support.metrics
__consumer_offsets
__consumer_timestamps
test

集群 2

[root@localhost bin]# ./kafka-topics --list --zookeeper localhost:2182
__confluent.support.metrics
__consumer_offsets
test.replica
[root@localhost bin]# 
[root@localhost bin]# ./kafka-topics --list --zookeeper localhost:2183
__confluent.support.metrics
__consumer_offsets
test.replica.replica

生产数据

[root@localhost bin]# seq 10 | ./kafka-console-producer --broker-list localhost:9092 --topic test
>>>>>>>>>>>

消费副本

[root@localhost bin]# ./kafka-console-consumer --bootstrap-server localhost:9093 --topic test.replica
1
2
3
4
5
6
7
8
9
10

尝试另一个副本

[root@localhost bin]# ./kafka-console-consumer --bootstrap-server localhost:9094 --topic test.replica.replica

(no Data)

我想将数据发送到一个集群,然后从那里再次将其复制到其他集群。

标签: apache-kafkareplicationconfluent-platform

解决方案


复制的数据可能已经到达代理,并且您指向的是最新的偏移量。

您可以--from-beginning从头开始添加消耗。

真正的检查将OffsetShell用于查看主题的实际偏移量。此外,目标主题已正确创建,因此复制器至少正在处理该部分。

也不清楚您的连接器配置是否属于同一个 Connect 集群。您是否connect-disributed在不同的端口上运行,bootstrap.servers也不同?


推荐阅读