首页 > 解决方案 > 如何使用 Kafka JDBC 连接器跟踪具有特定列值的行(按 id)?

问题描述

我有一个包含大量记录的表。有一列定义了记录的类型。我想在该列中收集具有特定值的记录。有点儿:

Select * FROM myVeryOwnTable WHERE type = "VERY_IMPORTANT_TYPE" 

我注意到WHERE,当我选择增量(+时间戳)模式时,我不能在自定义查询中使用子句,否则我需要小心自己过滤。我想实现的背景是我使用 Logstash 将某种类型的数据从 MySQL 传输到 ES。这很容易通过使用可以包含 where 子句的查询来实现。但是,使用 Kafka,在数据库中插入新行后,我可以更快(几乎立即)传输数据。

感谢您的任何提示或建议。


感谢@wardziniak,我能够设置它。

query=select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p
topic.prefix=test-mysql-jdbc-
incrementing.column.name=id

但是,我期待一个主题test-mysql-jdbc-myVeryOwnTable,所以我已经注册了我的消费者。但是,使用上面显示的查询表名称被跳过,所以我的主题完全按照上面定义的前缀命名。所以我刚刚更新了我的属性topic.prefix=test-mysql-jdbc-myVeryOwnTable,它似乎工作得很好。

标签: apache-kafkaapache-kafka-connect

解决方案


您可以在 Jdbc Source Connectorquery属性中使用子查询。

示例 JDBC 源连接器配置:

{
    ...
    "query": "select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p",
    "incrementing.column.name": "id",
    ...
}

推荐阅读