首页 > 解决方案 > Kafka 主题似乎只在第一次起作用。为什么?

问题描述

我正在使用 Kafka Connect(使用 Confluent 实现)并且看到一个奇怪的行为。我配置了一个源连接以从数据库表中提取数据,并填充一个主题。这行得通。但是,如果我删除主题,删除源配置,然后重置配置(可能向查询添加另一列),则不会填充主题。如果我将主题名称更改为我以前没有使用过的名称,它会起作用。我正在使用 Postman 来设置配置,但我认为这在这里并不重要。

我的连接配置:

{
    "name": "my-jdbc-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:db2://db2server.mycompany.com:4461/myDB",
        "connection.user: "dbUser",
        "connection.password": "dbPass",
        "dialect.name": "Db2DatabaseDialect",
        "mode": "timestamp",
        "query": "select fname, lname, custId, custRegion, lastUpdate from CustomerMaster",
        "timestamp.column.name": "lastUpdate",
        "table.types": "TABLE",
        "topic.prefix": "master.customer"
    }
}

标签: apache-kafkaapache-kafka-connect

解决方案


KAFKA JDBC 连接器在时间戳列上使用 HighWatermark,即在您的情况下是最后一次更新。它不依赖于主题,甚至您可以删除 JDBC 连接器并使用相同的名称重新创建它,它仍将使用相同的 HighWatermark,因为 HighWatermark 取决于连接器名称。因此,即使您重新创建主题,它也不会再次加载数据。因此,有一种方法可以再次重新处理整个数据,您可以遵循任何一种方式:

  1. 删除主题并删除 JDBC 连接器,重新创建主题,然后使用不同的名称创建 JDBC 连接器。或者

  2. 删除 JDBC 连接器并使用模式 "mode": "bulk" 再次使用相同的名称重新创建。它将在主题中再次转储所有数据库表。加载后,您可以再次将模式更新为时间戳。请参阅 JDBC 连接器配置详细信息

https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html

  1. 将所有记录的 lastUpdate 更新为当前时间戳。

推荐阅读