首页 > 解决方案 > Kafka JDBC 事务源配置

问题描述

我正在尝试使用 kafka-connect 从两个表中获取行。我connect-file-source.properties以这种方式配置

name=jdbc_source_postgres_foobar_01
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://localhost:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:postgresql://localhost:5432/store?user=postgres&password=root
table.whitelist=author,book
mode=incrementing
incrementing.column.name=id
validate.non.null=false
topics=author,book
topic.prefix=

涉及的表有authorand book,后者有外键指代author

然后我注册了一个监听器来消费来自“作者”和“书籍”主题的消息,以便将它们插入另一个数据库。

@KafkaListener(
    topics={"author","book"}, 
    groupId = "foo", 
    containerFactory = "fooKafkaListenerContainerFactory"
)
public void listenGroupFoo(@Payload PostgresTableRow message) {
    System.out.println("Received" + message);
    String tableName = message.tableName();
    HashMap<String, Object> params = message.params();
    
    insert(tableName, params);
}

当所涉及的表彼此之间没有约束时,这工作得很好,但在这种情况下,当来自“book”主题的消息在来自“author”的消息之前被消耗时,我会收到错误。

例如,我在源数据库中插入作者“George Orwell”id=23和“1984”一书id=37authorId=23,两条消息被推送到Kafka,一条在“作者”主题中,一条在“书”主题中。如果消息首先从“书”主题消费,然后从“作者”主题消费,我会收到无法在我的接收器数据库中插入 ID 为 37 的书的错误,因为不存在 ID 为 23 的此类作者。

那么我该如何解决呢?有没有办法将多个表推入一个主题并授予订单?

标签: apache-kafkaapache-kafka-connect

解决方案


在 CDC(变更数据捕获)世界中,您正面临一个需要解决的复杂问题,其中 Kafka 位于中间。

您希望以这种方式实现从数据库到 Kafka 和从 Kafka 到另一个数据库的事务一致、有序、精确一次的复制,以使您面临的那些引用完整性问题,即:由于竞争条件,不会发生。

我建议阅读 Robin Moffatt 关于 CDC 和 Kafka Connect JDBC 连接器的文章,以及 Shawn Robertson 在 Kafka Summit 18 上关于这个问题的演讲。

  1. 不再孤岛:如何将您的数据库与 Apache Kafka 和 CDC 集成
  2. Kafka Connect Deep Dive – JDBC 源连接器
  3. 事务一致性、有序、Exactly-Once 复制从数据库到云中的 Kafka 并返回——利用 Kafka 提供端到端 ACID 事务的解决方案

不幸的是,如果没有现成的端到端 CDC 解决方案,我想您要么需要非常有创意,要么需要付出相当大的努力来克服这个问题。


推荐阅读