apache-kafka - 使用 kafka 连接器在 kafka 主题之间复制数据
问题描述
我是 Kafka 的新手,现在我需要将数据从一个 kafka 主题复制到另一个主题。我想知道这样做的可能方法是什么?我能想到的方法如下:
- Kakfa 消费者 + Kafka 生产者
- 卡夫卡流
- Kafka sink 连接器 + 生产者
- Kafka消费者+源连接器
我的问题是:是否可以在两者之间使用两个 kafka 连接器?例如接收器连接器+源连接器。是这样吗,你能给我一些好的例子吗?或者一些提示如何做到这一点?
提前致谢!
解决方案
您列出的所有方法都是可能的。哪一个是最好的实际上取决于您想要对过程进行控制,或者它是一次性操作还是您想要继续运行的东西。
Kafka Streams 提供了一种通过 DSL 将一个主题流入另一个主题的简单方法
您可以执行以下操作(演示代码显然不适用于生产!):
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final Serde<byte[]> bytesSerdes = Serdes.ByteArray();
final StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], byte[]> input = builder.stream(
"input-topic",
Consumed.with(bytesSerdes, bytesSerdes)
);
input.to("output-topic", Produced.with(bytesSerdes, bytesSerdes));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
try {
streams.start();
Thread.sleep(60000L);
} catch (Exception e) {
e.printStackTrace();
} finally {
streams.close();
}
推荐阅读
- octobercms - Pikaday 没有为其他 OctoberCMS 管理员加载
- angular - 我只是在运行“npm install -g @angular/cli”,我得到了这个错误
- python - 如何在 python 中打开“文件”类型的文件?
- sql - 如何在Oracle中进行连续删除语句
- java - 使用 Apache Ozone FileSystem API 会导致错误
- adsense - Kaspersky Anti-Banner System is blocking AdSense in my website
- python - Error in Sckit learn code: " digits: Bunch Instance of 'tuple' has no 'target' memberpylint(no-member) " .How to fix?
- dart - 如何在不同持续时间的颤动中生成定时器?
- android - Flutter - Text not visible inside FlatButton with FittedBox
- android - Unable to handle volley response as JsonObject