首页 > 解决方案 > 如何保证debezium生成的topic事件顺序,存储在kafka中,发送到spark?

问题描述

我正在从事变更数据捕获项目。我有一个mysql数据库。我使用 debezium 捕获所有更改并将其发送到 kafka。后来我从 Spark 中读取了所有信息,并使用 jdbc 将其发送到 Apache Phoenix。

我正在使用带有重新路由选项的 debezium,它将所有表的更改仅发送到一个 kafka 主题。有了这个配置,我确信我可以按顺序从 spark 中读取独特的 kafka 主题。

但我的问题是:如果我在没有重新路由选项的情况下使用 debezium,并且我在不同的 kafka 主题中更改了每个表,我怎么能保证我以正确的顺序阅读所有主题的事件?

我知道我可以使用 Spark 例如按时间戳对其进行排序,但如果说一个 kafka 主题因出现问题而离线 10 分钟,但其他 kafka 主题继续工作,我将在 Spark 中遇到排序问题。

我该如何面对这个问题?

标签: apache-sparkapache-kafkadebeziumchange-data-capture

解决方案


我在 Debezium 上用这个配置解决了这个问题

{
"name": "name-connector",
"config": {
    "plugin.name": "pgoutput",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "0.0.0.0",
    "database.port": "5433",
    "database.user": "postgres",
    "database.password": "******",
    "database.dbname" : "database",
    "database.server.name": "database",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.database",
    "decimal.handling.mode" : "string",
    "time.precision.mode" : "connect",
    "tombstones.on.delete" : false,
    "transforms":"routerTopic",
    "transforms.routerTopic.type":"io.debezium.transforms.ByLogicalTableRouter",
    "transforms.routerTopic.topic.regex":"database.public.(.*)",
    "transforms.routerTopic.topic.replacement":"database.public",
}
}

使用 transforms.routerTopic.topic.regex 和 transforms.routerTopic.topic.replacement 配置主题路由

https://debezium.io/documentation/reference/0.10/configuration/topic-routing.html


推荐阅读