apache-kafka - 如何使用 Kafka JDBC 连接器跟踪具有特定列值的行(按 id)?
问题描述
我有一个包含大量记录的表。有一列定义了记录的类型。我想在该列中收集具有特定值的记录。有点儿:
Select * FROM myVeryOwnTable WHERE type = "VERY_IMPORTANT_TYPE"
我注意到WHERE
,当我选择增量(+时间戳)模式时,我不能在自定义查询中使用子句,否则我需要小心自己过滤。我想实现的背景是我使用 Logstash 将某种类型的数据从 MySQL 传输到 ES。这很容易通过使用可以包含 where 子句的查询来实现。但是,使用 Kafka,在数据库中插入新行后,我可以更快(几乎立即)传输数据。
感谢您的任何提示或建议。
感谢@wardziniak,我能够设置它。
query=select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p
topic.prefix=test-mysql-jdbc-
incrementing.column.name=id
但是,我期待一个主题test-mysql-jdbc-myVeryOwnTable
,所以我已经注册了我的消费者。但是,使用上面显示的查询表名称被跳过,所以我的主题完全按照上面定义的前缀命名。所以我刚刚更新了我的属性topic.prefix=test-mysql-jdbc-myVeryOwnTable
,它似乎工作得很好。
解决方案
您可以在 Jdbc Source Connectorquery
属性中使用子查询。
示例 JDBC 源连接器配置:
{
...
"query": "select * from (select * from myVeryOwnTable p where type = 'VERY_IMPORTANT_TYPE') p",
"incrementing.column.name": "id",
...
}
推荐阅读
- firebase - Firebase 身份验证:邀请用户加入公司
- php - 基于数组中的数据创建 SQL 查询
- node.js - 如何在 create-react-app 的 package.json 文件中定义 PROXY?
- c - 如何传递字符串和文件以供 C 中的 open 函数使用?
- java - 在 Ant Design 中获取成功警报消息的文本并使用 Selenium 进行验证
- reactjs - 如何使用 axios 通过 webpack 开发服务器向远程服务器上的 API 发送请求
- android - 如何使我的应用程序包含在文件管理器中每个 txt 文件的“打开方式”或“共享”中?
- android - 用户是否可以在运行时查看变量?
- c# - 在 Linux 上使用 ONVIF 的 .wsdl 文件生成 C# 脚本
- deployment - 以下部署策略在 Kubernetes 中意味着什么?