首页 > 解决方案 > Kafka JDBC 源连接器插入或更新

问题描述

我配置了一个 Kafka JDBC 源连接器,以便从 PostgreSQL 数据库推送记录更改(插入或更新)的 Kafka 主题。我使用“时间戳+递增”模式。似乎工作正常。我没有配置 JDBC Sink 连接器,因为我使用的是听该主题的 Kafka Consumer。

该主题的消息是 JSON。这是一个例子:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "entity_create_date"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "entity_modify_date"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "entity_version"
      },
      {
        "type": "string",
        "optional": true,
        "field": "firstname"
      },
      {
        "type": "string",
        "optional": true,
        "field": "lastname"
      }
    ],
    "optional": false,
    "name": "author"
  },
  "payload": {
    "id": 1,
    "entity_create_date": 1600287236682,
    "entity_modify_date": 1600287236682,
    "entity_version": 1,
    "firstname": "George",
    "lastname": "Orwell"
  }
}

如您所见,没有关于源连接器是否由于插入或更新而捕获此更改的信息。我需要这些信息。怎么能解决?

标签: apache-kafkaapache-kafka-connect

解决方案


您无法使用 JDBC 源连接器获取该信息,除非您在源模式和触发器中执行了定制操作。

这就是为什么基于日志的 CDC 通常是从源数据库获取事件的更好方法的原因之一,以及其他原因包括:

  • 捕获删除
  • 捕获操作类型
  • 捕获所有事件,而不仅仅是连接器轮询时的事件。

有关此细微差别的更多详细信息,请参阅此博客基于相同的演讲


推荐阅读