mysql - 如何使用 debezium mysql 连接器 kafka 进行初始快照加载?
问题描述
我正在使用 Kafka 集群和 Debezium MySql 连接器从数据库更新中获取消息到 Elasticsearch。有一段时间我在做一些测试,最后我得到了一个混合的 ES,所以我想对 Elasticsearch 做一个全新的满载。
所以我想停止 debezium 连接器以停止馈送 ES 并删除所有索引,以便当我再次启动连接器时它可以满载。据我所知,连接器仅适用于pause/resume
动作,而那些不执行初始加载。
此时,我将手动删除连接器和创建的主题并重新创建它们,以便它可以按初始状态加载,但是关于如何在正确的步骤中执行此清理过程的任何想法?
解决方案
我不知道是否有更好的方法,但这个解决方案对我有用:
- 停止
/etc/kafka/connect-distributed.properties
守护进程的服务。
sudo systemctl stop confluent-connect-distributed.service
- 删除 Debezium 的连接器
curl -X DELETE http://localhost:8083/connectors/<connector name>
- 删除与我的工作相关的所有主题,在这种情况下,这个 kafka 集群是一个开发者,所以我删除了所有主题,甚至是默认主题。
__consumer_offsets
_confluent-command
_schemas
connect-configs
connect-offsets
connect-status
kafka-topics --bootstrap-server <kafka bootstrap> --delete --topic <topic name>
- 再次创建默认主题
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
- 再次启动服务
sudo systemctl start confluent-connect-distributed.service
- 再次创建连接器
curl -s -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/<connector name>/config \
-d '<json>'
这样,您最终将在 elasticsearch 中获得初始快照。
推荐阅读
- angular - Angular routerLink 不适用于 Electron
- jenkins - 对多个作业使用单个 Jenkins 管道
- arduino - 读取 4 线电阻式触摸屏的坐标
- php - MySQL:以这种方式连接表,以便具有相同外键的行合并为一个
- android - 在上传到 Firebase 存储之前减小图像大小
- c# - 在 List 上使用匿名返回时 Linq 查询出错
- computer-vision - 从头开始训练网络的准确性极差
- python - 如何在postgres中将两个相似的JSON树合并成一个带有数组的大胖树?
- flutter - 如何通过在 Windows 中打开该文件夹来获取 Flutter 中的特定路径?
- c - 如何使用 c api 在 LLVM IR 中读取/写入数组中的值?