首页 > 解决方案 > 如何使用 flink streamtable api 从 jdbc 读取流数据

问题描述

发现jdbc连接器部分只有DDL和yaml格式配置,不知道怎么用,所以想问下写flink app jar时如何从jdbc数据源读取流数据。如果可以,如果数据源中的数据发生更改,流是否会更新。

标签: apache-flinkflink-streaming

解决方案


Flink 1.11 支持从 JDBC 数据库中提取 CDC 流。参见FLIP-105。这将满足您的要求,包括在更改基础数据库表时更新流。

有关 Flink 1.10 中已有功能的示例,请参阅Timo Walther 和 Fabian Hueske的 Flink Forward 演讲中展示的Flink SQL 演示。例如,在 Flink 1.10 中,您可以使用 MySQL 中的查找表连接流。在演示中(链接到上面),这是通过使用 Hive 目录来描述一些 MySQL 表来完成的,然后是这个查询

SELECT
  l_proctime AS `querytime`,
  l_orderkey AS `order`,
  l_linenumber AS `linenumber`,
  l_currency AS `currency`,
  rs_rate AS `cur_rate`, 
  (l_extendedprice * (1 - l_discount) * (1 + l_tax)) / rs_rate AS `open_in_euro`
FROM prod_lineitem
JOIN hive.`default`.prod_rates FOR SYSTEM_TIME AS OF l_proctime ON rs_symbol = l_currency
WHERE
  l_linestatus = 'O';

用于使用存储在 MySQL 中的当前汇率计算欧元标准化金额。


推荐阅读