首页 > 解决方案 > 使用 kafka 连接器在 kafka 主题之间复制数据

问题描述

我是 Kafka 的新手,现在我需要将数据从一个 kafka 主题复制到另一个主题。我想知道这样做的可能方法是什么?我能想到的方法如下:

  1. Kakfa 消费者 + Kafka 生产者
  2. 卡夫卡流
  3. Kafka sink 连接器 + 生产者
  4. Kafka消费者+源连接器

我的问题是:是否可以在两者之间使用两个 kafka 连接器?例如接收器连接器+源连接器。是这样吗,你能给我一些好的例子吗?或者一些提示如何做到这一点?

提前致谢!

标签: apache-kafkaapache-kafka-connect

解决方案


您列出的所有方法都是可能的。哪一个是最好的实际上取决于您想要对过程进行控制,或者它是一次性操作还是您想要继续运行的东西。

Kafka Streams 提供了一种通过 DSL 将一个主题流入另一个主题的简单方法

您可以执行以下操作(演示代码显然不适用于生产!):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

final Serde<byte[]> bytesSerdes = Serdes.ByteArray();
final StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], byte[]> input = builder.stream(
        "input-topic",
        Consumed.with(bytesSerdes, bytesSerdes)
);
input.to("output-topic", Produced.with(bytesSerdes, bytesSerdes));

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
try {
    streams.start();
    Thread.sleep(60000L);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    streams.close();
}

推荐阅读