apache-spark - 如何保证debezium生成的topic事件顺序,存储在kafka中,发送到spark?
问题描述
我正在从事变更数据捕获项目。我有一个mysql数据库。我使用 debezium 捕获所有更改并将其发送到 kafka。后来我从 Spark 中读取了所有信息,并使用 jdbc 将其发送到 Apache Phoenix。
我正在使用带有重新路由选项的 debezium,它将所有表的更改仅发送到一个 kafka 主题。有了这个配置,我确信我可以按顺序从 spark 中读取独特的 kafka 主题。
但我的问题是:如果我在没有重新路由选项的情况下使用 debezium,并且我在不同的 kafka 主题中更改了每个表,我怎么能保证我以正确的顺序阅读所有主题的事件?
我知道我可以使用 Spark 例如按时间戳对其进行排序,但如果说一个 kafka 主题因出现问题而离线 10 分钟,但其他 kafka 主题继续工作,我将在 Spark 中遇到排序问题。
我该如何面对这个问题?
解决方案
我在 Debezium 上用这个配置解决了这个问题
{
"name": "name-connector",
"config": {
"plugin.name": "pgoutput",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "0.0.0.0",
"database.port": "5433",
"database.user": "postgres",
"database.password": "******",
"database.dbname" : "database",
"database.server.name": "database",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.database",
"decimal.handling.mode" : "string",
"time.precision.mode" : "connect",
"tombstones.on.delete" : false,
"transforms":"routerTopic",
"transforms.routerTopic.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.routerTopic.topic.regex":"database.public.(.*)",
"transforms.routerTopic.topic.replacement":"database.public",
}
}
使用 transforms.routerTopic.topic.regex 和 transforms.routerTopic.topic.replacement 配置主题路由
https://debezium.io/documentation/reference/0.10/configuration/topic-routing.html
推荐阅读
- angular - 显示 dataSource 表等待 mouseEvent
- excel - Power App 出现问题(将数据从 Power App 加载到 Excel)
- r - 根据数据帧 R 上连续第一个值的条件替换第二个值
- react-native - 根据条件使用 json 填充选择器
- python - Python中不一致的字符解码错误
- python - 为什么我的 CNN 指标在每个时期都没有变化?
- reactjs - collectStyles 未从样式组件 SSR 中的根应用组件获取样式
- vue.js - nativescript vue dialogs.confirm - 代码不在对话框内执行
- c++ - 根据函数输入初始化全局数组
- laravel - 如何使用 laravel 关系在 laravel 中的 3 个表之间建立关系?