首页 > 解决方案 > 如何使用 debezium mysql 连接器 kafka 进行初始快照加载?

问题描述

我正在使用 Kafka 集群和 Debezium MySql 连接器从数据库更新中获取消息到 Elasticsearch。有一段时间我在做一些测试,最后我得到了一个混合的 ES,所以我想对 Elasticsearch 做一个全新的满载。

所以我想停止 debezium 连接器以停止馈送 ES 并删除所有索引,以便当我再次启动连接器时它可以满载。据我所知,连接器仅适用于pause/resume动作,而那些不执行初始加载。

此时,我将手动删除连接器和创建的主题并重新创建它们,以便它可以按初始状态加载,但是关于如何在正确的步骤中执行此清理过程的任何想法?

标签: mysqlapache-kafkaapache-kafka-connectdebezium

解决方案


我不知道是否有更好的方法,但这个解决方案对我有用:

  1. 停止/etc/kafka/connect-distributed.properties守护进程的服务。
sudo systemctl stop confluent-connect-distributed.service
  1. 删除 Debezium 的连接器
curl -X DELETE http://localhost:8083/connectors/<connector name>
  1. 删除与我的工作相关的所有主题,在这种情况下,这个 kafka 集群是一个开发者,所以我删除了所有主题,甚至是默认主题。
__consumer_offsets
_confluent-command
_schemas
connect-configs
connect-offsets
connect-status
kafka-topics --bootstrap-server <kafka bootstrap> --delete --topic <topic name>
  1. 再次创建默认主题
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
  1. 再次启动服务
sudo systemctl start confluent-connect-distributed.service
  1. 再次创建连接器
curl -s -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/<connector name>/config \
    -d '<json>'

这样,您最终将在 elasticsearch 中获得初始快照。


推荐阅读