apache-flink - 如何使用 flink streamtable api 从 jdbc 读取流数据
问题描述
发现jdbc连接器部分只有DDL和yaml格式配置,不知道怎么用,所以想问下写flink app jar时如何从jdbc数据源读取流数据。如果可以,如果数据源中的数据发生更改,流是否会更新。
解决方案
Flink 1.11 支持从 JDBC 数据库中提取 CDC 流。参见FLIP-105。这将满足您的要求,包括在更改基础数据库表时更新流。
有关 Flink 1.10 中已有功能的示例,请参阅Timo Walther 和 Fabian Hueske的 Flink Forward 演讲中展示的Flink SQL 演示。例如,在 Flink 1.10 中,您可以使用 MySQL 中的查找表连接流。在演示中(链接到上面),这是通过使用 Hive 目录来描述一些 MySQL 表来完成的,然后是这个查询
SELECT
l_proctime AS `querytime`,
l_orderkey AS `order`,
l_linenumber AS `linenumber`,
l_currency AS `currency`,
rs_rate AS `cur_rate`,
(l_extendedprice * (1 - l_discount) * (1 + l_tax)) / rs_rate AS `open_in_euro`
FROM prod_lineitem
JOIN hive.`default`.prod_rates FOR SYSTEM_TIME AS OF l_proctime ON rs_symbol = l_currency
WHERE
l_linestatus = 'O';
用于使用存储在 MySQL 中的当前汇率计算欧元标准化金额。
推荐阅读
- c++ - 如何在 C++ 中拆分没有 stringstream 和 strtok 的字符串(提取单词)?
- reactjs - AppBar中左右对齐的图标都带有material-ui next
- ios - 尝试使用 FileManager 移动项目时出错
- html - 间距调整
行标签 - python - 如何在 Python 井字游戏中切换玩家
- java - 使用 KeyListener 移动矩形
- sql - 如何在sql中更新具有平均每周值的列
- angular - Nebular NbEmailPassAuthProvider 显然调用了 API 方法,但 NbLoginComponent 仍然显示“出现问题”消息
- jquery - 如何在 jquery 'each' 循环中对 json 值进行分组和排序
- mysql - 无法在我的实时服务器上维护数据库连接,在本地工作正常。节点.js + mysql