apache-kafka - Kafka 主题似乎只在第一次起作用。为什么?
问题描述
我正在使用 Kafka Connect(使用 Confluent 实现)并且看到一个奇怪的行为。我配置了一个源连接以从数据库表中提取数据,并填充一个主题。这行得通。但是,如果我删除主题,删除源配置,然后重置配置(可能向查询添加另一列),则不会填充主题。如果我将主题名称更改为我以前没有使用过的名称,它会起作用。我正在使用 Postman 来设置配置,但我认为这在这里并不重要。
我的连接配置:
{
"name": "my-jdbc-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:db2://db2server.mycompany.com:4461/myDB",
"connection.user: "dbUser",
"connection.password": "dbPass",
"dialect.name": "Db2DatabaseDialect",
"mode": "timestamp",
"query": "select fname, lname, custId, custRegion, lastUpdate from CustomerMaster",
"timestamp.column.name": "lastUpdate",
"table.types": "TABLE",
"topic.prefix": "master.customer"
}
}
解决方案
KAFKA JDBC 连接器在时间戳列上使用 HighWatermark,即在您的情况下是最后一次更新。它不依赖于主题,甚至您可以删除 JDBC 连接器并使用相同的名称重新创建它,它仍将使用相同的 HighWatermark,因为 HighWatermark 取决于连接器名称。因此,即使您重新创建主题,它也不会再次加载数据。因此,有一种方法可以再次重新处理整个数据,您可以遵循任何一种方式:
删除主题并删除 JDBC 连接器,重新创建主题,然后使用不同的名称创建 JDBC 连接器。或者
删除 JDBC 连接器并使用模式 "mode": "bulk" 再次使用相同的名称重新创建。它将在主题中再次转储所有数据库表。加载后,您可以再次将模式更新为时间戳。请参阅 JDBC 连接器配置详细信息
- 将所有记录的 lastUpdate 更新为当前时间戳。
推荐阅读
- snowflake-cloud-data-platform - DBT 工具:运行时错误编译错误无法呈现
- react-native - 键盘打开时隐藏标签栏反应本机
- vue.js - 如何在应用程序内获取开发服务器代理目标?
- amazon-web-services - 允许 S3 访问同一账户中的特定 IAM 角色的 IAM 策略
- zio - 如何为理解片段运行一个简单的 ZIO
- java - 如何每 10 秒执行一次函数
- asp.net - 使用 MySQL 的 Visual Studio ASP.Net
- testing - 有没有办法使用运行器运行具有 TestCafe 的 test.meta 的测试?
- python - 自定义视图组件到基于视图
- c# - 通过 JSON 类进行迭代